diff --git a/src/macros.rs b/src/macros.rs index a478fd71784f5fe69cbb42f8bf141b388868a5fc..0544a813b6f663f00cf3e43488546d4b832a2050 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -5,6 +5,12 @@ macro_rules! lockprintln { std::writeln!(std::io::stdout().lock(), $($arg)*).expect("LPRINTLN"); }) } +macro_rules! log { + ($logger:expr, $($arg:tt)*) => {{ + use std::fmt::Write; + writeln!($logger, $($arg)*).unwrap(); + }}; +} macro_rules! assert_let { ($pat:pat = $expr:expr => $work:expr) => { if let $pat = $expr { diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index 0a7d62144bfffaa706a0dfce779f7dc7faa818fd..ccde8c4efae77700b9bc4447aac2eb47b3c9d4fd 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -56,7 +56,7 @@ impl PolyP { ) -> Result { use SyncRunResult as Srr; let cid = m_ctx.inner.channel_id_stream.controller_id; - lockprintln!("{:?}: ~ Running branches for PolyP {:?}!", cid, m_ctx.my_subtree_id,); + log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,); while let Some((mut predicate, mut branch)) = to_run.pop() { let mut r_ctx = BranchPContext { m_ctx: m_ctx.reborrow(), @@ -66,9 +66,9 @@ impl PolyP { }; use PolyBlocker as Sb; let blocker = branch.state.sync_run(&mut r_ctx, protocol_description); - lockprintln!( - "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", - cid, + log!( + &mut r_ctx.m_ctx.inner.logger, + "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", r_ctx.m_ctx.my_subtree_id, &predicate, &blocker @@ -89,10 +89,10 @@ impl PolyP { if predicate.replace_assignment(channel_id, true) != Some(false) { // don't rerun now. Rerun at next `sync_run` - lockprintln!("{:?}: ~ ... Delay {:?}", cid, m_ctx.my_subtree_id,); + log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); self.incomplete.insert(predicate, branch); } else { - lockprintln!("{:?}: ~ ... Drop {:?}", cid, m_ctx.my_subtree_id,); + log!(&mut m_ctx.inner.logger, "~ ... 
Drop {:?}", m_ctx.my_subtree_id,); } // ELSE DROP } @@ -111,36 +111,35 @@ impl PolyP { to_run.push((predicate_f, branch_f)); } Sb::SyncBlockEnd => { + log!( + &mut m_ctx.inner.logger, + "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", + m_ctx.my_subtree_id, + &predicate, + &blocker + ); // come up with the predicate for this local solution let lookup = |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; let ekeys_channel_id_iter = self.ekeys.iter().map(lookup); predicate.batch_assign_nones(ekeys_channel_id_iter, false); - lockprintln!( - "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", - cid, - m_ctx.my_subtree_id, - &predicate, - &blocker - ); - // OK now check we really received all the messages we expected to let num_fired = predicate.iter_matching(true).count(); let num_msgs = branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count(); match num_fired.cmp(&num_msgs) { Ordering::Less => unreachable!(), - Ordering::Greater => lockprintln!( - "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .", - cid, + Ordering::Greater => log!( + &mut m_ctx.inner.logger, + "{:?} with pred {:?} finished but |inbox|+|outbox| < .", m_ctx.my_subtree_id, &predicate, ), Ordering::Equal => { - lockprintln!( - "{:?}: {:?} with pred {:?} finished! Storing this solution locally.", - cid, + log!( + &mut m_ctx.inner.logger, + "{:?} with pred {:?} finished! Storing this solution locally.", m_ctx.my_subtree_id, &predicate, ); @@ -287,7 +286,11 @@ impl PolyP { std::mem::swap(&mut self.incomplete, &mut incomplete2); to_run }; - lockprintln!("{:?}: ... DONE FEEDING BRANCHES. {} branches to run!", cid, to_run.len(),); + log!( + &mut m_ctx.inner.logger, + "... DONE FEEDING BRANCHES. 
{} branches to run!", + to_run.len(), + ); self.poly_run_these_branches(m_ctx, protocol_description, to_run) } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 5bfd49a94c9133183f4f18545931172ec193777a..9b0780de5ba0f4fc6bb45870d0e9cf970786ef44 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -3,8 +3,7 @@ use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { - let cid = self.inner.channel_id_stream.controller_id; - lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision); + log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); let mut table_row = HashMap::default(); self.inner.mono_n = self @@ -17,17 +16,17 @@ impl Controller { ); for (ekey, payload) in table_row { let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload); + log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload); } for channel_id in decision.iter_matching(false) { - lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id); + log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id); } let announcement = CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { - lockprintln!( - "{:?}: Forwarding {:?} to child with ekey {:?}", - cid, + log!( + &mut self.inner.logger, + "Forwarding {:?} to child with ekey {:?}", &announcement, child_ekey ); @@ -45,7 +44,6 @@ impl Controller { // Drain self.ephemeral.solution_storage and handle the new locals. 
Return decision if one is found fn handle_locals_maybe_decide(&mut self) -> Result { - let cid = self.inner.channel_id_stream.controller_id; if let Some(parent_ekey) = self.inner.family.parent_ekey { // I have a parent -> I'm not the leader let parent_endpoint = @@ -53,7 +51,7 @@ impl Controller { for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - lockprintln!("{:?}: Sending {:?} to parent {:?}", cid, &msg, parent_ekey); + log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey); parent_endpoint.send(msg)?; } Ok(false) @@ -62,7 +60,7 @@ impl Controller { assert!(self.inner.family.parent_ekey.is_none()); let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next(); Ok(if let Some(decision) = maybe_decision { - lockprintln!("{:?}: DECIDE ON {:?} AS LEADER!", cid, &decision); + log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); self.end_round_with_decision(decision)?; true } else { @@ -132,14 +130,16 @@ impl Controller { assert!(self.ephemeral.is_clear()); - let cid = self.inner.channel_id_stream.controller_id; - lockprintln!(); - lockprintln!("~~~~~~ {:?}: SYNC ROUND STARTS! ROUND={}", cid, self.inner.round_index); + log!( + &mut self.inner.logger, + "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", + self.inner.round_index + ); // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). // Some actors are dropped. some new actors are created. 
// Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - lockprintln!("{:?}: Got {} MonoP's to run!", cid, self.inner.mono_ps.len()); + log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len()); self.ephemeral.poly_ps.clear(); // let mut poly_ps: Vec = vec![]; while let Some(mut mono_p) = self.inner.mono_ps.pop() { @@ -152,16 +152,16 @@ impl Controller { }; // cross boundary into crate::protocol let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); - lockprintln!("{:?}: ... MonoP's pre_sync_run got blocker {:?}", cid, &blocker); + log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); match blocker { MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent), MonoBlocker::ComponentExit => drop(mono_p), MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), } } - lockprintln!( - "{:?}: Finished running all MonoPs! Have {} PolyPs waiting", - cid, + log!( + &mut self.inner.logger, + "Finished running all MonoPs! Have {} PolyPs waiting", self.ephemeral.poly_ps.len() ); @@ -180,9 +180,9 @@ impl Controller { .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index }))); n.chain(p).collect() }; - lockprintln!( - "{:?}: SET OF PolyPs and MonoPs final! ekey lookup map is {:?}", - cid, + log!( + &mut self.inner.logger, + "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}", &ekey_to_holder ); @@ -198,9 +198,9 @@ impl Controller { .iter() .map(|&ekey| SubtreeId::ChildController { ekey }); let subtree_id_iter = n.chain(m).chain(c); - lockprintln!( - "{:?}: Solution Storage has subtree Ids: {:?}", - cid, + log!( + &mut self.inner.logger, + "Solution Storage has subtree Ids: {:?}", &subtree_id_iter.clone().collect::>() ); subtree_id_iter @@ -208,27 +208,27 @@ impl Controller { // 5. 
kick off the synchronous round of the native actor if it exists - lockprintln!("{:?}: Kicking off native's synchronous round...", cid); + log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { // using if let because of nested ? operator // TODO check that there are 1+ branches or NO SOLUTION let poly_n = self.kick_off_native(sync_batches)?; - lockprintln!( - "{:?}: PolyN kicked off, and has branches with predicates... {:?}", - cid, + log!( + &mut self.inner.logger, + "PolyN kicked off, and has branches with predicates... {:?}", poly_n.branches.keys().collect::>() ); Some(poly_n) } else { - lockprintln!("{:?}: NO NATIVE COMPONENT", cid); + log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); None }; // 6. Kick off the synchronous round of each protocol actor // If just one actor becomes inconsistent now, there can be no solution! // TODO distinguish between completed and not completed poly_p's? - lockprintln!("{:?}: Kicking off {} PolyP's.", cid, self.ephemeral.poly_ps.len()); + log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { let my_subtree_id = SubtreeId::PolyP { index }; let m_ctx = PolyPContext { @@ -238,21 +238,21 @@ impl Controller { }; use SyncRunResult as Srr; let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; - lockprintln!("{:?}: ... PolyP's poly_run got blocker {:?}", cid, &blocker); + log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); match blocker { Srr::NoBranches => return Err(SyncErr::Inconsistent), Srr::AllBranchesComplete | Srr::BlockingForRecv => (), } } - lockprintln!("{:?}: All Poly machines have been kicked off!", cid); + log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); // 7. 
`solution_storage` may have new solutions for this controller // handle their discovery. LEADER => announce, otherwise => send to parent { let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); - lockprintln!( - "{:?}: Got {} controller-local solutions before a single RECV: {:?}", - cid, + log!( + &mut self.inner.logger, + "Got {} controller-local solutions before a single RECV: {:?}", peeked.len(), peeked ); @@ -262,13 +262,13 @@ impl Controller { } // 4. Receive incoming messages until the DECISION is made - lockprintln!("{:?}: No decision yet. Time to recv messages", cid); + log!(&mut self.inner.logger, "No decision yet. Time to recv messages"); self.undelay_all(); 'recv_loop: loop { let received = self.recv(deadline)?.ok_or(SyncErr::Timeout)?; let current_content = match received.msg { Msg::SetupMsg(_) => { - lockprintln!("{:?}: recvd message {:?} and its SETUP :(", cid, &received); + log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received); // This occurs in the event the connector was malformed during connect() return Err(SyncErr::UnexpectedSetupMsg); } @@ -276,7 +276,7 @@ impl Controller { if round_index < self.inner.round_index => { // Old message! Can safely discard - lockprintln!("{:?}: recvd message {:?} and its OLD! :(", cid, &received); + log!(&mut self.inner.logger, "recvd message {:?} and its OLD! :(", &received); drop(received); continue 'recv_loop; } @@ -284,16 +284,20 @@ impl Controller { if round_index > self.inner.round_index => { // Message from a next round. Keep for later! - lockprintln!( - "{:?}: recvd message {:?} and its for later. DELAY! :(", - cid, + log!( + &mut self.inner.logger, + "recvd message {:?} and its for later. DELAY!
:(", &received ); self.delay(received); continue 'recv_loop; } Msg::CommMsg(CommMsg { contents, round_index }) => { - lockprintln!("{:?}: recvd a round-appropriate CommMsg {:?}", cid, &contents); + log!( + &mut self.inner.logger, + "recvd a round-appropriate CommMsg {:?}", + &contents + ); assert_eq!(round_index, self.inner.round_index); contents } @@ -305,9 +309,9 @@ impl Controller { return Err(SyncErr::ElaborateFromNonChild); } let subtree_id = SubtreeId::ChildController { ekey: received.recipient }; - lockprintln!( - "{:?}: Received elaboration from child for subtree {:?}: {:?}", - cid, + log!( + &mut self.inner.logger, + "Received elaboration from child for subtree {:?}: {:?}", subtree_id, &partial_oracle ); @@ -323,9 +327,9 @@ impl Controller { if self.inner.family.parent_ekey != Some(received.recipient) { return Err(SyncErr::AnnounceFromNonParent); } - lockprintln!( - "{:?}: Received ANNOUNCEMENT from from parent {:?}: {:?}", - cid, + log!( + &mut self.inner.logger, + "Received ANNOUNCEMENT from parent {:?}: {:?}", received.recipient, &oracle ); @@ -335,9 +339,12 @@ impl Controller { // message for some actor. Feed it to the appropriate actor // and then give them another chance to run. let subtree_id = ekey_to_holder.get(&received.recipient); - lockprintln!( - "{:?}: Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", - cid, subtree_id, &payload_predicate, &payload + log!( + &mut self.inner.logger, + "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", + subtree_id, + &payload_predicate, + &payload ); match subtree_id { None => { @@ -382,33 +389,35 @@ impl Controller { payload_predicate, payload, )?; - lockprintln!( - "{:?}: ... Fed the msg to PolyP {:?} and ran it to blocker {:?}", - cid, + log!( + &mut self.inner.logger, + "...
Fed the msg to PolyP {:?} and ran it to blocker {:?}", subtree_id, blocker ); match blocker { Srr::NoBranches => return Err(SyncErr::Inconsistent), Srr::BlockingForRecv | Srr::AllBranchesComplete => { - continue 'recv_loop + { + let peeked = self + .ephemeral + .solution_storage + .peek_new_locals() + .collect::>(); + log!( + &mut self.inner.logger, + "Got {} new controller-local solutions from RECV: {:?}", + peeked.len(), + peeked + ); + } + if self.handle_locals_maybe_decide()? { + return Ok(()); + } } } } }; - { - let peeked = - self.ephemeral.solution_storage.peek_new_locals().collect::>(); - lockprintln!( - "{:?}: Got {} new controller-local solutions from RECV: {:?}", - cid, - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } } } } diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs index f4ffd3baaebf388e8895263ae7c009750667e422..149f5249ca3512fbc780c932b8256afbc8d18ae3 100644 --- a/src/runtime/connector.rs +++ b/src/runtime/connector.rs @@ -92,6 +92,12 @@ impl Connector { }); Ok(()) } + pub fn get_mut_logger(&mut self) -> Option<&mut String> { + match self { + Connector::Connected(connected) => Some(&mut connected.controller.inner.logger), + _ => None, + } + } pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> { use PortOpErr::*; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 05cc55a8a82015c43ca9384734e9a1b0a54be8ff..deeeaa443d29def446a15f508ead3d5603ba5bcd 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -109,6 +109,7 @@ struct ControllerInner { mono_n: Option, mono_ps: Vec, family: ControllerFamily, + logger: String, } /// This structure has its state entirely reset between synchronous rounds diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 1b585c2df61455e2e3e3dd3574a63f498f72fba4..e443f4530e08061f374173d4943ca1e1ff2860d3 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -119,6 +119,7 @@ impl Controller { 
mono_ps: p_monos, mono_n: n_mono, round_index: 0, + logger: String::default(), }; let controller = Self { protocol_description, inner, ephemeral: Default::default() }; Ok((controller, native_interface)) diff --git a/src/test/connector.rs b/src/test/connector.rs index dd2a974d7d461b9da08714d7b38bd060559cdee7..b1bdff833ec5c31a09dae83c000d6a7091233482 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -15,7 +15,8 @@ fn incremental() { let timeout = Duration::from_millis(1_500); let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()]; let a = thread::spawn(move || { - let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 }); + let controller_id = 0; + let mut x = Connector::Unconfigured(Unconfigured { controller_id }); x.configure( b"primitive main(out a, out b) { synchronous { @@ -29,9 +30,11 @@ fn incremental() { x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); + println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); }); let b = thread::spawn(move || { - let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 }); + let controller_id = 1; + let mut x = Connector::Unconfigured(Unconfigured { controller_id }); x.configure( b"primitive main(in a, in b) { synchronous { @@ -44,6 +47,7 @@ fn incremental() { x.bind_port(1, PortBinding::Active(addrs[1])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); + println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); }); handle(a.join()); handle(b.join()); @@ -55,7 +59,8 @@ fn duo() { let addrs = ["127.0.0.1:7012".parse().unwrap(), "127.0.0.1:7013".parse().unwrap()]; let a = thread::spawn(move || { let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 }); - x.configure(b" + x.configure( + b" primitive main(out a, out b) { synchronous {} synchronous {} @@ -67,7 +72,9 @@ fn duo() { msg m = 
create(0); put(b, m); } - }").unwrap(); + }", + ) + .unwrap(); x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap(); x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap(); x.connect(timeout).unwrap(); @@ -78,7 +85,8 @@ fn duo() { }); let b = thread::spawn(move || { let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 }); - x.configure(b" + x.configure( + b" primitive main(in a, in b) { while (true) { synchronous { @@ -92,7 +100,9 @@ fn duo() { } } } - }").unwrap(); + }", + ) + .unwrap(); x.bind_port(0, PortBinding::Active(addrs[0])).unwrap(); x.bind_port(1, PortBinding::Active(addrs[1])).unwrap(); x.connect(timeout).unwrap();