Changeset - cecf94fdb875
src/common.rs
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub use crate::protocol::{ComponentState, ProtocolDescription};
 
-pub use crate::runtime::{NonsyncContext, SyncContext};
+pub use crate::runtime::{NonsyncProtoContext, SyncProtoContext};
 

	
 
pub use core::{
 
    cmp::Ordering,
 
@@ -140,3 +140,13 @@ impl Debug for PortId {
 
        write!(f, "PortId({},{})", self.controller_id, self.port_index)
    }
}
+impl std::ops::Not for Polarity {
+    type Output = Self;
+    fn not(self) -> Self::Output {
+        use Polarity::*;
+        match self {
+            Putter => Getter,
+            Getter => Putter,
+        }
+    }
+}
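Reviewer sketch (not part of the changeset): a minimal test for the added impl. Using `matches!` avoids assuming `PartialEq` on `Polarity`.

#[test]
fn polarity_not_is_an_involution() {
    use Polarity::*;
    assert!(matches!(!Putter, Getter));
    assert!(matches!(!Getter, Putter));
    assert!(matches!(!!Putter, Putter)); // applying `!` twice is the identity
}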
src/protocol/mod.rs
 
@@ -23,8 +23,8 @@ pub struct ComponentState {
 
    prompt: Prompt,
 
}
 
pub enum EvalContext<'a> {
 
-    Nonsync(&'a mut NonsyncContext<'a>),
-    Sync(&'a mut SyncContext<'a>),
+    Nonsync(&'a mut NonsyncProtoContext<'a>),
+    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
}
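Reviewer sketch (hypothetical helper, not part of the changeset): evaluator code dispatches on the variants the same way after the rename; only the matched context types change.

fn example_dispatch(ctx: &mut EvalContext<'_>) {
    match ctx {
        EvalContext::Nonsync(_proto_ctx) => { /* pre-sync effects: fork components, add channels */ }
        EvalContext::Sync(_proto_ctx) => { /* in-round effects: firing queries, payload reads */ }
        EvalContext::None => { /* pure evaluation, no runtime effects */ }
    }
}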
 
//////////////////////////////////////////////
 
@@ -111,7 +111,7 @@ impl ProtocolDescription {
 
impl ComponentState {
 
    pub fn nonsync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
-        context: &'b mut NonsyncContext<'b>,
+        context: &'b mut NonsyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> NonsyncBlocker {
 
        let mut context = EvalContext::Nonsync(context);
 
@@ -147,7 +147,7 @@ impl ComponentState {
 

	
 
    pub fn sync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
-        context: &'b mut SyncContext<'b>,
+        context: &'b mut SyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> SyncBlocker {
 
        let mut context = EvalContext::Sync(context);
src/runtime/communication.rs
 
use super::*;
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        let ret = match &decision {
 
            Decision::Success(predicate) => {
 
                // overwrite MonoN/P
 
                self.inner.mono_n = {
 
                    let poly_n = self.ephemeral.poly_n.take().unwrap();
 
                    poly_n.choose_mono(predicate).unwrap_or_else(|| {
 
                        panic!(
 
                            "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
 
                            &predicate, &poly_n.branches, &self.inner.logger
 
                        );
 
                    })
 
                };
 
                self.inner.mono_ps.clear();
 
                self.inner.mono_ps.extend(
 
                    self.ephemeral
 
                        .poly_ps
 
                        .drain(..)
 
                        .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
                );
 
                Ok(())
 
            }
 
            Decision::Failure => Err(SyncErr::Timeout),
 
        };
 
        let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
        for &child_port in self.inner.family.children_ports.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with port {:?}",
 
                &announcement,
 
                child_port
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_port)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        ret
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) {
 
        let component = &mut self.connector.proto_components[self.proto_component_index];
 
        // the moved ports must be owned by the calling component
        assert!(moved_ports.is_subset(&component.ports));
 
        // let polarities = self.proto_description.component_polarities(identifier).expect("BAD ");
 
        // if polarities.len() != ports.len() {
 
        //     return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        // }
 
        // for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
        //     if !self.native_ports.contains(port) {
 
        //         return Err(UnknownPort(*port));
 
        //     }
 
        //     if expected_polarity != self.check_polarity(port) {
 
        //         return Err(WrongPortPolarity { port: *port, expected_polarity });
 
        //     }
 
        // }
 
        // // ok!
 
        // let state = self.proto_description.new_main_component(identifier, ports);
 
        // let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
 
        // let proto_component_index = self.proto_components.len();
 
        // self.proto_components.push(proto_component);
 
        // for port in ports.iter() {
 
        //     if let Polarity::Getter = self.check_polarity(port) {
 
        //         self.inp_to_route.insert(
 
        //             *port,
 
        //             InpRoute::LocalComponent(LocalComponentId::Proto {
 
        //                 index: proto_component_index,
 
        //             }),
 
        //         );
 
        //     }
 
        // }
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_port) = self.inner.family.parent_port {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_port.is_none());
 
            let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(predicate) = maybe_predicate {
 
                let decision = Decision::Success(predicate);
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    pub fn new_channel(&mut self) -> [PortId; 2] {
 
        self.connector.add_port_pair()
 
    }
 
}
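Reviewer sketch of the calling convention for the two methods above; `child_state` is a hypothetical argument, and the sketch assumes `new_channel` registers the fresh ports with the calling component so the subset assertion in `new_component` holds.

fn example_fork(ctx: &mut NonsyncProtoContext<'_>, child_state: ComponentState) {
    let [putter, getter] = ctx.new_channel();
    let mut moved_ports = HashSet::default();
    moved_ports.insert(putter);
    ctx.new_component(moved_ports, child_state); // new component takes `putter`
    let _keep = getter; // caller keeps `getter`
}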
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        todo!()
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        todo!()
 
    }
 
}
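Reviewer sketch for the two todo!() stubs above, under the assumption (not confirmed by this changeset) that `None` from `is_firing` means the speculative branch has not yet fixed the port's firing status:

fn example_branch_step(ctx: &mut SyncProtoContext<'_>, port: PortId) {
    match ctx.is_firing(port) {
        None => { /* undecided: the component should block and be retried */ }
        Some(false) => { /* port is silent this round in this branch */ }
        Some(true) => {
            if let Some(_payload) = ctx.read_msg(port) {
                /* payload available to this branch */
            }
        }
    }
}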
 

	
 
    fn kick_off_native(
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ports, .. } = self.inner.mono_n.clone();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id;
 
            let all_ports = ports.iter().copied();
 
            let all_channel_ids = all_ports.map(port_to_channel_id);
 
// impl Connector {
 
//     fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> {
 
//         log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
//         let ret = match &decision {
 
//             Decision::Success(predicate) => {
 
//                 // overwrite MonoN/P
 
//                 self.inner.mono_n = {
 
//                     let poly_n = self.ephemeral.poly_n.take().unwrap();
 
//                     poly_n.choose_mono(predicate).unwrap_or_else(|| {
 
//                         panic!(
 
//                             "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
 
//                             &predicate, &poly_n.branches, &self.inner.logger
 
//                         );
 
//                     })
 
//                 };
 
//                 self.inner.mono_ps.clear();
 
//                 self.inner.mono_ps.extend(
 
//                     self.ephemeral
 
//                         .poly_ps
 
//                         .drain(..)
 
//                         .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
//                 );
 
//                 Ok(())
 
//             }
 
//             Decision::Failure => Err(SyncError::Timeout),
 
//         };
 
//         let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
//         for &child_port in self.inner.family.children_ports.iter() {
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 "Forwarding {:?} to child with port {:?}",
 
//                 &announcement,
 
//                 child_port
 
//             );
 
//             self.inner
 
//                 .endpoint_exts
 
//                 .get_mut(child_port)
 
//                 .expect("eefef")
 
//                 .endpoint
 
//                 .send(announcement.clone())?;
 
//         }
 
//         self.inner.round_index += 1;
 
//         self.ephemeral.clear();
 
//         ret
 
//     }
 

	
 
            let mut predicate = Predicate::new_trivial();
 
//     // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
//     fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncError> {
 
//         if let Some(parent_port) = self.inner.family.parent_port {
 
//             // I have a parent -> I'm not the leader
 
//             let parent_endpoint =
 
//                 &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
//             for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
//                 let msg =
 
//                     CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
//                 log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
//                 parent_endpoint.send(msg)?;
 
//             }
 
//             Ok(false)
 
//         } else {
 
//             // I have no parent -> I'm the leader
 
//             assert!(self.inner.family.parent_port.is_none());
 
//             let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
//             Ok(if let Some(predicate) = maybe_predicate {
 
//                 let decision = Decision::Success(predicate);
 
//                 log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
//                 self.end_round_with_decision(decision)?;
 
//                 true
 
//             } else {
 
//                 false
 
//             })
 
//         }
 
//     }
 

	
 
            // assign TRUE for puts and gets
 
            let true_ports = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ports.clone().map(port_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 
//     fn kick_off_native(
 
//         &mut self,
 
//         sync_batches: impl Iterator<Item = SyncBatch>,
 
//     ) -> Result<PolyN, EndpointError> {
 
//         let MonoN { ports, .. } = self.inner.mono_n.clone();
 
//         let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
//         let mut branches = HashMap::<_, _>::default();
 
//         for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
//             let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id;
 
//             let all_ports = ports.iter().copied();
 
//             let all_channel_ids = all_ports.map(port_to_channel_id);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 
//             let mut predicate = Predicate::new_trivial();
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
                    "Duplicate predicate {:#?}!\nHaving multiple batches with the same \
                     predicate requires the support of oracle boolean variables",
                    &predicate,
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (port, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ports, branches })
 
    }
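Reviewer sketch of the two-step predicate construction above, with hypothetical channel ids and assuming `ChannelId: Copy`. Because `batch_assign_nones` only touches unassigned channels, the firing ports are forced TRUE first, and the second pass defaults only the remaining interface to FALSE.

fn example_branch_predicate(firing: ChannelId, silent_a: ChannelId, silent_b: ChannelId) -> Predicate {
    let mut predicate = Predicate::new_trivial();
    predicate.batch_assign_nones(std::iter::once(firing), true);
    predicate.batch_assign_nones([firing, silent_a, silent_b].iter().copied(), false);
    // expected: query(firing) == Some(true), query(silent_a) == query(silent_b) == Some(false)
    predicate
}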
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        if let Some(e) = self.unrecoverable_error {
 
            return Err(e.clone());
 
        }
 
        self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
 
            SyncErr::Timeout => e, // this isn't unrecoverable
 
            _ => {
 
                // Must set unrecoverable error! and tear down our net channels
 
                self.unrecoverable_error = Some(e);
 
                self.ephemeral.clear();
 
                self.inner.endpoint_exts = Default::default();
 
                e
 
            }
 
        })
 
    }
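Reviewer sketch of the error contract just above: only Timeout leaves the controller usable, so a hypothetical caller may retry; any other error has already torn down the endpoints.

fn example_retry(controller: &mut Controller, deadline: Instant) -> Result<(), SyncErr> {
    // `None::<std::vec::IntoIter<SyncBatch>>` pins down the generic batch-iterator type
    match controller.sync_round(Some(deadline), None::<std::vec::IntoIter<SyncBatch>>) {
        Err(SyncErr::Timeout) => {
            // recoverable: the next round may still succeed
            controller.sync_round(Some(deadline + Duration::from_secs(1)), None::<std::vec::IntoIter<SyncBatch>>)
        }
        other => other,
    }
}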
 
//             // assign TRUE for puts and gets
 
//             let true_ports = puts.keys().chain(gets.iter()).copied();
 
//             let true_channel_ids = true_ports.clone().map(port_to_channel_id);
 
//             predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
    // Runs a synchronous round until all the actors are in a decided state OR 1+ are inconsistent.
    // If a native component requires setup, `sync_batches` is Some, and its contents are used as the sync batches.
 
    fn sync_round_inner(
 
        &mut self,
 
        mut deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 
        assert!(self.ephemeral.is_clear());
 
        assert!(self.unrecoverable_error.is_none());
 
//             // assign FALSE for all in interface not assigned true
 
//             predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
        //    Some actors are dropped; some new actors are created.
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync actors.
 
        self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ports: &mut mono_p.ports,
 
                mono_ps: &mut self.ephemeral.mono_ps,
 
                inner: &mut self.inner,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 
//             if branches.contains_key(&predicate) {
 
//                 // TODO what do I do with redundant predicates?
 
//                 unimplemented!(
 
//                     "Duplicate predicate {:#?}!\nHaving multiple batches with the same
 
//                     predicate requires the support of oracle boolean variables",
 
//                     &predicate,
 
//                 )
 
//             }
 
//             let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
//             for (port, payload) in puts {
 
//                 log!(
 
//                     &mut self.inner.logger,
 
//                     "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
//                     &payload,
 
//                     &predicate,
 
//                     sync_batch_index,
 
//                 );
 
//                 let msg =
 
//                     CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
//                         .into_msg(*round_index);
 
//                 endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
 
//             }
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 "... Initial native branch batch index={} with pred {:?}",
 
//                 sync_batch_index,
 
//                 &predicate
 
//             );
 
//             if branch.to_get.is_empty() {
 
//                 self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                     &mut self.inner.logger,
 
//                     SubtreeId::PolyN,
 
//                     predicate.clone(),
 
//                 );
 
//             }
 
//             branches.insert(predicate, branch);
 
//         }
 
//         Ok(PolyN { ports, branches })
 
//     }
 
//     pub fn sync_round(
 
//         &mut self,
 
//         deadline: Option<Instant>,
 
//         sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
//     ) -> Result<(), SyncError> {
 
//         if let Some(e) = self.unrecoverable_error {
 
//             return Err(e.clone());
 
//         }
 
//         self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
 
//             SyncError::Timeout => e, // this isn't unrecoverable
 
//             _ => {
 
//                 // Must set unrecoverable error! and tear down our net channels
 
//                 self.unrecoverable_error = Some(e);
 
//                 self.ephemeral.clear();
 
//                 self.inner.endpoint_exts = Default::default();
 
//                 e
 
//             }
 
//         })
 
//     }
 

	
 
        // 3. define the mapping from port -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let port_to_holder: HashMap<PortId, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
 
            &port_to_holder
 
        );
 
//     // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
//     // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
//     fn sync_round_inner(
 
//         &mut self,
 
//         mut deadline: Option<Instant>,
 
//         sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
//     ) -> Result<(), SyncError> {
 
//         log!(
 
//             &mut self.inner.logger,
 
//             "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
//             self.inner.round_index
 
//         );
 
//         assert!(self.ephemeral.is_clear());
 
//         assert!(self.unrecoverable_error.is_none());
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
            let n = std::iter::once(SubtreeId::PolyN);
 
            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
            let c = self
 
                .inner
 
                .family
 
                .children_ports
 
                .iter()
 
                .map(|&port| SubtreeId::ChildController { port });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 
//         // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
//         //    Some actors are dropped. some new actors are created.
 
//         //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
//         self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
//         log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
//         while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
//             let mut m_ctx = ProtoSyncContext {
 
//                 ports: &mut mono_p.ports,
 
//                 mono_ps: &mut self.ephemeral.mono_ps,
 
//                 inner: &mut self.inner,
 
//             };
 
//             // cross boundary into crate::protocol
 
//             let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
//             log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
//             match blocker {
 
//                 NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent),
 
//                 NonsyncBlocker::ComponentExit => drop(mono_p),
 
//                 NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
//             }
 
//         }
 
//         log!(
 
//             &mut self.inner.logger,
 
//             "Finished running all MonoPs! Have {} PolyPs waiting",
 
//             self.ephemeral.poly_ps.len()
 
//         );
 

	
 
        // 5. kick off the synchronous round of the native actor if it exists
 
//         // 3. define the mapping from port -> actor
 
//         //    this is needed during the event loop to determine which actor
 
//         //    should receive the incoming message.
 
//         //    TODO: store and update this mapping rather than rebuilding it each round.
 
//         let port_to_holder: HashMap<PortId, PolyId> = {
 
//             use PolyId::*;
 
//             let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
 
//             let p = self
 
//                 .ephemeral
 
//                 .poly_ps
 
//                 .iter()
 
//                 .enumerate()
 
//                 .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index })));
 
//             n.chain(p).collect()
 
//         };
 
//         log!(
 
//             &mut self.inner.logger,
 
//             "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
 
//             &port_to_holder
 
//         );
 

	
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            log!(
 
                &mut self.inner.logger,
 
                "PolyN kicked off, and has branches with predicates... {:?}",
 
                poly_n.branches.keys().collect::<Vec<_>>()
 
            );
 
            Some(poly_n)
 
        } else {
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 
//         // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
//         //    of the controller in the overlay tree.
 
//         self.ephemeral.solution_storage.reset({
 
//             let n = std::iter::once(SubtreeId::PolyN);
 
//             let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
//             let c = self
 
//                 .inner
 
//                 .family
 
//                 .children_ports
 
//                 .iter()
 
//                 .map(|&port| SubtreeId::ChildController { port });
 
//             let subtree_id_iter = n.chain(m).chain(c);
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 "Solution Storage has subtree Ids: {:?}",
 
//                 &subtree_id_iter.clone().collect::<Vec<_>>()
 
//             );
 
//             subtree_id_iter
 
//         });
 

	
 
        // 6. Kick off the synchronous round of each protocol actor
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 
//         // 5. kick off the synchronous round of the native actor if it exists
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 
//         log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
//         self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
//             // using if let because of nested ? operator
 
//             // TODO check that there are 1+ branches or NO SOLUTION
 
//             let poly_n = self.kick_off_native(sync_batches)?;
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 "PolyN kicked off, and has branches with predicates... {:?}",
 
//                 poly_n.branches.keys().collect::<Vec<_>>()
 
//             );
 
//             Some(poly_n)
 
//         } else {
 
//             log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
//             None
 
//         };
 

	
 
        // 8. Receive incoming messages until the DECISION is made OR some unrecoverable error occurs
 
        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
            let received = match deadline {
 
                None => {
 
                    // we have personally timed out. perform a "long" poll.
 
                    self.recv(Instant::now() + Duration::from_secs(10))?.expect("long poll yielded no message")
 
                }
 
                Some(d) => match self.recv(d)? {
 
                    // we have not yet timed out. perform a time-limited poll
 
                    Some(received) => received,
 
                    None => {
 
                        // timed out! send a FAILURE message toward the sink,
 
                        // and henceforth don't time out on polling.
 
                        deadline = None;
 
                        match self.inner.family.parent_port {
 
                            None => {
 
                                // I am the sink! announce failure and return.
 
                                return self.end_round_with_decision(Decision::Failure);
 
                            }
 
                            Some(parent_port) => {
 
                                // I am not the sink! send a failure message.
 
                                let announcement = Msg::CommMsg(CommMsg {
 
                                    round_index: self.inner.round_index,
 
                                    contents: CommMsgContents::Failure,
 
                                });
 
                                log!(
 
                                    &mut self.inner.logger,
 
                                    "Forwarding {:?} to parent with port {:?}",
 
                                    &announcement,
 
                                    parent_port
 
                                );
 
                                self.inner
 
                                    .endpoint_exts
 
                                    .get_mut(parent_port)
 
                                    .expect("ss")
 
                                    .endpoint
 
                                    .send(announcement.clone())?;
 
                                continue; // poll some more
 
                            }
 
                        }
 
                    }
 
                },
 
            };
 
            log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(s) => {
 
                    // This occurs in the event the connector was malformed during connect()
 
                    println!("WASNT EXPECTING {:?}", s);
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "...and its OLD! :(");
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(&mut self.inner.logger, "... DELAY! :(");
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "... its a round-appropriate CommMsg with port {:?}",
 
                        received.recipient
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Failure => match self.inner.family.parent_port {
 
                    Some(parent_port) => {
 
                        let announcement = Msg::CommMsg(CommMsg {
 
                            round_index: self.inner.round_index,
 
                            contents: CommMsgContents::Failure,
 
                        });
 
                        log!(
 
                            &mut self.inner.logger,
 
                            "Forwarding {:?} to parent with port {:?}",
 
                            &announcement,
 
                            parent_port
 
                        );
 
                        self.inner
 
                            .endpoint_exts
 
                            .get_mut(parent_port)
 
                            .expect("ss")
 
                            .endpoint
 
                            .send(announcement.clone())?;
 
                    }
 
                    None => return self.end_round_with_decision(Decision::Failure),
 
                },
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ports.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { port: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { decision } => {
 
                    if self.inner.family.parent_port != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &decision
 
                    );
 
                    return self.end_round_with_decision(decision);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // check that we expect to be able to receive payloads from this sender
 
                    assert_eq!(
 
                        Getter,
 
                        self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
                    );
 
//         // 6. Kick off the synchronous round of each protocol actor
 
//         //    If just one actor becomes inconsistent now, there can be no solution!
 
//         //    TODO distinguish between completed and not completed poly_p's?
 
//         log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
//         for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
//             let my_subtree_id = SubtreeId::PolyP { index };
 
//             let m_ctx = PolyPContext {
 
//                 my_subtree_id,
 
//                 inner: &mut self.inner,
 
//                 solution_storage: &mut self.ephemeral.solution_storage,
 
//             };
 
//             use SyncRunResult as Srr;
 
//             let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
//             log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
//             match blocker {
 
//                 Srr::NoBranches => return Err(SyncError::Inconsistent),
 
//                 Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
//             }
 
//         }
 
//         log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = port_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    let channel_id = self
 
                        .inner
 
                        .endpoint_exts
 
                        .get(received.recipient)
 
                        .expect("UEHFU")
 
                        .info
 
                        .channel_id;
 
                    if payload_predicate.query(channel_id) != Some(true) {
 
                        // sender didn't preserve the invariant
 
                        return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
 
                    }
 
                    match subtree_id {
 
                        None => {
 
                            // This happens when a message is sent to a component that has exited.
                            // It's safe to drop the message: the sender's branch
                            // certainly won't be part of the solution.
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let poly_p = &mut self.ephemeral.poly_ps[*index];
 
//         // 7. `solution_storage` may have new solutions for this controller
 
//         //    handle their discovery. LEADER => announce, otherwise => send to parent
 
//         {
 
//             let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 "Got {} controller-local solutions before a single RECV: {:?}",
 
//                 peeked.len(),
 
//                 peeked
 
//             );
 
//         }
 
//         if self.handle_locals_maybe_decide()? {
 
//             return Ok(());
 
//         }
 

	
 
                            let m_ctx = PolyPContext {
 
                                my_subtree_id: SubtreeId::PolyP { index: *index },
 
                                inner: &mut self.inner,
 
                                solution_storage: &mut self.ephemeral.solution_storage,
 
                            };
 
                            use SyncRunResult as Srr;
 
                            let blocker = poly_p.poly_recv_run(
 
                                m_ctx,
 
                                &self.protocol_description,
 
                                received.recipient,
 
                                payload_predicate,
 
                                payload,
 
                            )?;
 
                            log!(
 
                                &mut self.inner.logger,
 
                                "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
                                subtree_id,
 
                                blocker
 
                            );
 
                            match blocker {
 
                                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
                                    {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                }
 
            }
 
        }
 
    }
 
}
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.mono_ps.is_empty()
 
            && self.port_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
        self.solution_storage.clear();
        self.poly_n.take();
        // also drain mono_ps, so that `is_clear` holds after `clear`
        self.mono_ps.clear();
        self.poly_ps.clear();
        self.port_to_holder.clear();
    }
 
}
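Reviewer sketch: `clear` is expected to re-establish `is_clear`; a hypothetical debug helper makes that invariant explicit at call sites.

fn example_clear_checked(ephemeral: &mut ControllerEphemeral) {
    ephemeral.clear();
    debug_assert!(ephemeral.is_clear());
}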
 
impl From<MonoP> for PolyP {
    fn from(mono: MonoP) -> PolyP {
        PolyP {
            complete: Default::default(),
            incomplete: hashmap! {
                Predicate::new_trivial() =>
                BranchP {
                    state: mono.state,
                    inbox: Default::default(),
                    outbox: Default::default(),
                    blocking_on: None,
                }
            },
            ports: mono.ports,
        }
    }
}
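Reviewer note: expressing the conversion as `From<MonoP> for PolyP` rather than `Into` is the direction clippy recommends; the existing `mono_p.into()` call in the mono-run loop keeps compiling via the blanket `impl<T, U: From<T>> Into<U> for T`.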
 
//         // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error
 
//         log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
//         self.undelay_all();
 
//         'recv_loop: loop {
 
//             log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
//             let received = match deadline {
 
//                 None => {
 
//                     // we have personally timed out. perform a "long" poll.
 
//                     self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP")
 
//                 }
 
//                 Some(d) => match self.recv(d)? {
 
//                     // we have not yet timed out. performed a time-limited poll
 
//                     Some(received) => received,
 
//                     None => {
 
//                         // timed out! send a FAILURE message to the sink,
 
//                         // and henceforth don't time out on polling.
 
//                         deadline = None;
 
//                         match self.inner.family.parent_port {
 
//                             None => {
 
//                                 // I am the sink! announce failure and return.
 
//                                 return self.end_round_with_decision(Decision::Failure);
 
//                             }
 
//                             Some(parent_port) => {
 
//                                 // I am not the sink! send a failure message.
 
//                                 let announcement = Msg::CommMsg(CommMsg {
 
//                                     round_index: self.inner.round_index,
 
//                                     contents: CommMsgContents::Failure,
 
//                                 });
 
//                                 log!(
 
//                                     &mut self.inner.logger,
 
//                                     "Forwarding {:?} to parent with port {:?}",
 
//                                     &announcement,
 
//                                     parent_port
 
//                                 );
 
//                                 self.inner
 
//                                     .endpoint_exts
 
//                                     .get_mut(parent_port)
 
//                                     .expect("ss")
 
//                                     .endpoint
 
//                                     .send(announcement.clone())?;
 
//                                 continue; // poll some more
 
//                             }
 
//                         }
 
//                     }
 
//                 },
 
//             };
 
//             log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
//             let current_content = match received.msg {
 
//                 Msg::SetupMsg(s) => {
 
//                     // This occurs in the event the connector was malformed during connect()
 
//                     println!("WASNT EXPECTING {:?}", s);
 
//                     return Err(SyncError::UnexpectedSetupMsg);
 
//                 }
 
//                 Msg::CommMsg(CommMsg { round_index, .. })
 
//                     if round_index < self.inner.round_index =>
 
//                 {
 
//                     // Old message! Can safely discard
 
//                     log!(&mut self.inner.logger, "...and its OLD! :(");
 
//                     drop(received);
 
//                     continue 'recv_loop;
 
//                 }
 
//                 Msg::CommMsg(CommMsg { round_index, .. })
 
//                     if round_index > self.inner.round_index =>
 
//                 {
 
//                     // Message from a next round. Keep for later!
 
//                     log!(&mut self.inner.logger, "... DELAY! :(");
 
//                     self.delay(received);
 
//                     continue 'recv_loop;
 
//                 }
 
//                 Msg::CommMsg(CommMsg { contents, round_index }) => {
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         "... its a round-appropriate CommMsg with port {:?}",
 
//                         received.recipient
 
//                     );
 
//                     assert_eq!(round_index, self.inner.round_index);
 
//                     contents
 
//                 }
 
//             };
 
//             match current_content {
 
//                 CommMsgContents::Failure => match self.inner.family.parent_port {
 
//                     Some(parent_port) => {
 
//                         let announcement = Msg::CommMsg(CommMsg {
 
//                             round_index: self.inner.round_index,
 
//                             contents: CommMsgContents::Failure,
 
//                         });
 
//                         log!(
 
//                             &mut self.inner.logger,
 
//                             "Forwarding {:?} to parent with port {:?}",
 
//                             &announcement,
 
//                             parent_port
 
//                         );
 
//                         self.inner
 
//                             .endpoint_exts
 
//                             .get_mut(parent_port)
 
//                             .expect("ss")
 
//                             .endpoint
 
//                             .send(announcement.clone())?;
 
//                     }
 
//                     None => return self.end_round_with_decision(Decision::Failure),
 
//                 },
 
//                 CommMsgContents::Elaborate { partial_oracle } => {
 
//                     // Child controller submitted a subtree solution.
 
//                     if !self.inner.family.children_ports.contains(&received.recipient) {
 
//                         return Err(SyncError::ElaborateFromNonChild);
 
//                     }
 
//                     let subtree_id = SubtreeId::ChildController { port: received.recipient };
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         "Received elaboration from child for subtree {:?}: {:?}",
 
//                         subtree_id,
 
//                         &partial_oracle
 
//                     );
 
//                     self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                         &mut self.inner.logger,
 
//                         subtree_id,
 
//                         partial_oracle,
 
//                     );
 
//                     if self.handle_locals_maybe_decide()? {
 
//                         return Ok(());
 
//                     }
 
//                 }
 
//                 CommMsgContents::Announce { decision } => {
 
//                     if self.inner.family.parent_port != Some(received.recipient) {
 
//                         return Err(SyncError::AnnounceFromNonParent);
 
//                     }
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
//                         received.recipient,
 
//                         &decision
 
//                     );
 
//                     return self.end_round_with_decision(decision);
 
//                 }
 
//                 CommMsgContents::SendPayload { payload_predicate, payload } => {
 
//                     // check that we expect to be able to receive payloads from this sender
 
//                     assert_eq!(
 
//                         Getter,
 
//                         self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
//                     );
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
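Reviewer sketch of what this impl buys: the many `endpoint.send(..)?` calls above can live inside functions returning `Result<_, SyncErr>`. The helper is hypothetical and assumes `Endpoint::send(Msg) -> Result<(), EndpointErr>`, as used throughout this file.

fn example_send(endpoint: &mut Endpoint, msg: Msg) -> Result<(), SyncErr> {
    endpoint.send(msg)?; // EndpointErr converts to SyncErr via the impl above
    Ok(())
}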
 
//                     // message for some actor. Feed it to the appropriate actor
 
//                     // and then give them another chance to run.
 
//                     let subtree_id = port_to_holder.get(&received.recipient);
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
//                         subtree_id,
 
//                         &payload_predicate,
 
//                         &payload
 
//                     );
 
//                     let channel_id = self
 
//                         .inner
 
//                         .endpoint_exts
 
//                         .get(received.recipient)
 
//                         .expect("UEHFU")
 
//                         .info
 
//                         .channel_id;
 
//                     if payload_predicate.query(channel_id) != Some(true) {
 
//                         // sender didn't preserve the invariant
 
//                         return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id));
 
//                     }
 
//                     match subtree_id {
 
//                         None => {
 
//                             // this happens when a message is sent to a component that has exited.
 
//                             // It's safe to drop this message;
 
//                             // The sender branch will certainly not be part of the solution
 
//                         }
 
//                         Some(PolyId::N) => {
 
//                             // Message for NativeMachine
 
//                             self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
//                                 received.recipient,
 
//                                 &mut self.inner.logger,
 
//                                 payload,
 
//                                 payload_predicate,
 
//                                 &mut self.ephemeral.solution_storage,
 
//                             );
 
//                             if self.handle_locals_maybe_decide()? {
 
//                                 return Ok(());
 
//                             }
 
//                         }
 
//                         Some(PolyId::P { index }) => {
 
//                             // Message for protocol actor
 
//                             let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ports {:?}!",
 
            &moved_ports,
 
        );
 
        if moved_ports.is_subset(self.ports) {
 
            self.ports.retain(|x| !moved_ports.contains(x));
 
            self.mono_ps.push(MonoP { state: init_state, ports: moved_ports });
 
        } else {
 
            panic!("MachineP attempting to move alien port!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [PortId; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 
//                             let m_ctx = PolyPContext {
 
//                                 my_subtree_id: SubtreeId::PolyP { index: *index },
 
//                                 inner: &mut self.inner,
 
//                                 solution_storage: &mut self.ephemeral.solution_storage,
 
//                             };
 
//                             use SyncRunResult as Srr;
 
//                             let blocker = poly_p.poly_recv_run(
 
//                                 m_ctx,
 
//                                 &self.protocol_description,
 
//                                 received.recipient,
 
//                                 payload_predicate,
 
//                                 payload,
 
//                             )?;
 
//                             log!(
 
//                                 &mut self.inner.logger,
 
//                                 "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
//                                 subtree_id,
 
//                                 blocker
 
//                             );
 
//                             match blocker {
 
//                                 Srr::NoBranches => return Err(SyncError::Inconsistent),
 
//                                 Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
//                                     {
 
//                                         let peeked = self
 
//                                             .ephemeral
 
//                                             .solution_storage
 
//                                             .peek_new_locals()
 
//                                             .collect::<Vec<_>>();
 
//                                         log!(
 
//                                             &mut self.inner.logger,
 
//                                             "Got {} new controller-local solutions from RECV: {:?}",
 
//                                             peeked.len(),
 
//                                             peeked
 
//                                         );
 
//                                     }
 
//                                     if self.handle_locals_maybe_decide()? {
 
//                                         return Ok(());
 
//                                     }
 
//                                 }
 
//                             }
 
//                         }
 
//                     };
 
//                 }
 
//             }
 
//         }
 
//     }
 
// }
 
// impl ControllerEphemeral {
 
//     fn is_clear(&self) -> bool {
 
//         self.solution_storage.is_clear()
 
//             && self.poly_n.is_none()
 
//             && self.poly_ps.is_empty()
 
//             && self.mono_ps.is_empty()
 
//             && self.port_to_holder.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.solution_storage.clear();
 
//         self.poly_n.take();
 
//         self.poly_ps.clear();
 
//         self.port_to_holder.clear();
 
//     }
 
// }
 
// impl Into<PolyP> for MonoP {
 
//     fn into(self) -> PolyP {
 
//         PolyP {
 
//             complete: Default::default(),
 
//             incomplete: hashmap! {
 
//                 Predicate::new_trivial() =>
 
//                 BranchP {
 
//                     state: self.state,
 
//                     inbox: Default::default(),
 
//                     outbox: Default::default(),
 
//                     blocking_on: None,
 
//                 }
 
//             },
 
//             ports: self.ports,
 
//         }
 
//     }
 
// }
 

	
 
        let mut clos = |endpoint, polarity| {
 
            let endpoint_ext =
 
                EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint };
 
            let port = self.inner.endpoint_exts.alloc(endpoint_ext);
 
            let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint;
 
            let token = PortId::to_token(port);
 
            self.inner
 
                .messenger_state
 
                .poll
 
                .register(endpoint, token, Ready::readable(), PollOpt::edge())
 
                .expect("AAGAGGGGG");
 
            self.ports.insert(port);
 
            port
 
        };
 
        let [kp, kg] = [clos(a, Putter), clos(b, Getter)];
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ports {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
    }
 
    fn new_random(&mut self) -> u64 {
 
        type Bytes8 = [u8; std::mem::size_of::<u64>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
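 
        // Reinterpret the 8 random bytes as a u64; `u64::from_ne_bytes(bytes)` would
 
        // be the safe-Rust spelling of the same conversion.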
 
        let val = unsafe { std::mem::transmute::<Bytes8, _>(bytes) };
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_random. returning val {:?}!",
 
            val,
 
        );
 
        val
 
    }
 
}
 
// impl From<EndpointError> for SyncError {
 
//     fn from(e: EndpointError) -> SyncError {
 
//         SyncError::EndpointError(e)
 
//     }
 
// }
 

	
 
impl SolutionStorage {
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 
// impl ProtoSyncContext<'_> {
 
//     fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
//         todo!()
 
//     }
 
//     fn new_channel(&mut self) -> [PortId; 2] {
 
//         todo!()
 
//     }
 
// }
 

	
 
    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
        self.new_local.iter()
 
    }
 
// impl SolutionStorage {
 
//     fn is_clear(&self) -> bool {
 
//         self.subtree_id_to_index.is_empty()
 
//             && self.subtree_solutions.is_empty()
 
//             && self.old_local.is_empty()
 
//             && self.new_local.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.subtree_id_to_index.clear();
 
//         self.subtree_solutions.clear();
 
//         self.old_local.clear();
 
//         self.new_local.clear();
 
//     }
 
//     pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
//         self.subtree_id_to_index.clear();
 
//         self.subtree_solutions.clear();
 
//         self.old_local.clear();
 
//         self.new_local.clear();
 
//         for key in subtree_ids {
 
//             self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
//             self.subtree_solutions.push(Default::default())
 
//         }
 
//     }
 

	
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
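 
        // Drain each new local into `old_local` as it is yielded, so the same
 
        // predicate is never reported as new twice.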
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 
//     pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
//         self.new_local.iter()
 
//     }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut String,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 
//     pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
//         let Self { old_local, new_local, .. } = self;
 
//         new_local.drain().map(move |local| {
 
//             old_local.insert(local.clone());
 
//             local
 
//         })
 
//     }
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 
//     pub(crate) fn submit_and_digest_subtree_solution(
 
//         &mut self,
 
//         logger: &mut String,
 
//         subtree_id: SubtreeId,
 
//         predicate: Predicate,
 
//     ) {
 
//         log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
//         let index = self.subtree_id_to_index[&subtree_id];
 
//         let left = 0..index;
 
//         let right = (index + 1)..self.subtree_solutions.len();
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut String,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
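 
// A worked trace of the elaboration above (a sketch with hypothetical channels
 
// c0, c1; not part of the changeset). Suppose three subtrees have solution sets:
 
//   PolyN:           { {c0=>T} }
 
//   PolyP{index:0}:  { {c0=>T, c1=>F}, {c0=>F} }
 
//   ChildController: { {c1=>F} }
 
// Submitting {c0=>T} for PolyN unions it with one predicate from every other set:
 
//   {c0=>T} u {c0=>T, c1=>F} = {c0=>T, c1=>F}  -- consistent, keep traversing
 
//   {c0=>T} u {c0=>F}        = None            -- conflicting assignment, branch pruned
 
//   {c0=>T, c1=>F} u {c1=>F} = {c0=>T, c1=>F}  -- all sets visited: new local solution
 
// Only unions that survive every set reach `new_local`, i.e. each local solution is
 
// consistent with at least one solution from every subtree.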
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 
//         let Self { subtree_solutions, new_local, old_local, .. } = self;
 
//         let was_new = subtree_solutions[index].insert(predicate.clone());
 
//         if was_new {
 
//             let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
//             Self::elaborate_into_new_local_rec(
 
//                 logger,
 
//                 predicate,
 
//                 set_visitor,
 
//                 old_local,
 
//                 new_local,
 
//             );
 
//         }
 
//     }
 

	
 
    fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        assert!(self.ports.contains(&port));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
    fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        assert!(self.ports.contains(&port));
 
        let val = self.inbox.get(&port);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
}
 
//     fn elaborate_into_new_local_rec<'a, 'b>(
 
//         logger: &mut String,
 
//         partial: Predicate,
 
//         mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
//         old_local: &'b HashSet<Predicate>,
 
//         new_local: &'a mut HashSet<Predicate>,
 
//     ) {
 
//         if let Some(set) = set_visitor.next() {
 
//             // incomplete solution. keep traversing
 
//             for pred in set.iter() {
 
//                 if let Some(elaborated) = pred.union_with(&partial) {
 
//                     Self::elaborate_into_new_local_rec(
 
//                         logger,
 
//                         elaborated,
 
//                         set_visitor.clone(),
 
//                         old_local,
 
//                         new_local,
 
//                     )
 
//                 }
 
//             }
 
//         } else {
 
//             // recursive stop condition. `partial` is a local subtree solution
 
//             if !old_local.contains(&partial) {
 
//                 // ... and it hasn't been found before
 
//                 log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
//                 new_local.insert(partial);
 
//             }
 
//         }
 
//     }
 
// }
 
// impl PolyContext for BranchPContext<'_, '_> {
 
//     type D = ProtocolD;
 

	
 
//     fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
//         assert!(self.ports.contains(&port));
 
//         let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let val = self.predicate.query(channel_id);
 
//         log!(
 
//             &mut self.m_ctx.inner.logger,
 
//             "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
//     fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
//         assert!(self.ports.contains(&port));
 
//         let val = self.inbox.get(&port);
 
//         log!(
 
//             &mut self.m_ctx.inner.logger,
 
//             "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
// }
src/runtime/error.rs
Show inline comments
 
new file 100644
 
use crate::common::*;
 

	
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
pub enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
pub enum SyncError {
 
    Timeout,
 
}
src/runtime/mod.rs
Show inline comments
 
// #[cfg(feature = "ffi")]
 
// pub mod ffi;
 
mod communication;
 
mod error;
 
mod setup2;
 

	
 
// mod actors;
 
// pub(crate) mod communication;
 
// pub(crate) mod connector;
 
// pub(crate) mod endpoint;
 
// pub mod errors;
 
// mod serde;
 
#[cfg(test)]
 
mod my_tests;
 
mod setup2;
 
// pub(crate) mod setup;
 
// mod v2;
 

	
 
use crate::common::*;
 
// use actors::*;
 
// use endpoint::*;
 
// use errors::*;
 
use error::*;
 

	
 
#[derive(Clone, Copy, Debug)]
 
pub enum LocalComponentId {
 
    Native,
 
    Proto { index: usize },
 
}
 
#[derive(Debug, Clone, Copy)]
 
pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum Decision {
 
pub enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum Msg {
 
pub enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum SetupMsg {
 
    // sent by the passive endpoint to the active endpoint
 
    // MyPortInfo(MyPortInfo),
 
pub enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderEcho { maybe_leader: ControllerId },
 
    LeaderAnnounce { leader: ControllerId },
 
    YouAreMyParent,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) struct CommMsg {
 
pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum CommMsgContents {
 
pub enum CommMsgContents {
 
    SendPayload { payload_predicate: Predicate, payload: Payload },
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Announce { decision: Decision },         // SINKAWAYS
 
}
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
@@ -78,19 +78,12 @@ pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
pub enum InpRoute {
 
    NativeComponent,
 
    ProtoComponent { index: usize },
 
    Endpoint { index: usize },
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write;
 
    fn dump_log(&self, w: &mut dyn std::io::Write);
 
}
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub polarity: Polarity,
 
    pub sock_addr: SocketAddr,
 
    pub is_active: bool,
 
}
 
@@ -116,11 +109,17 @@ pub struct EndpointManager {
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    undrained_endpoints: IndexSet<usize>,
 
    polled_undrained: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug, Default)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    logger: Box<dyn Logger>,
 
@@ -128,8 +127,7 @@ pub struct Connector {
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    proto_components: Vec<ProtoComponent>,
 
    outp_to_inp: HashMap<PortId, PortId>,
 
    inp_to_route: HashMap<PortId, InpRoute>,
 
    port_info: PortInfo,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
@@ -142,77 +140,147 @@ pub enum ConnectorPhased {
 
        endpoint_manager: EndpointManager,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
        native_actor: NativeActor, // sync invariant: in Nonsync state
 
    },
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub(crate) struct Predicate {
 
#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
}
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<PortId, Payload>,
 
    gets: HashSet<PortId>,
 
}
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
pub enum EndpointRecvErr {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
pub struct SyncContext<'a> {
 
pub struct SyncProtoContext<'a> {
 
    connector: &'a mut Connector,
 
    proto_component_index: usize,
 
}
 
pub struct NonsyncContext<'a> {
 
pub struct NonsyncProtoContext<'a> {
 
    connector: &'a mut Connector,
 
    proto_component_index: usize,
 
}
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointRecvErr { error: EndpointRecvErr, index: usize },
 
    BrokenEndpoint(usize),
 

	
 
// pub struct MonoPContext<'a> {
 
//     inner: &'a mut ControllerInner,
 
//     ports: &'a mut HashSet<PortId>,
 
//     mono_ps: &'a mut Vec<MonoP>,
 
// }
 
// pub struct PolyPContext<'a> {
 
//     my_subtree_id: SubtreeId,
 
//     inner: &'a mut Connector,
 
//     solution_storage: &'a mut SolutionStorage,
 
// }
 
// impl PolyPContext<'_> {
 
//     #[inline(always)]
 
//     fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
//         let Self { solution_storage, my_subtree_id, inner } = self;
 
//         PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
//     }
 
// }
 
// struct BranchPContext<'m, 'r> {
 
//     m_ctx: PolyPContext<'m>,
 
//     ports: &'r HashSet<PortId>,
 
//     predicate: &'r Predicate,
 
//     inbox: &'r HashMap<PortId, Payload>,
 
// }
 

	
 
#[derive(Default)]
 
pub struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
pub enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { port: PortId },
 
}
 
#[derive(Debug, Default)]
 
pub struct SyncBatch {
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
pub enum NativeActor {
 
    Nonsync {
 
        sync_result_branch: Option<NativeBranch>, // invariant: sync_result_branch.to_get.is_empty()
 
        next_batches: Vec<SyncBatch>,             // invariant: nonempty
 
    },
 
    Sync {
 
        branches: HashMap<Predicate, NativeBranch>,
 
    },
 
}
 
#[derive(Debug)]
 
pub struct NativeBranch {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
////////////////
 
impl EndpointManager {
 
    fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
 
    fn try_recv_any(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Instant,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 
        // 2. try read from sockets nonblocking
 
        while let Some(index) = self.undrained_endpoints.pop() {
 
            if let Some(msg) = self.endpoint_exts[index]
 
                .endpoint
 
                .try_recv()
 
                .map_err(|error| EndpointRecvErr { error, index })?
 
            {
 
                return Ok((index, msg));
 
            }
 
        }
 
        // 3. poll for progress
 

	
 
        loop {
 
            // 2. try read a message from an endpoint that previously raised an event
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let endpoint = &mut self.endpoint_exts[index].endpoint;
 
                if let Some(msg) =
 
                    endpoint.try_recv().map_err(|error| EndpointError { error, index })?
 
                {
 
                    if !endpoint.inbox.is_empty() {
 
                        // there may be another message waiting!
 
                        self.polled_undrained.insert(index);
 
                    }
 
                    return Ok((index, msg));
 
                }
 
            }
 
            // 3. No message yet. poll!
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                log!(logger, "Poll event {:?}", event);
 
                let Token(index) = event.token();
 
                if let Some(msg) = self.endpoint_exts[index]
 
                    .endpoint
 
                    .try_recv()
 
                    .map_err(|error| EndpointRecvErr { error, index })?
 
                {
 
                    return Ok((index, msg));
 
                }
 
                self.polled_undrained.insert(index);
 
            }
 
        }
 
    }
 
    fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
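 
            // `undelayed_messages` is empty, so swapping the two Vecs moves every
 
            // delayed message into the undelayed queue without copying.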
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
}
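 
// A sketch of how a caller might drive `try_recv_any` (hypothetical `manager`,
 
// `logger`, and deadline; for illustration only):
 
//     let deadline = Instant::now() + Duration::from_secs(1);
 
//     loop {
 
//         match manager.try_recv_any(&mut *logger, deadline) {
 
//             Ok((index, msg)) => { /* route `msg` arriving on endpoint `index` */ }
 
//             Err(TryRecyAnyError::Timeout) => break, // deadline passed this round
 
//             Err(_) => break, // a real caller would treat poll/endpoint errors as fatal
 
//         }
 
//     }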
 
@@ -221,22 +289,6 @@ impl Debug for Endpoint {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl NonsyncContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) {
 
        todo!()
 
    }
 
    pub fn new_channel(&mut self) -> [PortId; 2] {
 
        todo!()
 
    }
 
}
 
impl SyncContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        todo!()
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        todo!()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
@@ -259,15 +311,6 @@ impl Into<Msg> for SetupMsg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("{")?;
 
        for (port, &v) in self.assigned.iter() {
 
            f.write_fmt(format_args!("{:?}=>{}, ", port, if v { 'T' } else { 'F' }))?
 
        }
 
        f.pad("}")
 
    }
 
}
 
impl StringLogger {
 
    pub fn new(controller_id: ControllerId) -> Self {
 
        Self(controller_id, String::default())
 
@@ -308,8 +351,8 @@ impl IdManager {
 
    }
 
}
 
impl Endpoint {
 
    fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointRecvErr> {
 
        use EndpointRecvErr::*;
 
    fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            match self.stream.read_to_end(&mut self.inbox) {
 
@@ -357,518 +400,133 @@ impl Connector {
 
        writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    }
 
}
 
impl Debug for SolutionStorage {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("Solutions: [")?;
 
        for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
            let sols = &self.subtree_solutions[index];
 
            f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
        }
 
        f.pad("]")
 
    }
 
}
 

	
 
// #[derive(Debug)]
 
// pub enum Connector {
 
//     Unconfigured(Unconfigured),
 
//     Configured(Configured),
 
//     Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space
 
// }
 
// #[derive(Debug)]
 
// pub struct Unconfigured {
 
//     pub controller_id: ControllerId,
 
// }
 
// #[derive(Debug)]
 
// pub struct Configured {
 
//     controller_id: ControllerId,
 
//     polarities: Vec<Polarity>,
 
//     bindings: HashMap<usize, PortBinding>,
 
//     protocol_description: Arc<ProtocolD>,
 
//     main_component: Vec<u8>,
 
//     logger: String,
 
// }
 
// #[derive(Debug)]
 
// pub struct Connected {
 
//     native_interface: Vec<(PortId, Polarity)>,
 
//     sync_batches: Vec<SyncBatch>,
 
//     // controller is cooperatively scheduled with the native application
 
//     // (except for transport layer behind Endpoints, which are managed by the OS)
 
//     // control flow is passed to the controller during methods on Connector (primarily, connect and sync).
 
//     controller: Controller,
 
// }
 

	
 
// #[derive(Debug, Copy, Clone)]
 
// pub enum PortBinding {
 
//     Native,
 
//     Active(SocketAddr),
 
//     Passive(SocketAddr),
 
// }
 

	
 
// #[derive(Debug)]
 
// struct Arena<T> {
 
//     storage: Vec<T>,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct ReceivedMsg {
 
//     recipient: PortId,
 
//     msg: Msg,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct MessengerState {
 
//     poll: Poll,
 
//     events: Events,
 
//     delayed: Vec<ReceivedMsg>,
 
//     undelayed: Vec<ReceivedMsg>,
 
//     polled_undrained: IndexSet<PortId>,
 
// }
 
// #[derive(Debug)]
 
// struct ChannelIdStream {
 
//     controller_id: ControllerId,
 
//     next_channel_index: ChannelIndex,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct Controller {
 
//     protocol_description: Arc<ProtocolD>,
 
//     inner: ControllerInner,
 
//     ephemeral: ControllerEphemeral,
 
//     unrecoverable_error: Option<SyncErr>, // prevents future calls to Sync
 
// }
 
// #[derive(Debug)]
 
// struct ControllerInner {
 
//     round_index: usize,
 
//     channel_id_stream: ChannelIdStream,
 
//     endpoint_exts: Arena<EndpointExt>,
 
//     messenger_state: MessengerState,
 
//     mono_n: MonoN,       // state at next round start
 
//     mono_ps: Vec<MonoP>, // state at next round start
 
//     family: ControllerFamily,
 
//     logger: String,
 
// }
 

	
 
// /// This structure has its state entirely reset between synchronous rounds
 
// #[derive(Debug, Default)]
 
// struct ControllerEphemeral {
 
//     solution_storage: SolutionStorage,
 
//     poly_n: Option<PolyN>,
 
//     poly_ps: Vec<PolyP>,
 
//     mono_ps: Vec<MonoP>,
 
//     port_to_holder: HashMap<PortId, PolyId>,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct ControllerFamily {
 
//     parent_port: Option<PortId>,
 
//     children_ports: Vec<PortId>,
 
// }
 

	
 
// #[derive(Debug)]
 
// pub(crate) enum SyncRunResult {
 
//     BlockingForRecv,
 
//     AllBranchesComplete,
 
//     NoBranches,
 
// }
 

	
 
// // Used to identify poly actors
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// enum PolyId {
 
//     N,
 
//     P { index: usize },
 
// }
 

	
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// pub(crate) enum SubtreeId {
 
//     PolyN,
 
//     PolyP { index: usize },
 
//     ChildController { port: PortId },
 
// }
 

	
 
// pub(crate) struct MonoPContext<'a> {
 
//     inner: &'a mut ControllerInner,
 
//     ports: &'a mut HashSet<PortId>,
 
//     mono_ps: &'a mut Vec<MonoP>,
 
// }
 
// pub(crate) struct PolyPContext<'a> {
 
//     my_subtree_id: SubtreeId,
 
//     inner: &'a mut ControllerInner,
 
//     solution_storage: &'a mut SolutionStorage,
 
// }
 
// impl PolyPContext<'_> {
 
//     #[inline(always)]
 
//     fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
//         let Self { solution_storage, my_subtree_id, inner } = self;
 
//         PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
//     }
 
// }
 
// struct BranchPContext<'m, 'r> {
 
//     m_ctx: PolyPContext<'m>,
 
//     ports: &'r HashSet<PortId>,
 
//     predicate: &'r Predicate,
 
//     inbox: &'r HashMap<PortId, Payload>,
 
// }
 

	
 
// #[derive(Default)]
 
// pub(crate) struct SolutionStorage {
 
//     old_local: HashSet<Predicate>,
 
//     new_local: HashSet<Predicate>,
 
//     // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
//     subtree_solutions: Vec<HashSet<Predicate>>,
 
//     subtree_id_to_index: HashMap<SubtreeId, usize>,
 
// }
 

	
 
// trait Messengerlike {
 
//     fn get_state_mut(&mut self) -> &mut MessengerState;
 
//     fn get_endpoint_mut(&mut self, eport: PortId) -> &mut Endpoint;
 

	
 
//     fn delay(&mut self, received: ReceivedMsg) {
 
//         self.get_state_mut().delayed.push(received);
 
//     }
 
//     fn undelay_all(&mut self) {
 
//         let MessengerState { delayed, undelayed, .. } = self.get_state_mut();
 
//         undelayed.extend(delayed.drain(..))
 
//     }
 

	
 
//     fn send(&mut self, to: PortId, msg: Msg) -> Result<(), EndpointErr> {
 
//         self.get_endpoint_mut(to).send(msg)
 
//     }
 

	
 
//     // attempt to receive a message from one of the endpoints before the deadline
 
//     fn recv(&mut self, deadline: Instant) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
//         // try get something buffered
 
//         if let Some(x) = self.get_state_mut().undelayed.pop() {
 
//             return Ok(Some(x));
 
//         }
 

	
 
//         loop {
 
//             // polled_undrained may not be empty
 
//             while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
//                 if let Some(msg) = self
 
//                     .get_endpoint_mut(eport)
 
//                     .recv()
 
//                     .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
//                 {
 
//                     // this endpoint MAY still have messages! check again in future
 
//                     self.get_state_mut().polled_undrained.insert(eport);
 
//                     return Ok(Some(ReceivedMsg { recipient: eport, msg }));
 
//                 }
 
//             }
 

	
 
//             let state = self.get_state_mut();
 
//             match state.poll_events(deadline) {
 
//                 Ok(()) => {
 
//                     for e in state.events.iter() {
 
//                         state.polled_undrained.insert(PortId::from_token(e.token()));
 
//                     }
 
//                 }
 
//                 Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
//                 Err(PollDeadlineErr::Timeout) => return Ok(None),
 
//             }
 
//         }
 
//     }
 
//     fn recv_blocking(&mut self) -> Result<ReceivedMsg, MessengerRecvErr> {
 
//         // try get something buffered
 
//         if let Some(x) = self.get_state_mut().undelayed.pop() {
 
//             return Ok(x);
 
//         }
 

	
 
//         loop {
 
//             // polled_undrained may not be empty
 
//             while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
//                 if let Some(msg) = self
 
//                     .get_endpoint_mut(eport)
 
//                     .recv()
 
//                     .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
//                 {
 
//                     // this endpoint MAY still have messages! check again in future
 
//                     self.get_state_mut().polled_undrained.insert(eport);
 
//                     return Ok(ReceivedMsg { recipient: eport, msg });
 
//                 }
 
//             }
 

	
 
//             let state = self.get_state_mut();
 

	
 
//             state
 
//                 .poll
 
//                 .poll(&mut state.events, None)
 
//                 .map_err(|_| MessengerRecvErr::PollingFailed)?;
 
//             for e in state.events.iter() {
 
//                 state.polled_undrained.insert(PortId::from_token(e.token()));
 
//             }
 
//         }
 
//     }
 
// }
 

	
 
// /////////////////////////////////
 
// impl Debug for SolutionStorage {
 
//     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
//         f.pad("Solutions: [")?;
 
//         for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
//             let sols = &self.subtree_solutions[index];
 
//             f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
//         }
 
//         f.pad("]")
 
//     }
 
// }
 
// impl From<EvalErr> for SyncErr {
 
//     fn from(e: EvalErr) -> SyncErr {
 
//         SyncErr::EvalErr(e)
 
//     }
 
// }
 
// impl From<MessengerRecvErr> for SyncErr {
 
//     fn from(e: MessengerRecvErr) -> SyncErr {
 
//         SyncErr::MessengerRecvErr(e)
 
//     }
 
// }
 
// impl From<MessengerRecvErr> for ConnectErr {
 
//     fn from(e: MessengerRecvErr) -> ConnectErr {
 
//         ConnectErr::MessengerRecvErr(e)
 
//     }
 
// }
 
// impl<T> Default for Arena<T> {
 
//     fn default() -> Self {
 
//         Self { storage: vec![] }
 
//     }
 
// }
 
// impl<T> Arena<T> {
 
//     pub fn alloc(&mut self, t: T) -> PortId {
 
//         self.storage.push(t);
 
//         let l: u32 = self.storage.len().try_into().unwrap();
 
//         PortId::from_raw(l - 1u32)
 
//     }
 
//     pub fn get(&self, key: PortId) -> Option<&T> {
 
//         self.storage.get(key.to_raw() as usize)
 
//     }
 
//     pub fn get_mut(&mut self, key: PortId) -> Option<&mut T> {
 
//         self.storage.get_mut(key.to_raw() as usize)
 
//     }
 
//     pub fn type_convert<X>(self, f: impl FnMut((PortId, T)) -> X) -> Arena<X> {
 
//         Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() }
 
//     }
 
//     pub fn iter(&self) -> impl Iterator<Item = (PortId, &T)> {
 
//         self.keyspace().zip(self.storage.iter())
 
//     }
 
//     pub fn len(&self) -> usize {
 
//         self.storage.len()
 
//     }
 
//     pub fn keyspace(&self) -> impl Iterator<Item = PortId> {
 
//         (0u32..self.storage.len().try_into().unwrap()).map(PortId::from_raw)
 
//     }
 
// }
 

	
 
// impl ChannelIdStream {
 
//     fn new(controller_id: ControllerId) -> Self {
 
//         Self { controller_id, next_channel_index: 0 }
 
//     }
 
//     fn next(&mut self) -> ChannelId {
 
//         self.next_channel_index += 1;
 
//         ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 }
 
//     }
 
// }
 

	
 
// impl MessengerState {
 
//     // does NOT guarantee that events is non-empty
 
//     fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> {
 
//         use PollDeadlineErr::*;
 
//         self.events.clear();
 
//         let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
//         self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
 
//         Ok(())
 
//     }
 
// }
 
// impl From<PollDeadlineErr> for ConnectErr {
 
//     fn from(e: PollDeadlineErr) -> ConnectErr {
 
//         match e {
 
//             PollDeadlineErr::Timeout => ConnectErr::Timeout,
 
//             PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed,
 
//         }
 
//     }
 
// }
 

	
 
// impl std::ops::Not for Polarity {
 
//     type Output = Self;
 
//     fn not(self) -> Self::Output {
 
//         use Polarity::*;
 
//         match self {
 
//             Putter => Getter,
 
//             Getter => Putter,
 
//         }
 
//     }
 
// }
 

	
 
// impl Predicate {
 
//     // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
//     pub fn satisfies(&self, other: &Self) -> bool {
 
//         let mut s_it = self.assigned.iter();
 
//         let mut s = if let Some(s) = s_it.next() {
 
//             s
 
//         } else {
 
//             return other.assigned.is_empty();
 
//         };
 
//         for (oid, ob) in other.assigned.iter() {
 
//             while s.0 < oid {
 
//                 s = if let Some(s) = s_it.next() {
 
//                     s
 
//                 } else {
 
//                     return false;
 
//                 };
 
//             }
 
//             if s.0 > oid || s.1 != ob {
 
//                 return false;
 
//             }
 
//         }
 
//         true
 
//     }
 

	
 
//     /// Given self and other, two predicates, return the most general Predicate possible, N
 
//     /// such that n.satisfies(self) && n.satisfies(other).
 
// /// If none exists, Nonexistant is returned.
 
// /// If the resulting predicate is equivalent to self, other, or both,
 
// /// FormerNotLatter, LatterNotFormer, and Equivalent are returned respectively.
 
// /// Otherwise New(N) is returned.
 
//     pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
//         use CommonSatResult::*;
 
//         // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
//         let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
//         let [mut s, mut o] = [s_it.next(), o_it.next()];
 
//         // lists of assignments in self but not other and vice versa.
 
//         let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
//         loop {
 
//             match [s, o] {
 
//                 [None, None] => break,
 
//                 [None, Some(x)] => {
 
//                     o_not_s.push(x);
 
//                     o_not_s.extend(o_it);
 
//                     break;
 
//                 }
 
//                 [Some(x), None] => {
 
//                     s_not_o.push(x);
 
//                     s_not_o.extend(s_it);
 
//                     break;
 
//                 }
 
//                 [Some((sid, sb)), Some((oid, ob))] => {
 
//                     if sid < oid {
 
//                         // o is missing this element
 
//                         s_not_o.push((sid, sb));
 
//                         s = s_it.next();
 
//                     } else if sid > oid {
 
//                         // s is missing this element
 
//                         o_not_s.push((oid, ob));
 
//                         o = o_it.next();
 
//                     } else if sb != ob {
 
//                         assert_eq!(sid, oid);
 
//                         // both predicates assign the variable but differ on the value
 
//                         return Nonexistant;
 
//                     } else {
 
//                         // both predicates assign the variable to the same value
 
//                         s = s_it.next();
 
//                         o = o_it.next();
 
//                     }
 
//                 }
 
//             }
 
//         }
 
//         // Observed zero inconsistencies. A unified predicate exists...
 
//         match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
//             [true, true] => Equivalent,       // ... equivalent to both.
 
//             [false, true] => FormerNotLatter, // ... equivalent to self.
 
//             [true, false] => LatterNotFormer, // ... equivalent to other.
 
//             [false, false] => {
 
//                 // ... which is the union of the predicates' assignments but
 
//                 //     is equivalent to neither self nor other.
 
//                 let mut new = self.clone();
 
//                 for (&id, &b) in o_not_s {
 
//                     new.assigned.insert(id, b);
 
//                 }
 
//                 New(new)
 
//             }
 
//         }
 
//     }
 

	
 
//     pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
//         self.assigned
 
//             .iter()
 
//             .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
//     }
 

	
 
//     pub fn batch_assign_nones(
 
//         &mut self,
 
//         channel_ids: impl Iterator<Item = ChannelId>,
 
//         value: bool,
 
//     ) {
 
//         for channel_id in channel_ids {
 
//             self.assigned.entry(channel_id).or_insert(value);
 
//         }
 
//     }
 
//     pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
//         self.assigned.insert(channel_id, value)
 
//     }
 
//     pub fn union_with(&self, other: &Self) -> Option<Self> {
 
//         let mut res = self.clone();
 
//         for (&channel_id, &assignment_1) in other.assigned.iter() {
 
//             match res.assigned.insert(channel_id, assignment_1) {
 
//                 Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
//                 _ => {}
 
//             }
 
//         }
 
//         Some(res)
 
//     }
 
//     pub fn query(&self, x: ChannelId) -> Option<bool> {
 
//         self.assigned.get(&x).copied()
 
//     }
 
//     pub fn new_trivial() -> Self {
 
//         Self { assigned: Default::default() }
 
//     }
 
// }
 

	
 
// #[test]
 
// fn pred_sat() {
 
//     use maplit::btreemap;
 
//     let mut c = ChannelIdStream::new(0);
 
//     let ch = std::iter::repeat_with(move || c.next()).take(5).collect::<Vec<_>>();
 
//     let p = Predicate::new_trivial();
 
//     let p_0t = Predicate { assigned: btreemap! { ch[0] => true } };
 
//     let p_0f = Predicate { assigned: btreemap! { ch[0] => false } };
 
//     let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } };
 
//     let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } };
 

	
 
//     assert!(p.satisfies(&p));
 
//     assert!(p_0t.satisfies(&p_0t));
 
//     assert!(p_0f.satisfies(&p_0f));
 
//     assert!(p_0f_3f.satisfies(&p_0f_3f));
 
//     assert!(p_0f_3t.satisfies(&p_0f_3t));
 

	
 
//     assert!(p_0t.satisfies(&p));
 
//     assert!(p_0f.satisfies(&p));
 
//     assert!(p_0f_3f.satisfies(&p_0f));
 
//     assert!(p_0f_3t.satisfies(&p_0f));
 

	
 
//     assert!(!p.satisfies(&p_0t));
 
//     assert!(!p.satisfies(&p_0f));
 
//     assert!(!p_0f.satisfies(&p_0t));
 
//     assert!(!p_0t.satisfies(&p_0f));
 
//     assert!(!p_0f_3f.satisfies(&p_0f_3t));
 
//     assert!(!p_0f_3t.satisfies(&p_0f_3f));
 
//     assert!(!p_0t.satisfies(&p_0f_3f));
 
//     assert!(!p_0f.satisfies(&p_0f_3f));
 
//     assert!(!p_0t.satisfies(&p_0f_3t));
 
//     assert!(!p_0f.satisfies(&p_0f_3t));
 
// }
 

	
 
// #[test]
 
// fn pred_common_sat() {
 
//     use maplit::btreemap;
 
//     use CommonSatResult::*;
 

	
 
//     let mut c = ChannelIdStream::new(0);
 
//     let ch = std::iter::repeat_with(move || c.next()).take(5).collect::<Vec<_>>();
 
//     let p = Predicate::new_trivial();
 
//     let p_0t = Predicate { assigned: btreemap! { ch[0] => true } };
 
//     let p_0f = Predicate { assigned: btreemap! { ch[0] => false } };
 
//     let p_3f = Predicate { assigned: btreemap! { ch[3] => false } };
 
//     let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } };
 
//     let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } };
 

	
 
//     assert_eq![p.common_satisfier(&p), Equivalent];
 
//     assert_eq![p_0t.common_satisfier(&p_0t), Equivalent];
 
impl Predicate {
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
            }
 
        }
 
        true
 
    }
 

	
 
//     assert_eq![p.common_satisfier(&p_0t), LatterNotFormer];
 
//     assert_eq![p_0t.common_satisfier(&p), FormerNotLatter];
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists, Nonexistant is returned.
 
    /// If the resulting predicate is equivalent to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer, and Equivalent are returned respectively.
 
    /// Otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                New(new)
 
            }
 
        }
 
    }
 

	
 
//     assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant];
 
//     assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant];
 
//     assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant];
 
//     assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant];
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = PortId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
//     assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)];
 
// }
 
    pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: PortId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    }
 
    pub fn new_trivial() -> Self {
 
        Self { assigned: Default::default() }
 
    }
 
}
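 
// A small usage sketch, in the spirit of the retired `pred_sat` tests above
 
// (`port_a`/`port_b` stand for two distinct PortIds, however they are minted):
 
//     let mut q = Predicate::new_trivial();
 
//     q.replace_assignment(port_a, true);          // q = { port_a => T }
 
//     let mut r = Predicate::new_trivial();
 
//     r.replace_assignment(port_b, false);         // r = { port_b => F }
 
//     assert!(q.satisfies(&Predicate::new_trivial()));
 
//     let union = q.union_with(&r).unwrap();       // { port_a => T, port_b => F }
 
//     assert_eq!(q.common_satisfier(&r), CommonSatResult::New(union));
 
// Conflicting assignments make `union_with` return None and
 
// `common_satisfier` return Nonexistant.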
src/runtime/my_tests.rs
Show inline comments
 
@@ -45,9 +45,8 @@ fn add_sync() {
 
fn add_net_port() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ =
 
        c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap();
 
    let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
@@ -62,12 +61,12 @@ fn trivial_connect() {
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let _ =
 
        c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    let res = c.connect(Duration::from_secs(1));
 
    println!("{:#?}", c);
 
    c.get_logger().dump_log(&mut std::io::stdout().lock());
 
    res.unwrap();
 
}
 

	
 
#[test]
 
@@ -76,15 +75,13 @@ fn multithreaded_connect() {
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let es = EndpointSetup { polarity: Getter, sock_addr, is_active: true };
 
            let _ = c.add_net_port(es).unwrap();
 
            let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let es = EndpointSetup { polarity: Putter, sock_addr, is_active: false };
 
            let _ = c.add_net_port(es).unwrap();
 
            let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
src/runtime/retired/actors.rs
Show inline comments
 
file renamed from src/runtime/actors.rs to src/runtime/retired/actors.rs
src/runtime/retired/communication.rs
Show inline comments
 
new file 100644
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        let ret = match &decision {
 
            Decision::Success(predicate) => {
 
                // overwrite MonoN/P
 
                self.inner.mono_n = {
 
                    let poly_n = self.ephemeral.poly_n.take().unwrap();
 
                    poly_n.choose_mono(predicate).unwrap_or_else(|| {
 
                        panic!(
 
                            "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
 
                            &predicate, &poly_n.branches, &self.inner.logger
 
                        );
 
                    })
 
                };
 
                self.inner.mono_ps.clear();
 
                self.inner.mono_ps.extend(
 
                    self.ephemeral
 
                        .poly_ps
 
                        .drain(..)
 
                        .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
                );
 
                Ok(())
 
            }
 
            Decision::Failure => Err(SyncErr::Timeout),
 
        };
 
        let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
        for &child_port in self.inner.family.children_ports.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with port {:?}",
 
                &announcement,
 
                child_port
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_port)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        ret
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Returns true if a decision was reached.
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_port) = self.inner.family.parent_port {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_port.is_none());
 
            let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(predicate) = maybe_predicate {
 
                let decision = Decision::Success(predicate);
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    }
 

	
 
    fn kick_off_native(
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ports, .. } = self.inner.mono_n.clone();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id;
 
            let all_ports = ports.iter().copied();
 
            let all_channel_ids = all_ports.map(port_to_channel_id);
 

	
 
            let mut predicate = Predicate::new_trivial();
 

	
 
            // assign TRUE for puts and gets
 
            let true_ports = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ports.clone().map(port_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Duplicate predicate {:#?}!\nHaving multiple batches with the same
 
                    predicate requires the support of oracle boolean variables",
 
                    &predicate,
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (port, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ports, branches })
 
    }
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        if let Some(e) = self.unrecoverable_error {
 
            return Err(e.clone());
 
        }
 
        self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
 
            SyncErr::Timeout => e, // this isn't unrecoverable
 
            _ => {
 
                // Must set unrecoverable error! and tear down our net channels
 
                self.unrecoverable_error = Some(e);
 
                self.ephemeral.clear();
 
                self.inner.endpoint_exts = Default::default();
 
                e
 
            }
 
        })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in a decided state OR 1+ are inconsistent.
 
    // If a native component requires setting up, `sync_batches` is Some, and its contents are used as the native's sync batches.
 
    fn sync_round_inner(
 
        &mut self,
 
        mut deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 
        assert!(self.ephemeral.is_clear());
 
        assert!(self.unrecoverable_error.is_none());
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped; some new actors are created.
 
        //    Ultimately, we have zero Mono actors and a list of unnamed sync actors.
 
        self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ports: &mut mono_p.ports,
 
                mono_ps: &mut self.ephemeral.mono_ps,
 
                inner: &mut self.inner,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 2. Define the mapping from port -> actor.
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let port_to_holder: HashMap<PortId, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
 
            &port_to_holder
 
        );
 

	
 
        // 3. Create the solution storage. It tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
            let n = std::iter::once(SubtreeId::PolyN);
 
            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
            let c = self
 
                .inner
 
                .family
 
                .children_ports
 
                .iter()
 
                .map(|&port| SubtreeId::ChildController { port });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 

	
 
        // 4. Kick off the synchronous round of the native actor, if it exists.
 

	
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            log!(
 
                &mut self.inner.logger,
 
                "PolyN kicked off, and has branches with predicates... {:?}",
 
                poly_n.branches.keys().collect::<Vec<_>>()
 
            );
 
            Some(poly_n)
 
        } else {
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 

	
 
        // 5. Kick off the synchronous round of each protocol actor.
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 6. `solution_storage` may have new solutions for this controller;
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent.
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 7. Receive incoming messages until the DECISION is made OR some unrecoverable error occurs.
 
        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
            let received = match deadline {
 
                None => {
 
                    // We have already timed out; perform a "long" poll.
 
                    self.recv(Instant::now() + Duration::from_secs(10))?.expect("long poll dried up")
 
                }
 
                Some(d) => match self.recv(d)? {
 
                    // We have not yet timed out; perform a time-limited poll.
 
                    Some(received) => received,
 
                    None => {
 
                        // timed out! send a FAILURE message to the sink,
 
                        // and henceforth don't time out on polling.
 
                        deadline = None;
 
                        match self.inner.family.parent_port {
 
                            None => {
 
                                // I am the sink! announce failure and return.
 
                                return self.end_round_with_decision(Decision::Failure);
 
                            }
 
                            Some(parent_port) => {
 
                                // I am not the sink! send a failure message.
 
                                let announcement = Msg::CommMsg(CommMsg {
 
                                    round_index: self.inner.round_index,
 
                                    contents: CommMsgContents::Failure,
 
                                });
 
                                log!(
 
                                    &mut self.inner.logger,
 
                                    "Forwarding {:?} to parent with port {:?}",
 
                                    &announcement,
 
                                    parent_port
 
                                );
 
                                self.inner
 
                                    .endpoint_exts
 
                                    .get_mut(parent_port)
 
                                    .expect("ss")
 
                                    .endpoint
 
                                    .send(announcement.clone())?;
 
                                continue; // poll some more
 
                            }
 
                        }
 
                    }
 
                },
 
            };
 
            log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(s) => {
 
                    // This occurs if the connector was malformed during connect()
 
                    log!(&mut self.inner.logger, "Wasn't expecting setup msg {:?}!", s);
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "...and its OLD! :(");
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(&mut self.inner.logger, "... DELAY! :(");
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "... its a round-appropriate CommMsg with port {:?}",
 
                        received.recipient
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Failure => match self.inner.family.parent_port {
 
                    Some(parent_port) => {
 
                        let announcement = Msg::CommMsg(CommMsg {
 
                            round_index: self.inner.round_index,
 
                            contents: CommMsgContents::Failure,
 
                        });
 
                        log!(
 
                            &mut self.inner.logger,
 
                            "Forwarding {:?} to parent with port {:?}",
 
                            &announcement,
 
                            parent_port
 
                        );
 
                        self.inner
 
                            .endpoint_exts
 
                            .get_mut(parent_port)
 
                            .expect("ss")
 
                            .endpoint
 
                            .send(announcement.clone())?;
 
                    }
 
                    None => return self.end_round_with_decision(Decision::Failure),
 
                },
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ports.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { port: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { decision } => {
 
                    if self.inner.family.parent_port != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &decision
 
                    );
 
                    return self.end_round_with_decision(decision);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // check that we expect to be able to receive payloads from this sender
 
                    assert_eq!(
 
                        Getter,
 
                        self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
                    );
 

	
 
                    // Message for some actor. Feed it to the appropriate actor
 
                    // and then give it another chance to run.
 
                    let subtree_id = port_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    let channel_id = self
 
                        .inner
 
                        .endpoint_exts
 
                        .get(received.recipient)
 
                        .expect("UEHFU")
 
                        .info
 
                        .channel_id;
 
                    if payload_predicate.query(channel_id) != Some(true) {
 
                        // sender didn't preserve the invariant
 
                        return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
 
                    }
 
                    match subtree_id {
 
                        None => {
 
                            // This happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message:
 
                            // the sender's branch will certainly not be part of the solution.
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
                            let m_ctx = PolyPContext {
 
                                my_subtree_id: SubtreeId::PolyP { index: *index },
 
                                inner: &mut self.inner,
 
                                solution_storage: &mut self.ephemeral.solution_storage,
 
                            };
 
                            use SyncRunResult as Srr;
 
                            let blocker = poly_p.poly_recv_run(
 
                                m_ctx,
 
                                &self.protocol_description,
 
                                received.recipient,
 
                                payload_predicate,
 
                                payload,
 
                            )?;
 
                            log!(
 
                                &mut self.inner.logger,
 
                                "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
                                subtree_id,
 
                                blocker
 
                            );
 
                            match blocker {
 
                                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
                                    {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                }
 
            }
 
        }
 
    }
 
}
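
// Illustrative sketch, not part of this file: `kick_off_native` above derives each sync
// batch's firing predicate by assigning TRUE to the channels touched by the batch's puts
// and gets, then FALSE to every remaining channel in the native's interface. The model
// below assumes `Predicate::batch_assign_nones` means "assign only where still unassigned",
// with a plain `BTreeMap` standing in for `Predicate`.
#[cfg(test)]
mod batch_predicate_sketch {
    use std::collections::{BTreeMap, BTreeSet};

    fn batch_predicate(interface: &BTreeSet<u32>, firing: &BTreeSet<u32>) -> BTreeMap<u32, bool> {
        let mut pred = BTreeMap::new(); // Predicate::new_trivial()
        for &c in firing {
            pred.entry(c).or_insert(true); // batch_assign_nones(true_channel_ids, true)
        }
        for &c in interface {
            pred.entry(c).or_insert(false); // batch_assign_nones(all_channel_ids, false)
        }
        pred
    }

    #[test]
    fn silent_ports_are_assigned_false() {
        let interface: BTreeSet<u32> = [0, 1, 2].iter().copied().collect();
        let firing: BTreeSet<u32> = [1].iter().copied().collect();
        let pred = batch_predicate(&interface, &firing);
        assert_eq!(pred.get(&0), Some(&false));
        assert_eq!(pred.get(&1), Some(&true));
        assert_eq!(pred.get(&2), Some(&false));
    }
}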
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.mono_ps.is_empty()
 
            && self.port_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        // also clear mono_ps, so that `is_clear` holds after an aborted round
        self.mono_ps.clear();
 
        self.port_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
 
                    state: self.state,
 
                    inbox: Default::default(),
 
                    outbox: Default::default(),
 
                    blocking_on: None,
 
                }
 
            },
 
            ports: self.ports,
 
        }
 
    }
 
}
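
// Implementing `From<MonoP> for PolyP` rather than `Into` directly keeps the
// `mono_p.into()` calls in `sync_round_inner` working via the standard library's blanket
// `impl<T, U: From<T>> Into<U> for T`, and is the form clippy recommends.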
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ports {:?}!",
 
            &moved_ports,
 
        );
 
        if moved_ports.is_subset(self.ports) {
 
            self.ports.retain(|x| !moved_ports.contains(x));
 
            self.mono_ps.push(MonoP { state: init_state, ports: moved_ports });
 
        } else {
 
            panic!("MachineP attempting to move alien port!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [PortId; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 

	
 
        let mut clos = |endpoint, polarity| {
 
            let endpoint_ext =
 
                EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint };
 
            let port = self.inner.endpoint_exts.alloc(endpoint_ext);
 
            let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint;
 
            let token = PortId::to_token(port);
 
            self.inner
 
                .messenger_state
 
                .poll
 
                .register(endpoint, token, Ready::readable(), PollOpt::edge())
 
                .expect("AAGAGGGGG");
 
            self.ports.insert(port);
 
            port
 
        };
 
        let [kp, kg] = [clos(a, Putter), clos(b, Getter)];
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ports {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
    }
 
    fn new_random(&mut self) -> u64 {
 
        let mut bytes = [0u8; std::mem::size_of::<u64>()];
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        let val = u64::from_ne_bytes(bytes);
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_random. returning val {:?}!",
 
            val,
 
        );
 
        val
 
    }
 
}
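
// A minimal check, not part of this file: `u64::from_ne_bytes` as used by `new_random`
// above is the safe equivalent of transmuting `[u8; 8]` in native byte order.
#[cfg(test)]
mod random_bytes_sketch {
    #[test]
    fn from_ne_bytes_matches_transmute() {
        let bytes: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
        let via_safe = u64::from_ne_bytes(bytes);
        let via_transmute: u64 = unsafe { std::mem::transmute(bytes) };
        assert_eq!(via_safe, via_transmute);
    }
}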
 

	
 
impl SolutionStorage {
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
        self.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 

	
 
    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
        self.new_local.iter()
 
    }
 

	
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut String,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut String,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
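
// Illustrative sketch, not part of this file: `elaborate_into_new_local_rec` above
// enumerates all conflict-free unions formed by picking one predicate from each other
// subtree's solution set. A simplified model, with `BTreeMap<u32, bool>` standing in for
// `Predicate` and `union_with` assumed to fail on any conflicting assignment:
#[cfg(test)]
mod elaboration_sketch {
    use std::collections::BTreeMap;

    type Pred = BTreeMap<u32, bool>;

    // None <=> the two predicates assign different values to some channel.
    fn union_with(a: &Pred, b: &Pred) -> Option<Pred> {
        let mut out = a.clone();
        for (&k, &v) in b {
            if let Some(prev) = out.insert(k, v) {
                if prev != v {
                    return None;
                }
            }
        }
        Some(out)
    }

    // Depth-first: extend `partial` with one predicate per remaining set.
    fn elaborate(partial: Pred, mut sets: impl Iterator<Item = Vec<Pred>> + Clone, out: &mut Vec<Pred>) {
        match sets.next() {
            None => out.push(partial), // all sets visited: `partial` is a local solution
            Some(set) => {
                for pred in &set {
                    if let Some(elaborated) = union_with(pred, &partial) {
                        elaborate(elaborated, sets.clone(), out);
                    }
                }
            }
        }
    }

    #[test]
    fn only_compatible_unions_survive() {
        let p = |pairs: &[(u32, bool)]| pairs.iter().copied().collect::<Pred>();
        let subtree_a = vec![p(&[(0, true)]), p(&[(0, false)])];
        let subtree_b = vec![p(&[(0, true), (1, false)])];
        let mut out = Vec::new();
        elaborate(Pred::new(), vec![subtree_a, subtree_b].into_iter(), &mut out);
        // the (0:false) branch of A conflicts with B's sole predicate and is pruned
        assert_eq!(out, vec![p(&[(0, true), (1, false)])]);
    }
}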
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 

	
 
    fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        assert!(self.ports.contains(&port));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
    fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        assert!(self.ports.contains(&port));
 
        let val = self.inbox.get(&port);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
}
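
// Illustrative sketch, not part of this file: the message triage used by the recv loop in
// `sync_round_inner`. A message is dropped if it belongs to a past round, delayed if it
// belongs to a future round, and handled only if it matches the current round.
#[cfg(test)]
mod round_triage_sketch {
    #[derive(Debug, PartialEq)]
    enum Triage {
        Drop,   // old message: can never matter again
        Delay,  // future round: keep for later
        Handle, // current round: feed to an actor
    }

    fn triage(msg_round: usize, current_round: usize) -> Triage {
        use std::cmp::Ordering::*;
        match msg_round.cmp(&current_round) {
            Less => Triage::Drop,
            Greater => Triage::Delay,
            Equal => Triage::Handle,
        }
    }

    #[test]
    fn classification() {
        assert_eq!(triage(3, 5), Triage::Drop);
        assert_eq!(triage(7, 5), Triage::Delay);
        assert_eq!(triage(5, 5), Triage::Handle);
    }
}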
src/runtime/retired/connector.rs
Show inline comments
 
file renamed from src/runtime/connector.rs to src/runtime/retired/connector.rs
src/runtime/retired/endpoint.rs
Show inline comments
 
file renamed from src/runtime/endpoint.rs to src/runtime/retired/endpoint.rs
src/runtime/retired/errors.rs
Show inline comments
 
file renamed from src/runtime/errors.rs to src/runtime/retired/errors.rs
src/runtime/retired/experimental/api.rs
Show inline comments
 
file renamed from src/runtime/experimental/api.rs to src/runtime/retired/experimental/api.rs
src/runtime/retired/experimental/bits.rs
Show inline comments
 
file renamed from src/runtime/experimental/bits.rs to src/runtime/retired/experimental/bits.rs
src/runtime/retired/experimental/ecs.rs
Show inline comments
 
file renamed from src/runtime/experimental/ecs.rs to src/runtime/retired/experimental/ecs.rs
src/runtime/retired/experimental/mod.rs
Show inline comments
 
file renamed from src/runtime/experimental/mod.rs to src/runtime/retired/experimental/mod.rs
src/runtime/retired/experimental/pdl.rs
Show inline comments
 
file renamed from src/runtime/experimental/pdl.rs to src/runtime/retired/experimental/pdl.rs
src/runtime/retired/experimental/predicate.rs
Show inline comments
 
file renamed from src/runtime/experimental/predicate.rs to src/runtime/retired/experimental/predicate.rs
src/runtime/retired/experimental/vec_storage.rs
Show inline comments
 
file renamed from src/runtime/experimental/vec_storage.rs to src/runtime/retired/experimental/vec_storage.rs
src/runtime/retired/ffi.rs
Show inline comments
 
file renamed from src/runtime/ffi.rs to src/runtime/retired/ffi.rs
src/runtime/retired/serde.rs
Show inline comments
 
file renamed from src/runtime/serde.rs to src/runtime/retired/serde.rs
src/runtime/retired/setup.rs
Show inline comments
 
file renamed from src/runtime/setup.rs to src/runtime/retired/setup.rs
src/runtime/retired/v2.rs
Show inline comments
 
file renamed from src/runtime/v2.rs to src/runtime/retired/v2.rs
src/runtime/setup2.rs
Show inline comments
 
@@ -29,29 +29,37 @@ impl Connector {
 
            id_manager: IdManager::new(controller_id),
 
            native_ports: Default::default(),
 
            proto_components: Default::default(),
 
            outp_to_inp: Default::default(),
 
            inp_to_route: Default::default(),
 
            port_info: Default::default(),
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn add_port_pair(&mut self) -> [PortId; 2] {
 
        let o = self.id_manager.next_port();
 
        let i = self.id_manager.next_port();
 
        self.outp_to_inp.insert(o, i);
 
        self.inp_to_route.insert(i, InpRoute::NativeComponent);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        let [o, i] = [self.id_manager.next_port(), self.id_manager.next_port()];
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result<PortId, ()> {
 
    pub fn add_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        endpoint_setup: EndpointSetup,
 
    ) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.next_port();
 
                self.native_ports.insert(p);
 
                if endpoint_setup.polarity == Getter {
 
                    self.inp_to_route.insert(p, InpRoute::NativeComponent);
 
                }
 
                // {polarity, route} known. {peer} unknown.
 
                self.port_info.polarities.insert(p, polarity);
 
                self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup);
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
@@ -59,23 +67,6 @@ impl Connector {
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    fn check_polarity(&self, port: &PortId) -> Polarity {
 
        if let ConnectorPhased::Setup { endpoint_setups, .. } = &self.phased {
 
            for (setup_port, EndpointSetup { polarity, .. }) in endpoint_setups.iter() {
 
                if setup_port == port {
 
                    // special case. this port's polarity isn't reflected by
 
                    // self.inp_to_route or self.outp_to_inp, because it's still not paired to a peer
 
                    return *polarity;
 
                }
 
            }
 
        }
 
        if self.outp_to_inp.contains_key(port) {
 
            Polarity::Putter
 
        } else {
 
            assert!(self.inp_to_route.contains_key(port));
 
            Polarity::Getter
 
        }
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
@@ -90,7 +81,7 @@ impl Connector {
 
            if !self.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
            }
 
            if expected_polarity != self.check_polarity(port) {
 
            if expected_polarity != *self.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
@@ -100,9 +91,11 @@ impl Connector {
 
        let proto_component_index = self.proto_components.len();
 
        self.proto_components.push(proto_component);
 
        for port in ports.iter() {
 
            if let Polarity::Getter = self.check_polarity(port) {
 
                self.inp_to_route
 
                    .insert(*port, InpRoute::ProtoComponent { index: proto_component_index });
 
            if let Polarity::Getter = *self.port_info.polarities.get(port).unwrap() {
 
                self.port_info.routes.insert(
 
                    *port,
 
                    Route::LocalComponent(LocalComponentId::Proto { index: proto_component_index }),
 
                );
 
            }
 
        }
 
        Ok(())
 
@@ -118,22 +111,8 @@ impl Connector {
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = {
 
                    let Self { outp_to_inp, inp_to_route, logger, .. } = self;
 
                    let logical_channel_callback = |lci: LogicalChannelInfo| {
 
                        if let Putter = lci.local_polarity {
 
                            outp_to_inp.insert(lci.local_port, lci.peer_port);
 
                            inp_to_route.insert(
 
                                lci.peer_port,
 
                                InpRoute::Endpoint { index: lci.endpoint_index },
 
                            );
 
                        }
 
                    };
 
                    new_endpoint_manager(
 
                        &mut **logger,
 
                        endpoint_setups,
 
                        logical_channel_callback,
 
                        deadline,
 
                    )?
 
                    let Self { logger, port_info, .. } = self;
 
                    new_endpoint_manager(&mut **logger, endpoint_setups, port_info, deadline)?
 
                };
 
                log!(
 
                    self.logger,
 
@@ -153,6 +132,10 @@ impl Connector {
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_actor: NativeActor::Nonsync {
 
                        sync_result_branch: None,
 
                        next_batches: vec![SyncBatch::default()],
 
                    },
 
                };
 
                Ok(())
 
            }
 
@@ -163,14 +146,13 @@ impl Connector {
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    mut logical_channel_callback: impl FnMut(LogicalChannelInfo),
 
    port_info: &mut PortInfo,
 
    deadline: Instant,
 
) -> Result<EndpointManager, ()> {
 
    ////////////////////////////////////////////
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
@@ -194,20 +176,15 @@ fn new_endpoint_manager(
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            endpoint_setup: endpoint_setup.clone(),
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
        })
 
        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
 
    };
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    let mut poll = Poll::new().map_err(drop)?;
 
    let mut events = Events::with_capacity(64);
 
    let mut undrained_endpoints = IndexSet::<usize>::default();
 
    let mut polled_undrained = IndexSet::<usize>::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
@@ -234,7 +211,7 @@ fn new_endpoint_manager(
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                poll.registry().deregister(listener).unwrap();
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
 
                log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
            }
 
@@ -242,35 +219,49 @@ fn new_endpoint_manager(
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    endpoint_setup,
 
                    sent_local_port,
 
                    recv_peer_port,
 
                    ..
 
                } => {
 
                    if !setup_incomplete.contains(&index) {
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
 
                    if event.is_writable() && !*sent_local_port {
 
                        let msg =
 
                            MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port };
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: *local_port,
 
                        }));
 
                        endpoint.send(&msg)?;
 
                        log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg);
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        undrained_endpoints.insert(index);
 
                        if let Some(peer_port_info) =
 
                            endpoint.try_recv::<MyPortInfo>().map_err(drop)?
 
                        {
 
                            log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info);
 
                            assert!(peer_port_info.polarity != endpoint_setup.polarity);
 
                            *recv_peer_port = Some(peer_port_info.port);
 
                            let lci = LogicalChannelInfo {
 
                                local_port: *local_port,
 
                                peer_port: peer_port_info.port,
 
                                local_polarity: endpoint_setup.polarity,
 
                                endpoint_index: index,
 
                            };
 
                            logical_channel_callback(lci);
 
                        let maybe_msg = endpoint.try_recv().map_err(drop)?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                assert!(peer_info.polarity != local_polarity);
 
                                *recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(*local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, *local_port);
 
                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
 
                            }
 
                            Some(inappropriate_msg) => {
 
                                log!(
 
                                    logger,
 
                                    "delaying msg {:?} during channel setup phase",
 
                                    inappropriate_msg
 
                                );
 
                                delayed_messages.push((index, inappropriate_msg));
 
                            }
 
                        }
 
                    }
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
@@ -296,9 +287,9 @@ fn new_endpoint_manager(
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        undrained_endpoints,
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
        endpoint_exts,
 
    })
 
}
 
@@ -309,11 +300,7 @@ fn init_neighborhood(
 
    em: &mut EndpointManager,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    ////////////////////////////////////////////
 
    use Msg::SetupMsg as S;
 
    use SetupMsg::*;
 
    ////////////////////////////////////////////
 

	
 
    use {Msg::SetupMsg as S, SetupMsg::*};
 
    log!(logger, "beginning neighborhood construction");
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
@@ -330,7 +317,7 @@ fn init_neighborhood(
 
    let mut my_leader = controller_id;
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
@@ -381,7 +368,10 @@ fn init_neighborhood(
 
                    }
 
                }
 
            }
 
            inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)),
 
            inappropriate_msg => {
 
                log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg);
 
                em.delayed_messages.push((index, inappropriate_msg))
 
            }
 
        }
 
    }
 
    match parent {
 
@@ -413,21 +403,27 @@ fn init_neighborhood(
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages);
 
    em.undelay_all();
 
    log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages);
 
    while !awaiting.is_empty() {
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        log!(logger, "awaiting {:?}", &awaiting);
 
        let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            }
 
            S(SetupMsg::LeaderAnnounce { leader }) => {
 
            S(LeaderAnnounce { leader }) => {
 
                assert!(awaiting.remove(&index));
 
                assert!(leader == my_leader);
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            }
 
            inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)),
 
            inappropriate_msg => {
 
                log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg);
 
                em.delayed_messages.push((index, inappropriate_msg));
 
            }
 
        }
 
    }
 
    children.sort();