Changeset - 8642f7a7bf01
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-06-22 15:42:02
christopher.esterhuyse@gmail.com
communication phase under reconstruction
7 files changed with 412 insertions and 274 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -31,15 +31,29 @@ pub use Polarity::*;
 
pub type ControllerId = u32;
 
pub type PortSuffix = u32;
 

	
 
// globally unique
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct PortId {
 
pub struct Id {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) port_index: PortSuffix,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 

	
 
#[derive(Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 

	
 
// globally unique
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct PortId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct ProtoComponentId(Id);
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
@@ -79,6 +93,25 @@ pub enum SyncBlocker {
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl U32Stream {
 
    pub fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 
impl From<Id> for PortId {
 
    fn from(id: Id) -> PortId {
 
        Self(id)
 
    }
 
}
 
impl From<Id> for ProtoComponentId {
 
    fn from(id: Id) -> ProtoComponentId {
 
        Self(id)
 
    }
 
}
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
@@ -137,7 +170,12 @@ impl From<Vec<u8>> for Payload {
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "PortId({},{})", self.controller_id, self.port_index)
 
        write!(f, "PortId({},{})", self.0.controller_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "ProtoComponentId({},{})", self.0.controller_id, self.0.u32_suffix)
 
    }
 
}
 
impl std::ops::Not for Polarity {
src/protocol/mod.rs
Show inline comments
 
@@ -247,7 +247,7 @@ impl EvalContext<'_> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_channel();
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(OutputValue(from));
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 

	
 
////////////////
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
////////////////
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) {
 
        let component = &mut self.connector.proto_components[self.proto_component_index];
 
        assert!(component.ports.is_subset(&moved_ports));
 
        // let polarities = self.proto_description.component_polarities(identifier).expect("BAD ");
 
        // if polarities.len() != ports.len() {
 
        //     return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        // }
 
        // for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
        //     if !self.native_ports.contains(port) {
 
        //         return Err(UnknownPort(*port));
 
        //     }
 
        //     if expected_polarity != self.check_polarity(port) {
 
        //         return Err(WrongPortPolarity { port: *port, expected_polarity });
 
        //     }
 
        // }
 
        // // ok!
 
        // let state = self.proto_description.new_main_component(identifier, ports);
 
        // let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
 
        // let proto_component_index = self.proto_components.len();
 
        // self.proto_components.push(proto_component);
 
        // for port in ports.iter() {
 
        //     if let Polarity::Getter = self.check_polarity(port) {
 
        //         self.inp_to_route.insert(
 
        //             *port,
 
        //             InpRoute::LocalComponent(LocalComponentId::Proto {
 
        //                 index: proto_component_index,
 
        //             }),
 
        //         );
 
        //     }
 
        // }
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_channel(&mut self) -> [PortId; 2] {
 
        self.connector.add_port_pair()
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.proto_component_ports.insert(o);
 
        self.proto_component_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id));
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        todo!()
 
        self.predicate.query(port)
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        todo!()
 
        self.inbox.get(&port)
 
    }
 
}
 

	
 
impl Connector {
 
    pub fn sync(&mut self) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                // 1. run all proto components to Nonsync blockers
 
                let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
                    self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
                while let Some((proto_component_id, mut proto_component)) = unrun_components.pop() {
 
                    let mut ctx = NonsyncProtoContext {
 
                        logger: &mut *self.logger,
 
                        port_info: &mut self.port_info,
 
                        id_manager: &mut self.id_manager,
 
                        proto_component_id,
 
                        unrun_components: &mut unrun_components,
 
                        proto_component_ports: &mut proto_component.ports,
 
                    };
 
                    match proto_component.state.nonsync_run(&mut ctx, &self.proto_description) {
 
                        NonsyncBlocker::ComponentExit => drop(proto_component),
 
                        NonsyncBlocker::Inconsistent => {
 
                            return Err(InconsistentProtoComponent(proto_component_id))
 
                        }
 
                        NonsyncBlocker::SyncBlockStart => {
 
                            self.proto_components.insert(proto_component_id, proto_component);
 
                        }
 
                    }
 
                }
 

	
 
                // all ports are GETTER
 
                let mut mem_inbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // 2. kick off the native
 
                let mut branching_native = BranchingNative { branches: Default::default() };
 
                for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate()
 
                {
 
                    let mut predicate = Predicate::default();
 
                    // assign trues
 
                    for &port in to_get.iter().chain(to_put.keys()) {
 
                        predicate.assigned.insert(port, true);
 
                    }
 
                    // assign falses
 
                    for &port in self.native_ports.iter() {
 
                        predicate.assigned.entry(port).or_insert(false);
 
                    }
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        let getter = *self.port_info.peers.get(&port).unwrap();
 
                        mem_inbox.push((
 
                            getter,
 
                            SendPayloadMsg { payload_predicate: predicate.clone(), payload },
 
                        ));
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    }
 
                }
 
                todo!()
 
            }
 
        }
 
    }
 
}
 

	
 
// impl Connector {
 
//     fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> {
 
//         log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
//         log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
//         let ret = match &decision {
 
//             Decision::Success(predicate) => {
 
//                 // overwrite MonoN/P
 
//                 self.inner.mono_n = {
 
//                 self.mono_n = {
 
//                     let poly_n = self.ephemeral.poly_n.take().unwrap();
 
//                     poly_n.choose_mono(predicate).unwrap_or_else(|| {
 
//                         panic!(
 
//                             "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
 
//                             &predicate, &poly_n.branches, &self.inner.logger
 
//                             &predicate, &poly_n.branches, &self.logger
 
//                         );
 
//                     })
 
//                 };
 
//                 self.inner.mono_ps.clear();
 
//                 self.inner.mono_ps.extend(
 
//                 self.mono_ps.clear();
 
//                 self.mono_ps.extend(
 
//                     self.ephemeral
 
//                         .poly_ps
 
//                         .drain(..)
 
@@ -71,46 +157,46 @@ impl SyncProtoContext<'_> {
 
//             }
 
//             Decision::Failure => Err(SyncError::Timeout),
 
//         };
 
//         let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
 
//         for &child_port in self.inner.family.children_ports.iter() {
 
//         let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index);
 
//         for &child_port in self.family.children_ports.iter() {
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 &mut self.logger,
 
//                 "Forwarding {:?} to child with port {:?}",
 
//                 &announcement,
 
//                 child_port
 
//             );
 
//             self.inner
 
//             self
 
//                 .endpoint_exts
 
//                 .get_mut(child_port)
 
//                 .expect("eefef")
 
//                 .endpoint
 
//                 .send(announcement.clone())?;
 
//         }
 
//         self.inner.round_index += 1;
 
//         self.round_index += 1;
 
//         self.ephemeral.clear();
 
//         ret
 
//     }
 

	
 
//     // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
//     fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncError> {
 
//         if let Some(parent_port) = self.inner.family.parent_port {
 
//         if let Some(parent_port) = self.family.parent_port {
 
//             // I have a parent -> I'm not the leader
 
//             let parent_endpoint =
 
//                 &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
//                 &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
//             for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
//                 let msg =
 
//                     CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
//                 log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
//                     CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index);
 
//                 log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
//                 parent_endpoint.send(msg)?;
 
//             }
 
//             Ok(false)
 
//         } else {
 
//             // I have no parent -> I'm the leader
 
//             assert!(self.inner.family.parent_port.is_none());
 
//             assert!(self.family.parent_port.is_none());
 
//             let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
//             Ok(if let Some(predicate) = maybe_predicate {
 
//                 let decision = Decision::Success(predicate);
 
//                 log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
//                 log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
//                 self.end_round_with_decision(decision)?;
 
//                 true
 
//             } else {
 
@@ -123,7 +209,7 @@ impl SyncProtoContext<'_> {
 
//         &mut self,
 
//         sync_batches: impl Iterator<Item = SyncBatch>,
 
//     ) -> Result<PolyN, EndpointError> {
 
//         let MonoN { ports, .. } = self.inner.mono_n.clone();
 
//         let MonoN { ports, .. } = self.mono_n.clone();
 
//         let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
//         let mut branches = HashMap::<_, _>::default();
 
//         for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
@@ -152,7 +238,7 @@ impl SyncProtoContext<'_> {
 
//             let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
//             for (port, payload) in puts {
 
//                 log!(
 
//                     &mut self.inner.logger,
 
//                     &mut self.logger,
 
//                     "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
//                     &payload,
 
//                     &predicate,
 
@@ -164,14 +250,14 @@ impl SyncProtoContext<'_> {
 
//                 endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
 
//             }
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 &mut self.logger,
 
//                 "... Initial native branch batch index={} with pred {:?}",
 
//                 sync_batch_index,
 
//                 &predicate
 
//             );
 
//             if branch.to_get.is_empty() {
 
//                 self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                     &mut self.inner.logger,
 
//                     &mut self.logger,
 
//                     SubtreeId::PolyN,
 
//                     predicate.clone(),
 
//                 );
 
@@ -194,7 +280,7 @@ impl SyncProtoContext<'_> {
 
//                 // Must set unrecoverable error! and tear down our net channels
 
//                 self.unrecoverable_error = Some(e);
 
//                 self.ephemeral.clear();
 
//                 self.inner.endpoint_exts = Default::default();
 
//                 self.endpoint_exts = Default::default();
 
//                 e
 
//             }
 
//         })
 
@@ -208,9 +294,9 @@ impl SyncProtoContext<'_> {
 
//         sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
//     ) -> Result<(), SyncError> {
 
//         log!(
 
//             &mut self.inner.logger,
 
//             &mut self.logger,
 
//             "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
//             self.inner.round_index
 
//             self.round_index
 
//         );
 
//         assert!(self.ephemeral.is_clear());
 
//         assert!(self.unrecoverable_error.is_none());
 
@@ -218,17 +304,17 @@ impl SyncProtoContext<'_> {
 
//         // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
//         //    Some actors are dropped. some new actors are created.
 
//         //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
//         self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
//         log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
//         self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned());
 
//         log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
//         while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
//             let mut m_ctx = ProtoSyncContext {
 
//                 ports: &mut mono_p.ports,
 
//                 mono_ps: &mut self.ephemeral.mono_ps,
 
//                 inner: &mut self.inner,
 
//                 inner: &mut self,
 
//             };
 
//             // cross boundary into crate::protocol
 
//             let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
//             log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
//             log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
//             match blocker {
 
//                 NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent),
 
//                 NonsyncBlocker::ComponentExit => drop(mono_p),
 
@@ -236,7 +322,7 @@ impl SyncProtoContext<'_> {
 
//             }
 
//         }
 
//         log!(
 
//             &mut self.inner.logger,
 
//             &mut self.logger,
 
//             "Finished running all MonoPs! Have {} PolyPs waiting",
 
//             self.ephemeral.poly_ps.len()
 
//         );
 
@@ -247,7 +333,7 @@ impl SyncProtoContext<'_> {
 
//         //    TODO: store and update this mapping rather than rebuilding it each round.
 
//         let port_to_holder: HashMap<PortId, PolyId> = {
 
//             use PolyId::*;
 
//             let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
 
//             let n = self.mono_n.ports.iter().map(move |&e| (e, N));
 
//             let p = self
 
//                 .ephemeral
 
//                 .poly_ps
 
@@ -257,7 +343,7 @@ impl SyncProtoContext<'_> {
 
//             n.chain(p).collect()
 
//         };
 
//         log!(
 
//             &mut self.inner.logger,
 
//             &mut self.logger,
 
//             "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
 
//             &port_to_holder
 
//         );
 
@@ -268,14 +354,14 @@ impl SyncProtoContext<'_> {
 
//             let n = std::iter::once(SubtreeId::PolyN);
 
//             let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
//             let c = self
 
//                 .inner
 
//
 
//                 .family
 
//                 .children_ports
 
//                 .iter()
 
//                 .map(|&port| SubtreeId::ChildController { port });
 
//             let subtree_id_iter = n.chain(m).chain(c);
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 &mut self.logger,
 
//                 "Solution Storage has subtree Ids: {:?}",
 
//                 &subtree_id_iter.clone().collect::<Vec<_>>()
 
//             );
 
@@ -284,49 +370,49 @@ impl SyncProtoContext<'_> {
 

	
 
//         // 5. kick off the synchronous round of the native actor if it exists
 

	
 
//         log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
//         log!(&mut self.logger, "Kicking off native's synchronous round...");
 
//         self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
//             // using if let because of nested ? operator
 
//             // TODO check that there are 1+ branches or NO SOLUTION
 
//             let poly_n = self.kick_off_native(sync_batches)?;
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 &mut self.logger,
 
//                 "PolyN kicked off, and has branches with predicates... {:?}",
 
//                 poly_n.branches.keys().collect::<Vec<_>>()
 
//             );
 
//             Some(poly_n)
 
//         } else {
 
//             log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
//             log!(&mut self.logger, "NO NATIVE COMPONENT");
 
//             None
 
//         };
 

	
 
//         // 6. Kick off the synchronous round of each protocol actor
 
//         //    If just one actor becomes inconsistent now, there can be no solution!
 
//         //    TODO distinguish between completed and not completed poly_p's?
 
//         log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
//         log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
//         for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
//             let my_subtree_id = SubtreeId::PolyP { index };
 
//             let m_ctx = PolyPContext {
 
//                 my_subtree_id,
 
//                 inner: &mut self.inner,
 
//                 inner: &mut self,
 
//                 solution_storage: &mut self.ephemeral.solution_storage,
 
//             };
 
//             use SyncRunResult as Srr;
 
//             let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
//             log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
//             log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
//             match blocker {
 
//                 Srr::NoBranches => return Err(SyncError::Inconsistent),
 
//                 Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
//             }
 
//         }
 
//         log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 
//         log!(&mut self.logger, "All Poly machines have been kicked off!");
 

	
 
//         // 7. `solution_storage` may have new solutions for this controller
 
//         //    handle their discovery. LEADER => announce, otherwise => send to parent
 
//         {
 
//             let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
//             log!(
 
//                 &mut self.inner.logger,
 
//                 &mut self.logger,
 
//                 "Got {} controller-local solutions before a single RECV: {:?}",
 
//                 peeked.len(),
 
//                 peeked
 
@@ -337,10 +423,10 @@ impl SyncProtoContext<'_> {
 
//         }
 

	
 
//         // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error
 
//         log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
//         log!(&mut self.logger, "`No decision yet`. Time to recv messages");
 
//         self.undelay_all();
 
//         'recv_loop: loop {
 
//             log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
//             log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline);
 
//             let received = match deadline {
 
//                 None => {
 
//                     // we have personally timed out. perform a "long" poll.
 
@@ -353,7 +439,7 @@ impl SyncProtoContext<'_> {
 
//                         // timed out! send a FAILURE message to the sink,
 
//                         // and henceforth don't time out on polling.
 
//                         deadline = None;
 
//                         match self.inner.family.parent_port {
 
//                         match self.family.parent_port {
 
//                             None => {
 
//                                 // I am the sink! announce failure and return.
 
//                                 return self.end_round_with_decision(Decision::Failure);
 
@@ -361,16 +447,16 @@ impl SyncProtoContext<'_> {
 
//                             Some(parent_port) => {
 
//                                 // I am not the sink! send a failure message.
 
//                                 let announcement = Msg::CommMsg(CommMsg {
 
//                                     round_index: self.inner.round_index,
 
//                                     round_index: self.round_index,
 
//                                     contents: CommMsgContents::Failure,
 
//                                 });
 
//                                 log!(
 
//                                     &mut self.inner.logger,
 
//                                     &mut self.logger,
 
//                                     "Forwarding {:?} to parent with port {:?}",
 
//                                     &announcement,
 
//                                     parent_port
 
//                                 );
 
//                                 self.inner
 
//                                 self
 
//                                     .endpoint_exts
 
//                                     .get_mut(parent_port)
 
//                                     .expect("ss")
 
@@ -382,7 +468,7 @@ impl SyncProtoContext<'_> {
 
//                     }
 
//                 },
 
//             };
 
//             log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
//             log!(&mut self.logger, "::: message {:?}...", &received);
 
//             let current_content = match received.msg {
 
//                 Msg::SetupMsg(s) => {
 
//                     // This occurs in the event the connector was malformed during connect()
 
@@ -390,45 +476,45 @@ impl SyncProtoContext<'_> {
 
//                     return Err(SyncError::UnexpectedSetupMsg);
 
//                 }
 
//                 Msg::CommMsg(CommMsg { round_index, .. })
 
//                     if round_index < self.inner.round_index =>
 
//                     if round_index < self.round_index =>
 
//                 {
 
//                     // Old message! Can safely discard
 
//                     log!(&mut self.inner.logger, "...and its OLD! :(");
 
//                     log!(&mut self.logger, "...and its OLD! :(");
 
//                     drop(received);
 
//                     continue 'recv_loop;
 
//                 }
 
//                 Msg::CommMsg(CommMsg { round_index, .. })
 
//                     if round_index > self.inner.round_index =>
 
//                     if round_index > self.round_index =>
 
//                 {
 
//                     // Message from a next round. Keep for later!
 
//                     log!(&mut self.inner.logger, "... DELAY! :(");
 
//                     log!(&mut self.logger, "... DELAY! :(");
 
//                     self.delay(received);
 
//                     continue 'recv_loop;
 
//                 }
 
//                 Msg::CommMsg(CommMsg { contents, round_index }) => {
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         &mut self.logger,
 
//                         "... its a round-appropriate CommMsg with port {:?}",
 
//                         received.recipient
 
//                     );
 
//                     assert_eq!(round_index, self.inner.round_index);
 
//                     assert_eq!(round_index, self.round_index);
 
//                     contents
 
//                 }
 
//             };
 
//             match current_content {
 
//                 CommMsgContents::Failure => match self.inner.family.parent_port {
 
//                 CommMsgContents::Failure => match self.family.parent_port {
 
//                     Some(parent_port) => {
 
//                         let announcement = Msg::CommMsg(CommMsg {
 
//                             round_index: self.inner.round_index,
 
//                             round_index: self.round_index,
 
//                             contents: CommMsgContents::Failure,
 
//                         });
 
//                         log!(
 
//                             &mut self.inner.logger,
 
//                             &mut self.logger,
 
//                             "Forwarding {:?} to parent with port {:?}",
 
//                             &announcement,
 
//                             parent_port
 
//                         );
 
//                         self.inner
 
//                         self
 
//                             .endpoint_exts
 
//                             .get_mut(parent_port)
 
//                             .expect("ss")
 
@@ -439,18 +525,18 @@ impl SyncProtoContext<'_> {
 
//                 },
 
//                 CommMsgContents::Elaborate { partial_oracle } => {
 
//                     // Child controller submitted a subtree solution.
 
//                     if !self.inner.family.children_ports.contains(&received.recipient) {
 
//                     if !self.family.children_ports.contains(&received.recipient) {
 
//                         return Err(SyncError::ElaborateFromNonChild);
 
//                     }
 
//                     let subtree_id = SubtreeId::ChildController { port: received.recipient };
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         &mut self.logger,
 
//                         "Received elaboration from child for subtree {:?}: {:?}",
 
//                         subtree_id,
 
//                         &partial_oracle
 
//                     );
 
//                     self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                         &mut self.inner.logger,
 
//                         &mut self.logger,
 
//                         subtree_id,
 
//                         partial_oracle,
 
//                     );
 
@@ -459,11 +545,11 @@ impl SyncProtoContext<'_> {
 
//                     }
 
//                 }
 
//                 CommMsgContents::Announce { decision } => {
 
//                     if self.inner.family.parent_port != Some(received.recipient) {
 
//                     if self.family.parent_port != Some(received.recipient) {
 
//                         return Err(SyncError::AnnounceFromNonParent);
 
//                     }
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         &mut self.logger,
 
//                         "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
//                         received.recipient,
 
//                         &decision
 
@@ -474,21 +560,21 @@ impl SyncProtoContext<'_> {
 
//                     // check that we expect to be able to receive payloads from this sender
 
//                     assert_eq!(
 
//                         Getter,
 
//                         self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
//                         self.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
//                     );
 

	
 
//                     // message for some actor. Feed it to the appropriate actor
 
//                     // and then give them another chance to run.
 
//                     let subtree_id = port_to_holder.get(&received.recipient);
 
//                     log!(
 
//                         &mut self.inner.logger,
 
//                         &mut self.logger,
 
//                         "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
//                         subtree_id,
 
//                         &payload_predicate,
 
//                         &payload
 
//                     );
 
//                     let channel_id = self
 
//                         .inner
 
//
 
//                         .endpoint_exts
 
//                         .get(received.recipient)
 
//                         .expect("UEHFU")
 
@@ -508,7 +594,7 @@ impl SyncProtoContext<'_> {
 
//                             // Message for NativeMachine
 
//                             self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
//                                 received.recipient,
 
//                                 &mut self.inner.logger,
 
//                                 &mut self.logger,
 
//                                 payload,
 
//                                 payload_predicate,
 
//                                 &mut self.ephemeral.solution_storage,
 
@@ -523,7 +609,7 @@ impl SyncProtoContext<'_> {
 

	
 
//                             let m_ctx = PolyPContext {
 
//                                 my_subtree_id: SubtreeId::PolyP { index: *index },
 
//                                 inner: &mut self.inner,
 
//                                 inner: &mut self,
 
//                                 solution_storage: &mut self.ephemeral.solution_storage,
 
//                             };
 
//                             use SyncRunResult as Srr;
 
@@ -535,7 +621,7 @@ impl SyncProtoContext<'_> {
 
//                                 payload,
 
//                             )?;
 
//                             log!(
 
//                                 &mut self.inner.logger,
 
//                                 &mut self.logger,
 
//                                 "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
//                                 subtree_id,
 
//                                 blocker
 
@@ -550,7 +636,7 @@ impl SyncProtoContext<'_> {
 
//                                             .peek_new_locals()
 
//                                             .collect::<Vec<_>>();
 
//                                         log!(
 
//                                             &mut self.inner.logger,
 
//                                             &mut self.logger,
 
//                                             "Got {} new controller-local solutions from RECV: {:?}",
 
//                                             peeked.len(),
 
//                                             peeked
 
@@ -712,10 +798,10 @@ impl SyncProtoContext<'_> {
 

	
 
//     fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
//         assert!(self.ports.contains(&port));
 
//         let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let val = self.predicate.query(channel_id);
 
//         log!(
 
//             &mut self.m_ctx.inner.logger,
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
@@ -726,7 +812,7 @@ impl SyncProtoContext<'_> {
 
//         assert!(self.ports.contains(&port));
 
//         let val = self.inbox.get(&port);
 
//         log!(
 
//             &mut self.m_ctx.inner.logger,
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
#[derive(Debug)]
 
pub enum SyncError {
 
    Timeout,
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -11,7 +11,7 @@ use error::*;
 
#[derive(Clone, Copy, Debug)]
 
pub enum LocalComponentId {
 
    Native,
 
    Proto { index: usize },
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy)]
 
pub enum Route {
 
@@ -47,11 +47,16 @@ pub struct CommMsg {
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload { payload_predicate: Predicate, payload: Payload },
 
    SendPayload(SendPayloadMsg),
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Announce { decision: Decision },         // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    payload_predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
@@ -64,16 +69,7 @@ pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Default)]
 
pub struct IntStream {
 
    next: u32,
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    controller_id: ControllerId,
 
    port_suffix_stream: IntStream,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
@@ -103,6 +99,12 @@ pub struct MemInMsg {
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    controller_id: ControllerId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
@@ -122,11 +124,11 @@ pub struct PortInfo {
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    logger: Box<dyn Logger>,
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    proto_components: Vec<ProtoComponent>,
 
    port_info: PortInfo,
 
    phased: ConnectorPhased,
 
}
 
@@ -137,29 +139,42 @@ pub enum ConnectorPhased {
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        round_index: usize,
 
        endpoint_manager: EndpointManager,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
        native_actor: NativeActor, // sync invariant: in Nonsync state
 
        native_batches: Vec<NativeBatch>,
 
        round_result: Result<Option<usize>, SyncError>,
 
    },
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
#[derive(Default, Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
pub struct SyncProtoContext<'a> {
 
    connector: &'a mut Connector,
 
    proto_component_index: usize,
 
}
 
pub struct NonsyncProtoContext<'a> {
 
    connector: &'a mut Connector,
 
    proto_component_index: usize,
 
    logger: &'a mut dyn Logger,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    predicate: &'a Predicate,
 
    proto_component_id: ProtoComponentId,
 
    inbox: HashMap<PortId, Payload>,
 
}
 

	
 
// pub struct MonoPContext<'a> {
 
@@ -186,72 +201,96 @@ pub struct NonsyncProtoContext<'a> {
 
//     inbox: &'r HashMap<PortId, Payload>,
 
// }
 

	
 
#[derive(Default)]
 
pub struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
pub enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 
// #[derive(Default)]
 
// pub struct SolutionStorage {
 
//     old_local: HashSet<Predicate>,
 
//     new_local: HashSet<Predicate>,
 
//     // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
//     subtree_solutions: Vec<HashSet<Predicate>>,
 
//     subtree_id_to_index: HashMap<SubtreeId, usize>,
 
// }
 
// #[derive(Debug)]
 
// pub enum SyncRunResult {
 
//     BlockingForRecv,
 
//     AllBranchesComplete,
 
//     NoBranches,
 
// }
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// pub enum PolyId {
 
//     N,
 
//     P { index: usize },
 
// }
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { port: PortId },
 
}
 
#[derive(Debug, Default)]
 
pub struct SyncBatch {
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// pub enum SubtreeId {
 
//     PolyN,
 
//     PolyP { index: usize },
 
//     ChildController { port: PortId },
 
// }
 
// #[derive(Debug)]
 
// pub struct NativeBranch {
 
//     gotten: HashMap<PortId, Payload>,
 
//     to_get: HashSet<PortId>,
 
// }
 

	
 
////////////////
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> PortId {
 
        match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        }
 
    }
 
}
 
#[derive(Debug)]
 
pub enum NativeActor {
 
    Nonsync {
 
        sync_result_branch: Option<NativeBranch>, // invariant: sync_result_branch.to_get.is_empty()
 
        next_batches: Vec<SyncBatch>,             // invariant: nonempty
 
    },
 
    Sync {
 
        branches: HashMap<Predicate, NativeBranch>,
 
    },
 
impl IdManager {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self {
 
            controller_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            controller_id: self.controller_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
#[derive(Debug)]
 
pub struct NativeBranch {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
impl Connector {
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
}
 

	
 
////////////////
 
impl EndpointManager {
 
    fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    fn try_recv_any(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Instant,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
    fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 

	
 
        loop {
 
            // 2. try read a message from an enpoint that previously raised an event
 
            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let endpoint = &mut self.endpoint_exts[index].endpoint;
 
                if let Some(msg) =
 
@@ -264,14 +303,14 @@ impl EndpointManager {
 
                    return Ok((index, msg));
 
                }
 
            }
 
            // 3. No message yet. poll!
 
            // 3. No message yet. Do we have enough time to poll?
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                log!(logger, "Poll event {:?}", event);
 
                let Token(index) = event.token();
 
                self.polled_undrained.insert(index);
 
            }
 
            self.events.clear();
 
        }
 
    }
 
    fn undelay_all(&mut self) {
 
@@ -331,25 +370,6 @@ impl std::fmt::Write for StringLogger {
 
        self.1.write_str(s)
 
    }
 
}
 
impl IntStream {
 
    fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 
impl IdManager {
 
    fn next_port(&mut self) -> PortId {
 
        let port_suffix = self.port_suffix_stream.next();
 
        let controller_id = self.controller_id;
 
        PortId { controller_id, port_index: port_suffix }
 
    }
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, port_suffix_stream: Default::default() }
 
    }
 
}
 
impl Endpoint {
 
    fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
@@ -397,19 +417,19 @@ impl Connector {
 
        )
 
        .unwrap();
 
        self.get_logger().dump_log(&mut lock);
 
        writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    }
 
}
 
impl Debug for SolutionStorage {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("Solutions: [")?;
 
        for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
            let sols = &self.subtree_solutions[index];
 
            f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
        }
 
        f.pad("]")
 
        writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    }
 
}
 
// impl Debug for SolutionStorage {
 
//     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
//         f.pad("Solutions: [")?;
 
//         for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
//             let sols = &self.subtree_solutions[index];
 
//             f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
//         }
 
//         f.pad("]")
 
//     }
 
// }
 

	
 
impl Predicate {
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
@@ -526,7 +546,4 @@ impl Predicate {
 
    pub fn query(&self, x: PortId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    }
 
    pub fn new_trivial() -> Self {
 
        Self { assigned: Default::default() }
 
    }
 
}
src/runtime/my_tests.rs
Show inline comments
 
@@ -26,27 +26,27 @@ fn simple_connector() {
 
}
 

	
 
#[test]
 
fn add_port_pair() {
 
fn new_port_pair() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, _] = c.add_port_pair();
 
    let [_, _] = c.add_port_pair();
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_sync() {
 
fn new_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.add_port_pair();
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_net_port() {
 
fn new_net_port() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
@@ -61,8 +61,8 @@ fn trivial_connect() {
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    let res = c.connect(Duration::from_secs(1));
 
    println!("{:#?}", c);
 
    c.get_logger().dump_log(&mut std::io::stdout().lock());
 
@@ -75,13 +75,13 @@ fn multithreaded_connect() {
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
src/runtime/setup2.rs
Show inline comments
 
@@ -24,38 +24,23 @@ impl Connector {
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        Self {
 
            logger,
 
            proto_description,
 
            proto_components: Default::default(),
 
            logger,
 
            id_manager: IdManager::new(controller_id),
 
            native_ports: Default::default(),
 
            proto_components: Default::default(),
 
            port_info: Default::default(),
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn add_port_pair(&mut self) -> [PortId; 2] {
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        let [o, i] = [self.id_manager.next_port(), self.id_manager.next_port()];
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_net_port(
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        endpoint_setup: EndpointSetup,
 
    ) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.next_port();
 
                let p = self.id_manager.new_port_id();
 
                self.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                self.port_info.polarities.insert(p, polarity);
 
@@ -72,7 +57,9 @@ impl Connector {
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        // 1. check if this is OK
 
        let polarities = self.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
@@ -85,19 +72,21 @@ impl Connector {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // ok!
 
        let state = self.proto_description.new_main_component(identifier, ports);
 
        let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
 
        let proto_component_index = self.proto_components.len();
 
        self.proto_components.push(proto_component);
 
        // 3. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            if let Polarity::Getter = *self.port_info.polarities.get(port).unwrap() {
 
                self.port_info.routes.insert(
 
                    *port,
 
                    Route::LocalComponent(LocalComponentId::Proto { index: proto_component_index }),
 
                );
 
            }
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
        }
 
        // 4. add new component
 
        self.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
 
                state: self.proto_description.new_main_component(identifier, ports),
 
                ports: ports.iter().copied().collect(),
 
            },
 
        );
 
        Ok(())
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
@@ -110,10 +99,12 @@ impl Connector {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = {
 
                    let Self { logger, port_info, .. } = self;
 
                    new_endpoint_manager(&mut **logger, endpoint_setups, port_info, deadline)?
 
                };
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    self.logger,
 
                    "Successfully connected {} endpoints",
 
@@ -129,13 +120,12 @@ impl Connector {
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_actor: NativeActor::Nonsync {
 
                        sync_result_branch: None,
 
                        next_batches: vec![SyncBatch::default()],
 
                    },
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                Ok(())
 
            }
 
@@ -317,7 +307,7 @@ fn init_neighborhood(
 
    let mut my_leader = controller_id;
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
@@ -331,7 +321,7 @@ fn init_neighborhood(
 
            S(LeaderEcho { maybe_leader }) => {
 
                use Ordering::*;
 
                match maybe_leader.cmp(&my_leader) {
 
                    Less => { /* ignore */ }
 
                    Less => { /* ignore this wave */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
@@ -339,7 +329,7 @@ fn init_neighborhood(
 
                                // return the echo to my parent
 
                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // DECIDE!
 
                                // wave completed!
 
                                break 'echo_loop;
 
                            }
 
                        }
 
@@ -358,6 +348,7 @@ fn init_neighborhood(
 
                        } else {
 
                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
                                if index2 == index {
 
                                    // don't propagate echo to my parent
 
                                    continue;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
@@ -408,7 +399,7 @@ fn init_neighborhood(
 
    log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages);
 
    while !awaiting.is_empty() {
 
        log!(logger, "awaiting {:?}", &awaiting);
 
        let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
0 comments (0 inline, 0 general)