diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 5bfd49a94c9133183f4f18545931172ec193777a..9b0780de5ba0f4fc6bb45870d0e9cf970786ef44 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -3,8 +3,7 @@ use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { - let cid = self.inner.channel_id_stream.controller_id; - lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision); + log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); let mut table_row = HashMap::default(); self.inner.mono_n = self @@ -17,17 +16,17 @@ impl Controller { ); for (ekey, payload) in table_row { let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload); + log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload); } for channel_id in decision.iter_matching(false) { - lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id); + log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id); } let announcement = CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { - lockprintln!( - "{:?}: Forwarding {:?} to child with ekey {:?}", - cid, + log!( + &mut self.inner.logger, + "Forwarding {:?} to child with ekey {:?}", &announcement, child_ekey ); @@ -45,7 +44,6 @@ impl Controller { // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found fn handle_locals_maybe_decide(&mut self) -> Result { - let cid = self.inner.channel_id_stream.controller_id; if let Some(parent_ekey) = self.inner.family.parent_ekey { // I have a parent -> I'm not the leader let parent_endpoint = @@ -53,7 +51,7 @@ impl Controller { for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - lockprintln!("{:?}: Sending {:?} to parent {:?}", cid, &msg, parent_ekey); + log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey); parent_endpoint.send(msg)?; } Ok(false) @@ -62,7 +60,7 @@ impl Controller { assert!(self.inner.family.parent_ekey.is_none()); let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next(); Ok(if let Some(decision) = maybe_decision { - lockprintln!("{:?}: DECIDE ON {:?} AS LEADER!", cid, &decision); + log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); self.end_round_with_decision(decision)?; true } else { @@ -132,14 +130,16 @@ impl Controller { assert!(self.ephemeral.is_clear()); - let cid = self.inner.channel_id_stream.controller_id; - lockprintln!(); - lockprintln!("~~~~~~ {:?}: SYNC ROUND STARTS! ROUND={}", cid, self.inner.round_index); + log!( + &mut self.inner.logger, + "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", + self.inner.round_index + ); // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). // Some actors are dropped. some new actors are created. // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - lockprintln!("{:?}: Got {} MonoP's to run!", cid, self.inner.mono_ps.len()); + log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len()); self.ephemeral.poly_ps.clear(); // let mut poly_ps: Vec = vec![]; while let Some(mut mono_p) = self.inner.mono_ps.pop() { @@ -152,16 +152,16 @@ impl Controller { }; // cross boundary into crate::protocol let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); - lockprintln!("{:?}: ... MonoP's pre_sync_run got blocker {:?}", cid, &blocker); + log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); match blocker { MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent), MonoBlocker::ComponentExit => drop(mono_p), MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), } } - lockprintln!( - "{:?}: Finished running all MonoPs! Have {} PolyPs waiting", - cid, + log!( + &mut self.inner.logger, + "Finished running all MonoPs! Have {} PolyPs waiting", self.ephemeral.poly_ps.len() ); @@ -180,9 +180,9 @@ impl Controller { .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index }))); n.chain(p).collect() }; - lockprintln!( - "{:?}: SET OF PolyPs and MonoPs final! ekey lookup map is {:?}", - cid, + log!( + &mut self.inner.logger, + "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}", &ekey_to_holder ); @@ -198,9 +198,9 @@ impl Controller { .iter() .map(|&ekey| SubtreeId::ChildController { ekey }); let subtree_id_iter = n.chain(m).chain(c); - lockprintln!( - "{:?}: Solution Storage has subtree Ids: {:?}", - cid, + log!( + &mut self.inner.logger, + "Solution Storage has subtree Ids: {:?}", &subtree_id_iter.clone().collect::>() ); subtree_id_iter @@ -208,27 +208,27 @@ impl Controller { // 5. kick off the synchronous round of the native actor if it exists - lockprintln!("{:?}: Kicking off native's synchronous round...", cid); + log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { // using if let because of nested ? operator // TODO check that there are 1+ branches or NO SOLUTION let poly_n = self.kick_off_native(sync_batches)?; - lockprintln!( - "{:?}: PolyN kicked off, and has branches with predicates... {:?}", - cid, + log!( + &mut self.inner.logger, + "PolyN kicked off, and has branches with predicates... {:?}", poly_n.branches.keys().collect::>() ); Some(poly_n) } else { - lockprintln!("{:?}: NO NATIVE COMPONENT", cid); + log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); None }; // 6. Kick off the synchronous round of each protocol actor // If just one actor becomes inconsistent now, there can be no solution! // TODO distinguish between completed and not completed poly_p's? - lockprintln!("{:?}: Kicking off {} PolyP's.", cid, self.ephemeral.poly_ps.len()); + log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { let my_subtree_id = SubtreeId::PolyP { index }; let m_ctx = PolyPContext { @@ -238,21 +238,21 @@ impl Controller { }; use SyncRunResult as Srr; let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; - lockprintln!("{:?}: ... PolyP's poly_run got blocker {:?}", cid, &blocker); + log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); match blocker { Srr::NoBranches => return Err(SyncErr::Inconsistent), Srr::AllBranchesComplete | Srr::BlockingForRecv => (), } } - lockprintln!("{:?}: All Poly machines have been kicked off!", cid); + log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); // 7. `solution_storage` may have new solutions for this controller // handle their discovery. LEADER => announce, otherwise => send to parent { let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); - lockprintln!( - "{:?}: Got {} controller-local solutions before a single RECV: {:?}", - cid, + log!( + &mut self.inner.logger, + "Got {} controller-local solutions before a single RECV: {:?}", peeked.len(), peeked ); @@ -262,13 +262,13 @@ impl Controller { } // 4. Receive incoming messages until the DECISION is made - lockprintln!("{:?}: No decision yet. Time to recv messages", cid); + log!(&mut self.inner.logger, "No decision yet. Time to recv messages"); self.undelay_all(); 'recv_loop: loop { let received = self.recv(deadline)?.ok_or(SyncErr::Timeout)?; let current_content = match received.msg { Msg::SetupMsg(_) => { - lockprintln!("{:?}: recvd message {:?} and its SETUP :(", cid, &received); + log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received); // This occurs in the event the connector was malformed during connect() return Err(SyncErr::UnexpectedSetupMsg); } @@ -276,7 +276,7 @@ impl Controller { if round_index < self.inner.round_index => { // Old message! Can safely discard - lockprintln!("{:?}: recvd message {:?} and its OLD! :(", cid, &received); + log!(&mut self.inner.logger, "recvd message {:?} and its OLD! :(", &received); drop(received); continue 'recv_loop; } @@ -284,16 +284,20 @@ impl Controller { if round_index > self.inner.round_index => { // Message from a next round. Keep for later! - lockprintln!( - "{:?}: recvd message {:?} and its for later. DELAY! :(", - cid, + log!( + &mut self.inner.logger, + "ecvd message {:?} and its for later. DELAY! :(", &received ); self.delay(received); continue 'recv_loop; } Msg::CommMsg(CommMsg { contents, round_index }) => { - lockprintln!("{:?}: recvd a round-appropriate CommMsg {:?}", cid, &contents); + log!( + &mut self.inner.logger, + "recvd a round-appropriate CommMsg {:?}", + &contents + ); assert_eq!(round_index, self.inner.round_index); contents } @@ -305,9 +309,9 @@ impl Controller { return Err(SyncErr::ElaborateFromNonChild); } let subtree_id = SubtreeId::ChildController { ekey: received.recipient }; - lockprintln!( - "{:?}: Received elaboration from child for subtree {:?}: {:?}", - cid, + log!( + &mut self.inner.logger, + "Received elaboration from child for subtree {:?}: {:?}", subtree_id, &partial_oracle ); @@ -323,9 +327,9 @@ impl Controller { if self.inner.family.parent_ekey != Some(received.recipient) { return Err(SyncErr::AnnounceFromNonParent); } - lockprintln!( - "{:?}: Received ANNOUNCEMENT from from parent {:?}: {:?}", - cid, + log!( + &mut self.inner.logger, + "Received ANNOUNCEMENT from from parent {:?}: {:?}", received.recipient, &oracle ); @@ -335,9 +339,12 @@ impl Controller { // message for some actor. Feed it to the appropriate actor // and then give them another chance to run. let subtree_id = ekey_to_holder.get(&received.recipient); - lockprintln!( - "{:?}: Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", - cid, subtree_id, &payload_predicate, &payload + log!( + &mut self.inner.logger, + "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", + subtree_id, + &payload_predicate, + &payload ); match subtree_id { None => { @@ -382,33 +389,35 @@ impl Controller { payload_predicate, payload, )?; - lockprintln!( - "{:?}: ... Fed the msg to PolyP {:?} and ran it to blocker {:?}", - cid, + log!( + &mut self.inner.logger, + "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", subtree_id, blocker ); match blocker { Srr::NoBranches => return Err(SyncErr::Inconsistent), Srr::BlockingForRecv | Srr::AllBranchesComplete => { - continue 'recv_loop + { + let peeked = self + .ephemeral + .solution_storage + .peek_new_locals() + .collect::>(); + log!( + &mut self.inner.logger, + "Got {} new controller-local solutions from RECV: {:?}", + peeked.len(), + peeked + ); + } + if self.handle_locals_maybe_decide()? { + return Ok(()); + } } } } }; - { - let peeked = - self.ephemeral.solution_storage.peek_new_locals().collect::>(); - lockprintln!( - "{:?}: Got {} new controller-local solutions from RECV: {:?}", - cid, - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } } } }