Changeset - 4d0dd51ae541
[Not reviewed]
Merge
0 6 0
Hans-Dieter Hiep - 5 years ago 2020-02-04 17:37:22
hdh@cwi.nl
Merge branch 'master' of github.com:sirkibsirkib/Reowolf
5 files changed with 88 insertions and 81 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -80,31 +80,31 @@ pub enum PolyBlocker {
 
    SyncBlockEnd,
 
    CouldntReadMsg(Key),
 
    CouldntCheckFiring(Key),
 
    PutMsg(Key, Payload),
 
}
 

	
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_keys: HashSet<Key>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Key; 2];
 
    fn new_random(&self) -> u64;
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&self, ekey: Key) -> Option<&Payload>;
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Key {
 
    pub fn from_raw(raw: u64) -> Self {
 
        Self(raw)
 
    }
 
    pub fn to_raw(self) -> u64 {
 
        self.0
 
    }
 
    pub fn to_token(self) -> mio::Token {
 
        mio::Token(self.0.try_into().unwrap())
src/macros.rs
Show inline comments
 
macro_rules! lockprintln {
 
    () => (print!("\n"));
 
    ($($arg:tt)*) => ({
 
        use std::io::Write;
 
        std::writeln!(std::io::stdout().lock(), $($arg)*).expect("LPRINTLN");
 
    })
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        use std::fmt::Write;
 
        writeln!($logger, $($arg)*).unwrap();
 
    }};
 
}
 
macro_rules! assert_let {
 
    ($pat:pat = $expr:expr => $work:expr) => {
 
        if let $pat = $expr {
 
            $work
 
        } else {
 
            panic!("assert_let failed");
src/runtime/actors.rs
Show inline comments
 
@@ -70,28 +70,28 @@ impl PolyP {
 
                &mut r_ctx.m_ctx.inner.logger,
 
                "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                r_ctx.m_ctx.my_subtree_id,
 
                &predicate,
 
                &blocker
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    lockprintln!(
 
                        "{:?}: ~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                    log!(
 
                        &mut r_ctx.m_ctx.inner.logger,
 
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        r_ctx.m_ctx.my_subtree_id,
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
@@ -182,106 +182,110 @@ impl PolyP {
 
        })
 
    }
 

	
 
    pub(crate) fn poly_recv_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        ekey: Key,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        // try exact match
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 

	
 
        let to_run = if self.complete.contains_key(&payload_predicate) {
 
            // exact match with stopped machine
 

	
 
            lockprintln!(
 
                "{:?}: ... poly_recv_run matched stopped machine exactly! nothing to do here",
 
                cid,
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched stopped machine exactly! nothing to do here",
 
            );
 
            vec![]
 
        } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) {
 
            // exact match with running machine
 

	
 
            lockprintln!(
 
                "{:?}: ... poly_recv_run matched running machine exactly! pred is {:?}",
 
                cid,
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched running machine exactly! pred is {:?}",
 
                &payload_predicate
 
            );
 
            branch.inbox.insert(ekey, payload);
 
            vec![(payload_predicate, branch)]
 
        } else {
 
            lockprintln!(
 
                "{:?}: ... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 
                cid,
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 

	
 
            );
 
            let mut incomplete2 = HashMap::<_, _>::default();
 
            let to_run = self
 
                .incomplete
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                              
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 
                            let was = branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 
                            Some((old_predicate, branch))
 
                        }
 
                        Csr::New(new) => {
 

	
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                              
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((new, payload_branch))
 
                        }
 
                        Csr::LatterNotFormer => {
 

	
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                           
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((payload_predicate.clone(), payload_branch))
 
                        }
 
                        Csr::Nonexistant => {
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                                cid,
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                              
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // predicates contradict
 
                            incomplete2.insert(old_predicate, branch);
 
                            None
 
                        }
 
                    }
 
                })
 
                .collect();
 
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
 
            to_run
src/runtime/communication.rs
Show inline comments
 
@@ -455,62 +455,62 @@ impl Into<PolyP> for MonoP {
 
}
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_component with ekeys {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ekeys {:?}!",
 
            &moved_ekeys,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
 
            self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys });
 
        } else {
 
            panic!("MachineP attempting to move alien ekey!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Key; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 
        let kp = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: a,
 
        });
 
        let kg = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: b,
 
        });
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
    }
 
    fn new_random(&self) -> u64 {
 
    fn new_random(&mut self) -> u64 {
 
        type Bytes8 = [u8; std::mem::size_of::<u64>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        let val = unsafe { std::mem::transmute::<Bytes8, _>(bytes) };
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_random. returning val {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_random. returning val {:?}!",
 
            val,
 
        );
 
        val
 
    }
 
}
 

	
 
impl SolutionStorage {
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
@@ -582,36 +582,36 @@ impl SolutionStorage {
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 

	
 
    fn is_firing(&self, ekey: Key) -> Option<bool> {
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        lockprintln!(
 
            "{:?}: !! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
    fn read_msg(&self, ekey: Key) -> Option<&Payload> {
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let val = self.inbox.get(&ekey);
 
        lockprintln!(
 
            "{:?}: !! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.inner.channel_id_stream.controller_id,
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -17,24 +17,27 @@ enum EndpointExtTodo {
 

	
 
///////////////////// IMPL /////////////////////
 
impl Controller {
 
    // Given port bindings and a protocol config, create a connector with 1 native node
 
    pub fn connect(
 
        major: ControllerId,
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
        let mut ekeys_network = vec![];
 

	
 
        let mut native_interface = vec![];
 

	
 
        /*
 
        1.  - allocate an EndpointExtTodo for every native and interface port
 
            - store all the resulting keys in two keylists for the interfaces of the native and proto components
 
@@ -81,87 +84,90 @@ impl Controller {
 
                }
 
                PortBinding::Active(addr) => {
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                        addr,
 
                        polarity,
 
                        stream: TcpStream::connect(&addr).unwrap(),
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        println!("{:03?} setup todos...", major);
 
        log!(&mut logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, endpoint_ext_todos, deadline)?;
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None });
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(&ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger: String::default(),
 
            logger,
 
        };
 
        let controller = Self { protocol_description, inner, ephemeral: Default::default() };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(|_| PollInitFailed)?,
 
            events: Events::with_capacity(endpoint_ext_todos.len()),
 
            delayed: vec![],
 
            undelayed: vec![],
 
            polled_undrained: Default::default(),
 
        };
 

	
 
        // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener}
 
        // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`.
 
        let mut to_finish = HashSet::<_>::default();
 
        println!("endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        for (key, t) in endpoint_ext_todos.iter() {
 
            let token = key.to_token();
 
            match t {
 
                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
 
                Finished(EndpointExt { endpoint, .. }) => {
 
                    ms.poll.register(endpoint, token, ready_r, edge)
 
                }
 
                ActiveConnecting { stream, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(stream, token, ready_w, edge)
 
                }
 
                PassiveAccepting { listener, .. } => {
 
@@ -178,159 +184,158 @@ impl Controller {
 
        let mut backoff_millis = 10;
 
        while !to_finish.is_empty() {
 
            ms.poll_events(deadline)?;
 
            for event in ms.events.iter() {
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
                    }
 
                    PassiveAccepting { addr, listener, .. } => {
 
                        println!("{:03?} start PassiveAccepting...", major);
 
                        log!(logger, "{:03?} start PassiveAccepting...", major);
 
                        assert!(event.readiness().is_readable());
 
                        let (stream, _peer_addr) =
 
                            listener.accept().map_err(|_| AcceptFailed(*addr))?;
 
                        ms.poll.deregister(listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveAccepting { addr, info, .. } = e => {
 
                                PassiveConnecting { addr, info, stream }
 
                            }]
 
                        });
 
                        println!("{:03?} ... end PassiveAccepting", major);
 
                        log!(logger, "{:03?} ... end PassiveAccepting", major);
 
                    }
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        println!("{:03?} start PassiveConnecting...", major);
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        println!("{:03?} ... end PassiveConnecting", major);
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        println!("{:03?} start ActiveConnecting...", major);
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                            // connect successful
 
                            println!("CONNECT SUCCESS");
 
                            log!(logger, "CONNECT SUCCESS");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            println!(".. ok");
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            println!("CONNECT FAIL");
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let mut new_stream = TcpStream::connect(addr).unwrap();
 
                            ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3");
 
                            std::mem::swap(stream, &mut new_stream);
 
                        }
 
                        println!("{:03?} ... end ActiveConnecting", major);
 
                        log!(logger, "{:03?} ... end ActiveConnecting", major);
 
                    }
 
                    ActiveRecving { addr, polarity, endpoint } => {
 
                        println!("{:03?} start ActiveRecving...", major);
 
                        println!("{:03?} start ActiveRecving...", major);
 
                        println!("{:03?} start ActiveRecving...", major);
 
                        log!(logger, "{:03?} start ActiveRecving...", major);
 
                        assert!(event.readiness().is_readable());
 
                        'recv_loop: while let Some(msg) = endpoint.recv()? {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == *polarity {
 
                                    return Err(PolarityMatched(*addr));
 
                                }
 
                                take_mut::take(entry, |e| {
 
                                    assert_let![ActiveRecving { polarity, endpoint, .. } = e => {
 
                                        let info = EndpointInfo { polarity, channel_id: info.channel_id };
 
                                        Finished(EndpointExt { info, endpoint })
 
                                    }]
 
                                });
 
                                ms.polled_undrained.insert(ekey);
 
                                assert!(to_finish.remove(&ekey));
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                        println!("{:03?} ... end ActiveRecving", major);
 
                        log!(logger, "{:03?} ... end ActiveRecving", major);
 
                    }
 
                }
 
            }
 
        }
 
        for ekey in polled_undrained_later {
 
            ms.polled_undrained.insert(ekey);
 
        }
 
        let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo {
 
            Finished(endpoint_ext) => endpoint_ext,
 
            _ => unreachable!(),
 
        });
 
        Ok((ms, endpoint_exts))
 
    }
 

	
 
    fn setup_sink_tree_family(
 
        major: ControllerId,
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Key>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 

	
 
        println!("neighbors {:?}", &neighbors);
 
        log!(logger, "neighbors {:?}", &neighbors);
 

	
 
        let mut messenger = (messenger_state, endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut Arena<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let echo = S(LeaderEcho { maybe_leader: major });
 
        let mut awaiting = IndexSet::with_capacity(neighbors.len());
 
        for &n in neighbors.iter() {
 
            println!("{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            messenger.send(n, echo.clone())?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Key> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            println!("{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
                    awaiting.clear();
 
                    break 'echo_loop;
 
                }
 
                S(LeaderEcho { maybe_leader }) => {
 
                    use Ordering::*;
 
                    match maybe_leader.cmp(&my_leader) {
 
@@ -340,75 +345,80 @@ impl Controller {
 
                            if awaiting.is_empty() {
 
                                if let Some(p) = parent {
 
                                    // return the echo to my parent
 
                                    messenger.send(p, S(LeaderEcho { maybe_leader }))?;
 
                                } else {
 
                                    // DECIDE!
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
                        Greater => {
 
                            // join new echo
 
                            println!("{:?} setting leader to {:?}", major, recipient);
 
                            log!(logger, "{:?} setting leader to {:?}", major, recipient);
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                println!(
 
                                log!(
 
                                    logger,
 
                                    "{:?} replying echo to parent {:?} immediately",
 
                                    major, recipient
 
                                    major,
 
                                    recipient
 
                                );
 
                                messenger.send(recipient, echo.clone())?;
 
                            } else {
 
                                for &n in neighbors.iter() {
 
                                    if n != recipient {
 
                                        println!(
 
                                        log!(
 
                                            logger,
 
                                            "{:?} repeating echo {:?} to {:?}",
 
                                            major, &echo, n
 
                                            major,
 
                                            &echo,
 
                                            n
 
                                        );
 
                                        messenger.send(n, echo.clone())?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        match parent {
 
            None => assert_eq!(
 
                my_leader, major,
 
                "I've got no parent, but I consider {:?} the leader?",
 
                my_leader
 
            ),
 
            Some(parent) => assert_ne!(
 
                my_leader, major,
 
                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
                parent, major
 
            ),
 
        }
 
        println!("{:?} DONE WITH ECHO", major);
 
        log!(logger, "{:?} DONE WITH ECHO", major);
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for &k in neighbors.iter() {
 
            let msg =
 
                if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            println!("{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
 
            log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
 
            messenger.send(k, msg)?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for &n in neighbors.iter() {
 
            if Some(n) != parent {
 
                awaiting.insert(n);
 
            }
 
        }
 
        let mut children = Vec::default();
 
        messenger.undelay_all();
 
        while !awaiting.is_empty() {
0 comments (0 inline, 0 general)