From 7b826a4cebeb17d5bc9e5d75d93c7163ba4962b3 2020-02-04 17:36:36 From: Christopher Esterhuyse Date: 2020-02-04 17:36:36 Subject: [PATCH] logging --- diff --git a/src/common.rs b/src/common.rs index 09c7b444324f49055bac3e5a20295aa631e94054..f2b5a9841d9ee1e0932f4ff1613058e1e2333993 100644 --- a/src/common.rs +++ b/src/common.rs @@ -89,13 +89,13 @@ pub trait MonoContext { fn new_component(&mut self, moved_keys: HashSet, init_state: Self::S); fn new_channel(&mut self) -> [Key; 2]; - fn new_random(&self) -> u64; + fn new_random(&mut self) -> u64; } pub trait PolyContext { type D: ProtocolDescription; - fn is_firing(&self, ekey: Key) -> Option; - fn read_msg(&self, ekey: Key) -> Option<&Payload>; + fn is_firing(&mut self, ekey: Key) -> Option; + fn read_msg(&mut self, ekey: Key) -> Option<&Payload>; } ///////////////////// IMPL ///////////////////// diff --git a/src/macros.rs b/src/macros.rs index 0544a813b6f663f00cf3e43488546d4b832a2050..655828c1a752270d7cb6bfc35242baea40e7e6df 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,10 +1,3 @@ -macro_rules! lockprintln { - () => (print!("\n")); - ($($arg:tt)*) => ({ - use std::io::Write; - std::writeln!(std::io::stdout().lock(), $($arg)*).expect("LPRINTLN"); - }) -} macro_rules! log { ($logger:expr, $($arg:tt)*) => {{ use std::fmt::Write; diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index ccde8c4efae77700b9bc4447aac2eb47b3c9d4fd..81bd02c3c84d296c3230280c5a87ed5444909d60 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -79,10 +79,10 @@ impl PolyP { assert!(self.ekeys.contains(&ekey)); let channel_id = r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - lockprintln!( - "{:?}: ~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}", - cid, - m_ctx.my_subtree_id, + log!( + &mut r_ctx.m_ctx.inner.logger, + "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}", + r_ctx.m_ctx.my_subtree_id, channel_id, &branch.inbox, ); @@ -191,30 +191,30 @@ impl PolyP { payload: Payload, ) -> Result { // try exact match - let cid = m_ctx.inner.channel_id_stream.controller_id; let to_run = if self.complete.contains_key(&payload_predicate) { // exact match with stopped machine - lockprintln!( - "{:?}: ... poly_recv_run matched stopped machine exactly! nothing to do here", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run matched stopped machine exactly! nothing to do here", ); vec![] } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) { // exact match with running machine - lockprintln!( - "{:?}: ... poly_recv_run matched running machine exactly! pred is {:?}", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run matched running machine exactly! pred is {:?}", &payload_predicate ); branch.inbox.insert(ekey, payload); vec![(payload_predicate, branch)] } else { - lockprintln!( - "{:?}: ... poly_recv_run didn't have any exact matches... Let's try feed it to all branches", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches", + ); let mut incomplete2 = HashMap::<_, _>::default(); let to_run = self @@ -224,9 +224,10 @@ impl PolyP { use CommonSatResult as Csr; match old_predicate.common_satisfier(&payload_predicate) { Csr::FormerNotLatter | Csr::Equivalent => { - lockprintln!( - "{:?}: ... poly_recv_run This branch is compatible unaltered! branch pred: {:?}", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}", + &old_predicate ); // old_predicate COVERS the assumptions of payload_predicate @@ -236,9 +237,10 @@ impl PolyP { } Csr::New(new) => { - lockprintln!( - "{:?}: ... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", + &payload_predicate, &old_predicate, &new, @@ -254,9 +256,10 @@ impl PolyP { } Csr::LatterNotFormer => { - lockprintln!( - "{:?}: ... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING", + &old_predicate, &payload_predicate, ); @@ -270,9 +273,10 @@ impl PolyP { Some((payload_predicate.clone(), payload_branch)) } Csr::Nonexistant => { - lockprintln!( - "{:?}: ... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}", - cid, + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}", + &old_predicate, &payload_predicate, ); diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 9b0780de5ba0f4fc6bb45870d0e9cf970786ef44..233df882222cf12f109238462a28fd3648f44650 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -464,9 +464,9 @@ impl MonoContext for MonoPContext<'_> { type D = ProtocolD; type S = ProtocolS; fn new_component(&mut self, moved_ekeys: HashSet, init_state: Self::S) { - lockprintln!( - "{:?}: !! MonoContext callback to new_component with ekeys {:?}!", - self.inner.channel_id_stream.controller_id, + log!( + &mut self.inner.logger, + "!! MonoContext callback to new_component with ekeys {:?}!", &moved_ekeys, ); if moved_ekeys.is_subset(self.ekeys) { @@ -487,21 +487,21 @@ impl MonoContext for MonoPContext<'_> { info: EndpointInfo { polarity: Putter, channel_id }, endpoint: b, }); - lockprintln!( - "{:?}: !! MonoContext callback to new_channel. returning ekeys {:?}!", - self.inner.channel_id_stream.controller_id, + log!( + &mut self.inner.logger, + "!! MonoContext callback to new_channel. returning ekeys {:?}!", [kp, kg], ); [kp, kg] } - fn new_random(&self) -> u64 { + fn new_random(&mut self) -> u64 { type Bytes8 = [u8; std::mem::size_of::()]; let mut bytes = Bytes8::default(); getrandom::getrandom(&mut bytes).unwrap(); let val = unsafe { std::mem::transmute::(bytes) }; - lockprintln!( - "{:?}: !! MonoContext callback to new_random. returning val {:?}!", - self.inner.channel_id_stream.controller_id, + log!( + &mut self.inner.logger, + "!! MonoContext callback to new_random. returning val {:?}!", val, ); val @@ -591,24 +591,24 @@ impl SolutionStorage { impl PolyContext for BranchPContext<'_, '_> { type D = ProtocolD; - fn is_firing(&self, ekey: Key) -> Option { + fn is_firing(&mut self, ekey: Key) -> Option { assert!(self.ekeys.contains(&ekey)); let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; let val = self.predicate.query(channel_id); - lockprintln!( - "{:?}: !! PolyContext callback to is_firing by {:?}! returning {:?}", - self.m_ctx.inner.channel_id_stream.controller_id, + log!( + &mut self.m_ctx.inner.logger, + "!! PolyContext callback to is_firing by {:?}! returning {:?}", self.m_ctx.my_subtree_id, val, ); val } - fn read_msg(&self, ekey: Key) -> Option<&Payload> { + fn read_msg(&mut self, ekey: Key) -> Option<&Payload> { assert!(self.ekeys.contains(&ekey)); let val = self.inbox.get(&ekey); - lockprintln!( - "{:?}: !! PolyContext callback to read_msg by {:?}! returning {:?}", - self.m_ctx.inner.channel_id_stream.controller_id, + log!( + &mut self.m_ctx.inner.logger, + "!! PolyContext callback to read_msg by {:?}! returning {:?}", self.m_ctx.my_subtree_id, val, ); diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index e443f4530e08061f374173d4943ca1e1ff2860d3..0cfa3a63e6d214ddf27b0213d0d59035b52b96fc 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -26,6 +26,9 @@ impl Controller { ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> { use ConnectErr::*; + let mut logger = String::default(); + log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); + let mut channel_id_stream = ChannelIdStream::new(major); let mut endpoint_ext_todos = Arena::default(); @@ -90,11 +93,11 @@ impl Controller { } } } - println!("{:03?} setup todos...", major); + log!(&mut logger, "{:03?} setup todos...", major); // 2. convert the arena to Arena and return the let (mut messenger_state, mut endpoint_exts) = - Self::finish_endpoint_ext_todos(major, endpoint_ext_todos, deadline)?; + Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?; let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }); let p_monos = vec![MonoP { @@ -105,12 +108,14 @@ impl Controller { // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} let family = Self::setup_sink_tree_family( major, + &mut logger, &mut endpoint_exts, &mut messenger_state, ekeys_network, deadline, )?; + log!(&mut logger, "CONNECT PHASE END! ~"); let inner = ControllerInner { family, messenger_state, @@ -119,7 +124,7 @@ impl Controller { mono_ps: p_monos, mono_n: n_mono, round_index: 0, - logger: String::default(), + logger, }; let controller = Self { protocol_description, inner, ephemeral: Default::default() }; Ok((controller, native_interface)) @@ -133,6 +138,7 @@ impl Controller { // inserts fn finish_endpoint_ext_todos( major: ControllerId, + logger: &mut String, mut endpoint_ext_todos: Arena, deadline: Instant, ) -> Result<(MessengerState, Arena), ConnectErr> { @@ -152,7 +158,7 @@ impl Controller { // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener} // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`. let mut to_finish = HashSet::<_>::default(); - println!("endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); + log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); for (key, t) in endpoint_ext_todos.iter() { let token = key.to_token(); match t { @@ -187,7 +193,7 @@ impl Controller { polled_undrained_later.insert(ekey); } PassiveAccepting { addr, listener, .. } => { - println!("{:03?} start PassiveAccepting...", major); + log!(logger, "{:03?} start PassiveAccepting...", major); assert!(event.readiness().is_readable()); let (stream, _peer_addr) = listener.accept().map_err(|_| AcceptFailed(*addr))?; @@ -198,10 +204,10 @@ impl Controller { PassiveConnecting { addr, info, stream } }] }); - println!("{:03?} ... end PassiveAccepting", major); + log!(logger, "{:03?} ... end PassiveAccepting", major); } PassiveConnecting { addr, stream, .. } => { - println!("{:03?} start PassiveConnecting...", major); + log!(logger, "{:03?} start PassiveConnecting...", major); assert!(event.readiness().is_writable()); if !Self::test_stream_connectivity(stream) { return Err(PassiveConnectFailed(*addr)); @@ -217,15 +223,15 @@ impl Controller { }] }); res?; - println!("{:03?} ... end PassiveConnecting", major); + log!(logger, "{:03?} ... end PassiveConnecting", major); assert!(to_finish.remove(&ekey)); } ActiveConnecting { addr, stream, .. } => { - println!("{:03?} start ActiveConnecting...", major); + log!(logger, "{:03?} start ActiveConnecting...", major); assert!(event.readiness().is_writable()); if Self::test_stream_connectivity(stream) { // connect successful - println!("CONNECT SUCCESS"); + log!(logger, "CONNECT SUCCESS"); ms.poll.reregister(stream, token, ready_r, edge).expect("52"); take_mut::take(entry, |e| { assert_let![ActiveConnecting { stream, polarity, addr } = e => { @@ -233,10 +239,10 @@ impl Controller { ActiveRecving { endpoint, polarity, addr } }] }); - println!(".. ok"); + log!(logger, ".. ok"); } else { // connect failure. retry! - println!("CONNECT FAIL"); + log!(logger, "CONNECT FAIL"); ms.poll.deregister(stream).expect("wt"); std::thread::sleep(Duration::from_millis(backoff_millis)); backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; @@ -244,12 +250,10 @@ impl Controller { ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3"); std::mem::swap(stream, &mut new_stream); } - println!("{:03?} ... end ActiveConnecting", major); + log!(logger, "{:03?} ... end ActiveConnecting", major); } ActiveRecving { addr, polarity, endpoint } => { - println!("{:03?} start ActiveRecving...", major); - println!("{:03?} start ActiveRecving...", major); - println!("{:03?} start ActiveRecving...", major); + log!(logger, "{:03?} start ActiveRecving...", major); assert!(event.readiness().is_readable()); 'recv_loop: while let Some(msg) = endpoint.recv()? { if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { @@ -269,7 +273,7 @@ impl Controller { ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); } } - println!("{:03?} ... end ActiveRecving", major); + log!(logger, "{:03?} ... end ActiveRecving", major); } } } @@ -286,6 +290,7 @@ impl Controller { fn setup_sink_tree_family( major: ControllerId, + logger: &mut String, endpoint_exts: &mut Arena, messenger_state: &mut MessengerState, neighbors: Vec, @@ -293,7 +298,7 @@ impl Controller { ) -> Result { use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; - println!("neighbors {:?}", &neighbors); + log!(logger, "neighbors {:?}", &neighbors); let mut messenger = (messenger_state, endpoint_exts); impl Messengerlike for (&mut MessengerState, &mut Arena) { @@ -309,7 +314,7 @@ impl Controller { let echo = S(LeaderEcho { maybe_leader: major }); let mut awaiting = IndexSet::with_capacity(neighbors.len()); for &n in neighbors.iter() { - println!("{:?}'s initial echo to {:?}, {:?}", major, n, &echo); + log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo); messenger.send(n, echo.clone())?; awaiting.insert(n); } @@ -321,7 +326,7 @@ impl Controller { messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?; - println!("{:?} GOT {:?} {:?}", major, &recipient, &msg); + log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg); match msg { S(LeaderAnnounce { leader }) => { // someone else completed the echo and became leader first! @@ -349,24 +354,29 @@ impl Controller { } Greater => { // join new echo - println!("{:?} setting leader to {:?}", major, recipient); + log!(logger, "{:?} setting leader to {:?}", major, recipient); parent = Some(recipient); my_leader = maybe_leader; let echo = S(LeaderEcho { maybe_leader: my_leader }); awaiting.clear(); if neighbors.len() == 1 { // immediately reply to parent - println!( + log!( + logger, "{:?} replying echo to parent {:?} immediately", - major, recipient + major, + recipient ); messenger.send(recipient, echo.clone())?; } else { for &n in neighbors.iter() { if n != recipient { - println!( + log!( + logger, "{:?} repeating echo {:?} to {:?}", - major, &echo, n + major, + &echo, + n ); messenger.send(n, echo.clone())?; awaiting.insert(n); @@ -391,7 +401,7 @@ impl Controller { parent, major ), } - println!("{:?} DONE WITH ECHO", major); + log!(logger, "{:?} DONE WITH ECHO", major); // 3. broadcast leader announcement (except to parent: confirm they are your parent) // in this loop, every node sends 1 message to each neighbor @@ -399,7 +409,7 @@ impl Controller { for &k in neighbors.iter() { let msg = if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; - println!("{:?} ANNOUNCING to {:?} {:?}", major, k, &msg); + log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg); messenger.send(k, msg)?; }