diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index e443f4530e08061f374173d4943ca1e1ff2860d3..0cfa3a63e6d214ddf27b0213d0d59035b52b96fc 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -26,6 +26,9 @@ impl Controller { ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> { use ConnectErr::*; + let mut logger = String::default(); + log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); + let mut channel_id_stream = ChannelIdStream::new(major); let mut endpoint_ext_todos = Arena::default(); @@ -90,11 +93,11 @@ impl Controller { } } } - println!("{:03?} setup todos...", major); + log!(&mut logger, "{:03?} setup todos...", major); // 2. convert the arena to Arena and return the let (mut messenger_state, mut endpoint_exts) = - Self::finish_endpoint_ext_todos(major, endpoint_ext_todos, deadline)?; + Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?; let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }); let p_monos = vec![MonoP { @@ -105,12 +108,14 @@ impl Controller { // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} let family = Self::setup_sink_tree_family( major, + &mut logger, &mut endpoint_exts, &mut messenger_state, ekeys_network, deadline, )?; + log!(&mut logger, "CONNECT PHASE END! ~"); let inner = ControllerInner { family, messenger_state, @@ -119,7 +124,7 @@ impl Controller { mono_ps: p_monos, mono_n: n_mono, round_index: 0, - logger: String::default(), + logger, }; let controller = Self { protocol_description, inner, ephemeral: Default::default() }; Ok((controller, native_interface)) @@ -133,6 +138,7 @@ impl Controller { // inserts fn finish_endpoint_ext_todos( major: ControllerId, + logger: &mut String, mut endpoint_ext_todos: Arena, deadline: Instant, ) -> Result<(MessengerState, Arena), ConnectErr> { @@ -152,7 +158,7 @@ impl Controller { // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener} // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`. let mut to_finish = HashSet::<_>::default(); - println!("endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); + log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); for (key, t) in endpoint_ext_todos.iter() { let token = key.to_token(); match t { @@ -187,7 +193,7 @@ impl Controller { polled_undrained_later.insert(ekey); } PassiveAccepting { addr, listener, .. } => { - println!("{:03?} start PassiveAccepting...", major); + log!(logger, "{:03?} start PassiveAccepting...", major); assert!(event.readiness().is_readable()); let (stream, _peer_addr) = listener.accept().map_err(|_| AcceptFailed(*addr))?; @@ -198,10 +204,10 @@ impl Controller { PassiveConnecting { addr, info, stream } }] }); - println!("{:03?} ... end PassiveAccepting", major); + log!(logger, "{:03?} ... end PassiveAccepting", major); } PassiveConnecting { addr, stream, .. } => { - println!("{:03?} start PassiveConnecting...", major); + log!(logger, "{:03?} start PassiveConnecting...", major); assert!(event.readiness().is_writable()); if !Self::test_stream_connectivity(stream) { return Err(PassiveConnectFailed(*addr)); @@ -217,15 +223,15 @@ impl Controller { }] }); res?; - println!("{:03?} ... end PassiveConnecting", major); + log!(logger, "{:03?} ... end PassiveConnecting", major); assert!(to_finish.remove(&ekey)); } ActiveConnecting { addr, stream, .. } => { - println!("{:03?} start ActiveConnecting...", major); + log!(logger, "{:03?} start ActiveConnecting...", major); assert!(event.readiness().is_writable()); if Self::test_stream_connectivity(stream) { // connect successful - println!("CONNECT SUCCESS"); + log!(logger, "CONNECT SUCCESS"); ms.poll.reregister(stream, token, ready_r, edge).expect("52"); take_mut::take(entry, |e| { assert_let![ActiveConnecting { stream, polarity, addr } = e => { @@ -233,10 +239,10 @@ impl Controller { ActiveRecving { endpoint, polarity, addr } }] }); - println!(".. ok"); + log!(logger, ".. ok"); } else { // connect failure. retry! - println!("CONNECT FAIL"); + log!(logger, "CONNECT FAIL"); ms.poll.deregister(stream).expect("wt"); std::thread::sleep(Duration::from_millis(backoff_millis)); backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; @@ -244,12 +250,10 @@ impl Controller { ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3"); std::mem::swap(stream, &mut new_stream); } - println!("{:03?} ... end ActiveConnecting", major); + log!(logger, "{:03?} ... end ActiveConnecting", major); } ActiveRecving { addr, polarity, endpoint } => { - println!("{:03?} start ActiveRecving...", major); - println!("{:03?} start ActiveRecving...", major); - println!("{:03?} start ActiveRecving...", major); + log!(logger, "{:03?} start ActiveRecving...", major); assert!(event.readiness().is_readable()); 'recv_loop: while let Some(msg) = endpoint.recv()? { if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { @@ -269,7 +273,7 @@ impl Controller { ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); } } - println!("{:03?} ... end ActiveRecving", major); + log!(logger, "{:03?} ... end ActiveRecving", major); } } } @@ -286,6 +290,7 @@ impl Controller { fn setup_sink_tree_family( major: ControllerId, + logger: &mut String, endpoint_exts: &mut Arena, messenger_state: &mut MessengerState, neighbors: Vec, @@ -293,7 +298,7 @@ impl Controller { ) -> Result { use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; - println!("neighbors {:?}", &neighbors); + log!(logger, "neighbors {:?}", &neighbors); let mut messenger = (messenger_state, endpoint_exts); impl Messengerlike for (&mut MessengerState, &mut Arena) { @@ -309,7 +314,7 @@ impl Controller { let echo = S(LeaderEcho { maybe_leader: major }); let mut awaiting = IndexSet::with_capacity(neighbors.len()); for &n in neighbors.iter() { - println!("{:?}'s initial echo to {:?}, {:?}", major, n, &echo); + log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo); messenger.send(n, echo.clone())?; awaiting.insert(n); } @@ -321,7 +326,7 @@ impl Controller { messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?; - println!("{:?} GOT {:?} {:?}", major, &recipient, &msg); + log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg); match msg { S(LeaderAnnounce { leader }) => { // someone else completed the echo and became leader first! @@ -349,24 +354,29 @@ impl Controller { } Greater => { // join new echo - println!("{:?} setting leader to {:?}", major, recipient); + log!(logger, "{:?} setting leader to {:?}", major, recipient); parent = Some(recipient); my_leader = maybe_leader; let echo = S(LeaderEcho { maybe_leader: my_leader }); awaiting.clear(); if neighbors.len() == 1 { // immediately reply to parent - println!( + log!( + logger, "{:?} replying echo to parent {:?} immediately", - major, recipient + major, + recipient ); messenger.send(recipient, echo.clone())?; } else { for &n in neighbors.iter() { if n != recipient { - println!( + log!( + logger, "{:?} repeating echo {:?} to {:?}", - major, &echo, n + major, + &echo, + n ); messenger.send(n, echo.clone())?; awaiting.insert(n); @@ -391,7 +401,7 @@ impl Controller { parent, major ), } - println!("{:?} DONE WITH ECHO", major); + log!(logger, "{:?} DONE WITH ECHO", major); // 3. broadcast leader announcement (except to parent: confirm they are your parent) // in this loop, every node sends 1 message to each neighbor @@ -399,7 +409,7 @@ impl Controller { for &k in neighbors.iter() { let msg = if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; - println!("{:?} ANNOUNCING to {:?} {:?}", major, k, &msg); + log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg); messenger.send(k, msg)?; }