Changeset - 7b826a4cebeb
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-02-04 17:36:36
christopheresterhuyse@gmail.com
logging
5 files changed with 88 insertions and 81 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -86,19 +86,19 @@ pub enum PolyBlocker {
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_keys: HashSet<Key>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Key; 2];
 
    fn new_random(&self) -> u64;
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&self, ekey: Key) -> Option<&Payload>;
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Key {
 
    pub fn from_raw(raw: u64) -> Self {
 
        Self(raw)
src/macros.rs
Show inline comments
 
macro_rules! lockprintln {
 
    () => (print!("\n"));
 
    ($($arg:tt)*) => ({
 
        use std::io::Write;
 
        std::writeln!(std::io::stdout().lock(), $($arg)*).expect("LPRINTLN");
 
    })
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        use std::fmt::Write;
 
        writeln!($logger, $($arg)*).unwrap();
 
    }};
 
}
src/runtime/actors.rs
Show inline comments
 
@@ -76,16 +76,16 @@ impl PolyP {
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    lockprintln!(
 
                        "{:?}: ~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                    log!(
 
                        &mut r_ctx.m_ctx.inner.logger,
 
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        r_ctx.m_ctx.my_subtree_id,
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
@@ -188,60 +188,62 @@ impl PolyP {
 
        protocol_description: &ProtocolD,
 
        ekey: Key,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        // try exact match
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 

	
 
        let to_run = if self.complete.contains_key(&payload_predicate) {
 
            // exact match with stopped machine
 

	
 
            lockprintln!(
 
                "{:?}: ... poly_recv_run matched stopped machine exactly! nothing to do here",
 
                cid,
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched stopped machine exactly! nothing to do here",
 
            );
 
            vec![]
 
        } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) {
 
            // exact match with running machine
 

	
 
            lockprintln!(
 
                "{:?}: ... poly_recv_run matched running machine exactly! pred is {:?}",
 
                cid,
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched running machine exactly! pred is {:?}",
 
                &payload_predicate
 
            );
 
            branch.inbox.insert(ekey, payload);
 
            vec![(payload_predicate, branch)]
 
        } else {
 
            lockprintln!(
 
                "{:?}: ... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 
                cid,
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 

	
 
            );
 
            let mut incomplete2 = HashMap::<_, _>::default();
 
            let to_run = self
 
                .incomplete
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                              
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 
                            let was = branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 
                            Some((old_predicate, branch))
 
                        }
 
                        Csr::New(new) => {
 

	
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                              
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
@@ -251,15 +253,16 @@ impl PolyP {
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((new, payload_branch))
 
                        }
 
                        Csr::LatterNotFormer => {
 

	
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                           
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
@@ -267,15 +270,16 @@ impl PolyP {
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((payload_predicate.clone(), payload_branch))
 
                        }
 
                        Csr::Nonexistant => {
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                              
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // predicates contradict
 
                            incomplete2.insert(old_predicate, branch);
 
                            None
src/runtime/communication.rs
Show inline comments
 
@@ -461,15 +461,15 @@ impl From<EndpointErr> for SyncErr {
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_component with ekeys {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ekeys {:?}!",
 
            &moved_ekeys,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
 
            self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys });
 
        } else {
 
@@ -484,27 +484,27 @@ impl MonoContext for MonoPContext<'_> {
 
            endpoint: a,
 
        });
 
        let kg = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: b,
 
        });
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
    }
 
    fn new_random(&self) -> u64 {
 
    fn new_random(&mut self) -> u64 {
 
        type Bytes8 = [u8; std::mem::size_of::<u64>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        let val = unsafe { std::mem::transmute::<Bytes8, _>(bytes) };
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_random. returning val {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_random. returning val {:?}!",
 
            val,
 
        );
 
        val
 
    }
 
}
 

	
 
@@ -588,30 +588,30 @@ impl SolutionStorage {
 
        }
 
    }
 
}
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 

	
 
    fn is_firing(&self, ekey: Key) -> Option<bool> {
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        lockprintln!(
 
            "{:?}: !! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
    fn read_msg(&self, ekey: Key) -> Option<&Payload> {
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let val = self.inbox.get(&ekey);
 
        lockprintln!(
 
            "{:?}: !! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -23,12 +23,15 @@ impl Controller {
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
        let mut ekeys_network = vec![];
 
@@ -87,42 +90,44 @@ impl Controller {
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        println!("{:03?} setup todos...", major);
 
        log!(&mut logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, endpoint_ext_todos, deadline)?;
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None });
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(&ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger: String::default(),
 
            logger,
 
        };
 
        let controller = Self { protocol_description, inner, ephemeral: Default::default() };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
@@ -130,12 +135,13 @@ impl Controller {
 
        stream.write(&[]).is_ok()
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
@@ -149,13 +155,13 @@ impl Controller {
 
            polled_undrained: Default::default(),
 
        };
 

	
 
        // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener}
 
        // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`.
 
        let mut to_finish = HashSet::<_>::default();
 
        println!("endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        for (key, t) in endpoint_ext_todos.iter() {
 
            let token = key.to_token();
 
            match t {
 
                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
 
                Finished(EndpointExt { endpoint, .. }) => {
 
                    ms.poll.register(endpoint, token, ready_r, edge)
 
@@ -184,27 +190,27 @@ impl Controller {
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
                    }
 
                    PassiveAccepting { addr, listener, .. } => {
 
                        println!("{:03?} start PassiveAccepting...", major);
 
                        log!(logger, "{:03?} start PassiveAccepting...", major);
 
                        assert!(event.readiness().is_readable());
 
                        let (stream, _peer_addr) =
 
                            listener.accept().map_err(|_| AcceptFailed(*addr))?;
 
                        ms.poll.deregister(listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveAccepting { addr, info, .. } = e => {
 
                                PassiveConnecting { addr, info, stream }
 
                            }]
 
                        });
 
                        println!("{:03?} ... end PassiveAccepting", major);
 
                        log!(logger, "{:03?} ... end PassiveAccepting", major);
 
                    }
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        println!("{:03?} start PassiveConnecting...", major);
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
@@ -214,45 +220,43 @@ impl Controller {
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        println!("{:03?} ... end PassiveConnecting", major);
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        println!("{:03?} start ActiveConnecting...", major);
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                            // connect successful
 
                            println!("CONNECT SUCCESS");
 
                            log!(logger, "CONNECT SUCCESS");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            println!(".. ok");
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            println!("CONNECT FAIL");
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let mut new_stream = TcpStream::connect(addr).unwrap();
 
                            ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3");
 
                            std::mem::swap(stream, &mut new_stream);
 
                        }
 
                        println!("{:03?} ... end ActiveConnecting", major);
 
                        log!(logger, "{:03?} ... end ActiveConnecting", major);
 
                    }
 
                    ActiveRecving { addr, polarity, endpoint } => {
 
                        println!("{:03?} start ActiveRecving...", major);
 
                        println!("{:03?} start ActiveRecving...", major);
 
                        println!("{:03?} start ActiveRecving...", major);
 
                        log!(logger, "{:03?} start ActiveRecving...", major);
 
                        assert!(event.readiness().is_readable());
 
                        'recv_loop: while let Some(msg) = endpoint.recv()? {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == *polarity {
 
                                    return Err(PolarityMatched(*addr));
 
                                }
 
@@ -266,13 +270,13 @@ impl Controller {
 
                                assert!(to_finish.remove(&ekey));
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                        println!("{:03?} ... end ActiveRecving", major);
 
                        log!(logger, "{:03?} ... end ActiveRecving", major);
 
                    }
 
                }
 
            }
 
        }
 
        for ekey in polled_undrained_later {
 
            ms.polled_undrained.insert(ekey);
 
@@ -283,20 +287,21 @@ impl Controller {
 
        });
 
        Ok((ms, endpoint_exts))
 
    }
 

	
 
    fn setup_sink_tree_family(
 
        major: ControllerId,
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Key>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 

	
 
        println!("neighbors {:?}", &neighbors);
 
        log!(logger, "neighbors {:?}", &neighbors);
 

	
 
        let mut messenger = (messenger_state, endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut Arena<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
@@ -306,25 +311,25 @@ impl Controller {
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let echo = S(LeaderEcho { maybe_leader: major });
 
        let mut awaiting = IndexSet::with_capacity(neighbors.len());
 
        for &n in neighbors.iter() {
 
            println!("{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            messenger.send(n, echo.clone())?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Key> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            println!("{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
@@ -346,30 +351,35 @@ impl Controller {
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
                        Greater => {
 
                            // join new echo
 
                            println!("{:?} setting leader to {:?}", major, recipient);
 
                            log!(logger, "{:?} setting leader to {:?}", major, recipient);
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                println!(
 
                                log!(
 
                                    logger,
 
                                    "{:?} replying echo to parent {:?} immediately",
 
                                    major, recipient
 
                                    major,
 
                                    recipient
 
                                );
 
                                messenger.send(recipient, echo.clone())?;
 
                            } else {
 
                                for &n in neighbors.iter() {
 
                                    if n != recipient {
 
                                        println!(
 
                                        log!(
 
                                            logger,
 
                                            "{:?} repeating echo {:?} to {:?}",
 
                                            major, &echo, n
 
                                            major,
 
                                            &echo,
 
                                            n
 
                                        );
 
                                        messenger.send(n, echo.clone())?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
@@ -388,21 +398,21 @@ impl Controller {
 
            Some(parent) => assert_ne!(
 
                my_leader, major,
 
                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
                parent, major
 
            ),
 
        }
 
        println!("{:?} DONE WITH ECHO", major);
 
        log!(logger, "{:?} DONE WITH ECHO", major);
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for &k in neighbors.iter() {
 
            let msg =
 
                if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            println!("{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
 
            log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
 
            messenger.send(k, msg)?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for &n in neighbors.iter() {
 
            if Some(n) != parent {
0 comments (0 inline, 0 general)