Changeset - a3ca8e491622
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-06-23 16:45:27
christopher.esterhuyse@gmail.com
introduced cyclic drain. working on proto comm phase
4 files changed with 324 insertions and 737 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -16,21 +16,30 @@ struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<Route, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
    // Ports owned by this component; used to make every firing variable
    // concrete when a branch reaches its sync block end.
    ports: HashSet<PortId>,

    // Speculative execution branches of this component, keyed by the
    // predicate (assignment of firing variables) each branch assumes.
    branches: HashMap<Predicate, ProtoComponentBranch>,

}
 
#[derive(Clone)]
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
    // Payloads received so far this round, keyed by the getter port
    // (feed_msg asserts at most one payload per port per round).
    inbox: HashMap<PortId, Payload>,

    // The protocol component's execution state for this speculative branch.
    state: ComponentState,

}
 
// Work-list driver for draining `input` while the per-entry callback may
// re-queue entries for another pass (via `inner.swap`) or emit finished
// entries (via `inner.output`). See `CyclicDrainer::cylic_drain`.
struct CyclicDrainer<'a, K: Eq + Hash, V> {
    input: &'a mut HashMap<K, V>,

    inner: CyclicDrainInner<'a, K, V>,

}
 
// The handle given to the cyclic-drain callback: `swap` buffers entries
// re-queued for a later pass (`add_input`), `output` collects finished
// entries (`add_output`).
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
    swap: &'a mut HashMap<K, V>,

    output: &'a mut HashMap<K, V>,

}
 

	
 
////////////////
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
@@ -294,116 +303,35 @@ impl Connector {
 
                // run all proto components to their sync blocker
 
                log!(
 
                    logger,
 
                    "Running all {} proto components to their sync blocker...",
 
                    branching_proto_components.len()
 
                );
 
                for (proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
                    // run this component to sync blocker in-place
 
                    log!(
 
                for (&proto_component_id, proto_component) in branching_proto_components.iter_mut()
 
                {
 
                    let Self { port_info, proto_description, .. } = self;
 
                    let BranchingProtoComponent { ports, branches } = proto_component;
 
                    let mut swap = HashMap::default();
 
                    let mut blocked = HashMap::default();
 
                    // drain from branches --> blocked
 
                    let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
                    BranchingProtoComponent::drain_branches_to_blocked(
 
                        cd,
 
                        logger,
 
                        "Running proto component with id {:?} to blocker...",
 
                        proto_component_id
 
                        port_info,
 
                        proto_description,
 
                        &mut solution_storage,
 
                        |putter, m| {
 
                            let getter = *port_info.peers.get(&putter).unwrap();
 
                            payloads_to_get.push((getter, m));
 
                        },
 
                        proto_component_id,
 
                        ports,
 
                    );
 
                    let blocked = &mut proto_component.branches;
 
                    let [unblocked_from, unblocked_to] = [
 
                        &mut HashMap::<Predicate, ProtoComponentBranch>::default(),
 
                        &mut Default::default(),
 
                    ];
 
                    // DRAIN-AND-POPULATE PATTERN: DRAINING unblocked into blocked while POPULATING unblocked
 
                    std::mem::swap(unblocked_from, blocked);
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger,
 
                                predicate: &predicate,
 
                                port_info: &self.port_info,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            let blocker = branch.state.sync_run(&mut ctx, &self.proto_description);
 
                            log!(
 
                                logger,
 
                                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                                proto_component_id,
 
                                &predicate,
 
                                &blocker,
 
                            );
 
                            use SyncBlocker as B;
 
                            match blocker {
 
                                B::Inconsistent => {
 
                                    // branch is inconsistent. throw it away
 
                                    drop((predicate, branch));
 
                                }
 
                                B::SyncBlockEnd => {
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        let var = self.port_info.firing_var_for(port);
 
                                        predicate.assigned.entry(var).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
                                        )),
 
                                        predicate.clone(),
 
                                    );
 
                                    // move to "blocked"
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
                                        branch.clone(),
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(putter, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(
 
                                        Some(&Putter),
 
                                        self.port_info.polarities.get(&putter)
 
                                    );
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(putter);
 

	
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                                        payloads_to_get.push((
 
                                            getter,
 
                                            SendPayloadMsg {
 
                                                predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
                                        unblocked_to.insert(predicate, branch);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                    // swap the blocked branches back
 
                    std::mem::swap(&mut blocked, branches);
 
                }
 
                log!(logger, "All proto components are blocked");
 

	
 
                log!(logger, "Entering decision loop...");
 
                endpoint_manager.undelay_all();
 
                let decision = 'undecided: loop {
 
@@ -425,18 +353,26 @@ impl Connector {
 
                                    &mut solution_storage,
 
                                    getter,
 
                                    send_payload_msg,
 
                                ),
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                                if let Some(branching_component) =
 
                                    branching_proto_components.get_mut(&proto_component_id)
 
                                    branching_proto_components.get_mut(proto_component_id)
 
                                {
 
                                    let proto_component_id = *proto_component_id;
 
                                    let Self { port_info, proto_description, .. } = self;
 
                                    branching_component.feed_msg(
 
                                        logger,
 
                                        &self.port_info,
 
                                        port_info,
 
                                        proto_description,
 
                                        &mut solution_storage,
 
                                        proto_component_id,
 
                                        |putter, m| {
 
                                            let getter = *port_info.peers.get(&putter).unwrap();
 
                                            payloads_to_get.push((getter, m));
 
                                        },
 
                                        getter,
 
                                        send_payload_msg,
 
                                    )
 
                                }
 
                            }
 
                        }
 
@@ -621,12 +557,13 @@ impl BranchingNative {
 
        log!(logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
@@ -702,22 +639,191 @@ impl BranchingNative {
 
                return (index, gotten);
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 

	
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
    /// Bundles the three maps taking part in a cyclic drain:
    /// `input` is drained, `swap` buffers entries re-queued by the callback,
    /// and `output` collects finished entries.
    fn new(
        input: &'a mut HashMap<K, V>,
        swap: &'a mut HashMap<K, V>,
        output: &'a mut HashMap<K, V>,
    ) -> Self {
        Self { input, inner: CyclicDrainInner { swap, output } }
    }
    /// Repeatedly drains `input`, passing each entry to `func` together with
    /// a `CyclicDrainInner` through which `func` may re-queue entries for a
    /// later pass (`add_input`) or emit them (`add_output`). Terminates once
    /// a pass re-queues nothing.
    ///
    /// NOTE(review): the method name keeps the existing `cylic` typo so the
    /// call site in `drain_branches_to_blocked` keeps compiling.
    fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) {
        let Self { input, inner: CyclicDrainInner { swap, output } } = self;
        while !input.is_empty() {
            for (k, v) in input.drain() {
                func(k, v, CyclicDrainInner { swap, output })
            }
            // BUGFIX: entries re-queued via `add_input` land in `swap`.
            // Previously they were never moved back, so the `while` loop
            // always exited after one pass and re-queued entries were
            // silently lost. Swapping makes the drain actually cyclic.
            std::mem::swap(input, swap);
        }
    }
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 

	
 
impl ProtoComponentBranch {
    /// Deliver `payload` into this branch's inbox for port `getter`.
    /// Asserts that the port had no payload stored yet (each getter
    /// receives at most one message per round).
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
        let previously = self.inbox.insert(getter, payload);
        assert!(previously.is_none())
    }
}
 
impl BranchingProtoComponent {
 
    /// Runs every branch in `cd`'s input map to its next sync blocker.
    /// Branches that can still make progress (firing forks, successful puts)
    /// are cyclically re-queued; branches that are blocked or finished are
    /// emitted to `cd`'s output map. Inconsistent branches are dropped.
    ///
    /// `outbox_unqueue` is called for every payload a branch puts on a port;
    /// `ports` is the component's own port set, used to make all firing
    /// variables concrete at sync block end.
    fn drain_branches_to_blocked(
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
        //
        logger: &mut dyn Logger,
        port_info: &PortInfo,
        proto_description: &ProtocolDescription,
        solution_storage: &mut SolutionStorage,
        mut outbox_unqueue: impl FnMut(PortId, SendPayloadMsg),
        proto_component_id: ProtoComponentId,
        ports: &HashSet<PortId>,
    ) {
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
            // run this branch's state machine until it hits a blocker
            let mut ctx = SyncProtoContext {
                    logger,
                    predicate: &predicate,
                    port_info,
                    proto_component_id,
                    inbox: &branch.inbox,
                };
                let blocker = branch.state.sync_run(&mut ctx, proto_description);
                log!(
                    logger,
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
                    proto_component_id,
                    &predicate,
                    &blocker,
                );
                use SyncBlocker as B;
                match blocker {
                    B::Inconsistent => {
                        // branch is inconsistent. throw it away
                        drop((predicate, branch));
                    }
                    B::SyncBlockEnd => {
                        // make concrete all variables: any port not yet
                        // assigned in the predicate is assumed non-firing
                        for &port in ports.iter() {
                            let var = port_info.firing_var_for(port);
                            predicate.assigned.entry(var).or_insert(false);
                        }
                        // submit solution for this component
                        solution_storage.submit_and_digest_subtree_solution(
                            logger,
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)),
                            predicate.clone(),
                        );
                        // move to "blocked"
                        drainer.add_output(predicate, branch);
                    }
                    B::CouldntReadMsg(port) => {
                        // branch awaits a message not yet in its inbox;
                        // move to "blocked"
                        assert!(!branch.inbox.contains_key(&port));
                        drainer.add_output(predicate, branch);
                    }
                    B::CouldntCheckFiring(port) => {
                        // sanity check: the firing var must be unassigned
                        let var = port_info.firing_var_for(port);
                        assert!(predicate.query(var).is_none());
                        // fork into firing=false and firing=true variants;
                        // keep both forks in "unblocked" for another pass
                        drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
                        drainer.add_input(predicate.inserted(var, true), branch);
                    }
                    B::PutMsg(putter, payload) => {
                        // sanity check: only putter-polarity ports may put
                        assert_eq!(Some(&Putter), port_info.polarities.get(&putter));
                        // overwrite assignment: putting implies the var fires
                        let var = port_info.firing_var_for(putter);

                        let was = predicate.assigned.insert(var, true);
                        if was == Some(false) {
                            log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
                            // discard forever
                            drop((predicate, branch));
                        } else {
                            // keep in "unblocked": hand the payload to the
                            // caller for delivery and re-run this branch
                            log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
                            outbox_unqueue(
                                putter,
                                SendPayloadMsg { predicate: predicate.clone(), payload },
                            );
                            drainer.add_input(predicate, branch);
                        }
                    }
                }
        });
    }
 
    fn feed_msg(
 
        &mut self,
 
        _logger: &mut dyn Logger,
 
        _port_info: &PortInfo,
 
        _solution_storage: &mut SolutionStorage,
 
        _getter: PortId,
 
        _send_payload_msg: SendPayloadMsg,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        proto_description: &ProtocolDescription,
 
        solution_storage: &mut SolutionStorage,
 
        proto_component_id: ProtoComponentId,
 
        outbox_unqueue: impl FnMut(PortId, SendPayloadMsg),
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        todo!()
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        // partition drain from branches -> {unblocked, blocked}
 
        for (predicate, mut branch) in branches.drain() {
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    blocked.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        // drain from unblocked --> blocked
 
        let mut swap = HashMap::default();
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            logger,
 
            port_info,
 
            proto_description,
 
            solution_storage,
 
            outbox_unqueue,
 
            proto_component_id,
 
            ports,
 
        );
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
@@ -833,586 +939,6 @@ impl SolutionStorage {
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 

	
 
// impl ControllerEphemeral {
 
//     fn is_clear(&self) -> bool {
 
//         self.solution_storage.is_clear()
 
//             && self.poly_n.is_none()
 
//             && self.poly_ps.is_empty()
 
//             && self.mono_ps.is_empty()
 
//             && self.port_to_holder.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.solution_storage.clear();
 
//         self.poly_n.take();
 
//         self.poly_ps.clear();
 
//         self.port_to_holder.clear();
 
//     }
 
// }
 
// impl Into<PolyP> for MonoP {
 
//     fn into(self) -> PolyP {
 
//         PolyP {
 
//             complete: Default::default(),
 
//             incomplete: hashmap! {
 
//                 Predicate::new_trivial() =>
 
//                 BranchP {
 
//                     state: self.state,
 
//                     inbox: Default::default(),
 
//                     outbox: Default::default(),
 
//                     blocking_on: None,
 
//                 }
 
//             },
 
//             ports: self.ports,
 
//         }
 
//     }
 
// }
 

	
 
// impl From<EndpointError> for SyncError {
 
//     fn from(e: EndpointError) -> SyncError {
 
//         SyncError::EndpointError(e)
 
//     }
 
// }
 

	
 
// impl ProtoSyncContext<'_> {
 
//     fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
//         todo!()
 
//     }
 
//     fn new_channel(&mut self) -> [PortId; 2] {
 
//         todo!()
 
//     }
 
// }
 

	
 
// impl PolyContext for BranchPContext<'_, '_> {
 
//     type D = ProtocolD;
 

	
 
//     fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
//         assert!(self.ports.contains(&port));
 
//         let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let val = self.predicate.query(channel_id);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
//     fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
//         assert!(self.ports.contains(&port));
 
//         let val = self.inbox.get(&port);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
// }
 

	
 
//////////////
 

	
 
// impl Connector {
 
// fn end_round_with_decision(&mut self, decision: Decision) -> Result<usize, SyncError> {
 
//     log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
//     let ret = match &decision {
 
//         Decision::Success(predicate) => {
 
//             // overwrite MonoN/P
 
//             self.mono_n = {
 
//                 let poly_n = self.ephemeral.poly_n.take().unwrap();
 
//                 poly_n.choose_mono(predicate).unwrap_or_else(|| {
 
//                     panic!(
 
//                         "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
 
//                         &predicate, &poly_n.branches, &self.logger
 
//                     );
 
//                 })
 
//             };
 
//             self.mono_ps.clear();
 
//             self.mono_ps.extend(
 
//                 self.ephemeral
 
//                     .poly_ps
 
//                     .drain(..)
 
//                     .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
 
//             );
 
//             Ok(())
 
//         }
 
//         Decision::Failure => Err(SyncError::Timeout),
 
//     };
 
//     let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index);
 
//     for &child_port in self.family.children_ports.iter() {
 
//         log!(
 
//             &mut self.logger,
 
//             "Forwarding {:?} to child with port {:?}",
 
//             &announcement,
 
//             child_port
 
//         );
 
//         self.endpoint_exts
 
//             .get_mut(child_port)
 
//             .expect("eefef")
 
//             .endpoint
 
//             .send(announcement.clone())?;
 
//     }
 
//     self.round_index += 1;
 
//     self.ephemeral.clear();
 
//     ret
 
// }
 

	
 
// // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
// fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncError> {
 
//     if let Some(parent_port) = self.family.parent_port {
 
//         // I have a parent -> I'm not the leader
 
//         let parent_endpoint =
 
//             &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
 
//         for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
//             let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index);
 
//             log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
 
//             parent_endpoint.send(msg)?;
 
//         }
 
//         Ok(false)
 
//     } else {
 
//         // I have no parent -> I'm the leader
 
//         assert!(self.family.parent_port.is_none());
 
//         let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
//         Ok(if let Some(predicate) = maybe_predicate {
 
//             let decision = Decision::Success(predicate);
 
//             log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
//             self.end_round_with_decision(decision)?;
 
//             true
 
//         } else {
 
//             false
 
//         })
 
//     }
 
// }
 

	
 
// fn kick_off_native(
 
//     &mut self,
 
//     sync_batches: impl Iterator<Item = SyncBatch>,
 
// ) -> Result<PolyN, EndpointError> {
 
//     let MonoN { ports, .. } = self.mono_n.clone();
 
//     let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
//     let mut branches = HashMap::<_, _>::default();
 
//     for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
//         let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let all_ports = ports.iter().copied();
 
//         let all_channel_ids = all_ports.map(port_to_channel_id);
 

	
 
//         let mut predicate = Predicate::new_trivial();
 

	
 
//         // assign TRUE for puts and gets
 
//         let true_ports = puts.keys().chain(gets.iter()).copied();
 
//         let true_channel_ids = true_ports.clone().map(port_to_channel_id);
 
//         predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
//         // assign FALSE for all in interface not assigned true
 
//         predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
//         if branches.contains_key(&predicate) {
 
//             // TODO what do I do with redundant predicates?
 
//             unimplemented!(
 
//                 "Duplicate predicate {:#?}!\nHaving multiple batches with the same
 
//                 predicate requires the support of oracle boolean variables",
 
//                 &predicate,
 
//             )
 
//         }
 
//         let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
//         for (port, payload) in puts {
 
//             log!(
 
//                 &mut self.logger,
 
//                 "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
//                 &payload,
 
//                 &predicate,
 
//                 sync_batch_index,
 
//             );
 
//             let msg =
 
//                 CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
//                     .into_msg(*round_index);
 
//             endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
 
//         }
 
//         log!(
 
//             &mut self.logger,
 
//             "... Initial native branch batch index={} with pred {:?}",
 
//             sync_batch_index,
 
//             &predicate
 
//         );
 
//         if branch.to_get.is_empty() {
 
//             self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                 &mut self.logger,
 
//                 Route::PolyN,
 
//                 predicate.clone(),
 
//             );
 
//         }
 
//         branches.insert(predicate, branch);
 
//     }
 
//     Ok(PolyN { ports, branches })
 
// }
 
// pub fn sync_round(
 
//     &mut self,
 
//     deadline: Option<Instant>,
 
//     sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
// ) -> Result<(), SyncError> {
 
//     if let Some(e) = self.unrecoverable_error {
 
//         return Err(e.clone());
 
//     }
 
//     self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
 
//         SyncError::Timeout => e, // this isn't unrecoverable
 
//         _ => {
 
//             // Must set unrecoverable error! and tear down our net channels
 
//             self.unrecoverable_error = Some(e);
 
//             self.ephemeral.clear();
 
//             self.endpoint_exts = Default::default();
 
//             e
 
//         }
 
//     })
 
// }
 

	
 
// // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
// // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
// fn sync_round_inner(
 
//     &mut self,
 
//     mut deadline: Option<Instant>,
 
//     sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
// ) -> Result<(), SyncError> {
 
//     log!(&mut self.logger, "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", self.round_index);
 
//     assert!(self.ephemeral.is_clear());
 
//     assert!(self.unrecoverable_error.is_none());
 

	
 
//     // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
//     //    Some actors are dropped. some new actors are created.
 
//     //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
//     self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned());
 
//     log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
//     while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
//         let mut m_ctx = ProtoSyncContext {
 
//             ports: &mut mono_p.ports,
 
//             mono_ps: &mut self.ephemeral.mono_ps,
 
//             inner: &mut self,
 
//         };
 
//         // cross boundary into crate::protocol
 
//         let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
//         log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
//         match blocker {
 
//             NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent),
 
//             NonsyncBlocker::ComponentExit => drop(mono_p),
 
//             NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
//         }
 
//     }
 
//     log!(
 
//         &mut self.logger,
 
//         "Finished running all MonoPs! Have {} PolyPs waiting",
 
//         self.ephemeral.poly_ps.len()
 
//     );
 

	
 
//     // 3. define the mapping from port -> actor
 
//     //    this is needed during the event loop to determine which actor
 
//     //    should receive the incoming message.
 
//     //    TODO: store and update this mapping rather than rebuilding it each round.
 
//     let port_to_holder: HashMap<PortId, PolyId> = {
 
//         use PolyId::*;
 
//         let n = self.mono_n.ports.iter().map(move |&e| (e, N));
 
//         let p = self
 
//             .ephemeral
 
//             .poly_ps
 
//             .iter()
 
//             .enumerate()
 
//             .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index })));
 
//         n.chain(p).collect()
 
//     };
 
//     log!(
 
//         &mut self.logger,
 
//         "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
 
//         &port_to_holder
 
//     );
 

	
 
//     // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
//     //    of the controller in the overlay tree.
 
//     self.ephemeral.solution_storage.reset({
 
//         let n = std::iter::once(Route::PolyN);
 
//         let m = (0..self.ephemeral.poly_ps.len()).map(|index| Route::PolyP { index });
 
//         let c = self.family.children_ports.iter().map(|&port| Route::ChildController { port });
 
//         let subtree_id_iter = n.chain(m).chain(c);
 
//         log!(
 
//             &mut self.logger,
 
//             "Solution Storage has subtree Ids: {:?}",
 
//             &subtree_id_iter.clone().collect::<Vec<_>>()
 
//         );
 
//         subtree_id_iter
 
//     });
 

	
 
//     // 5. kick off the synchronous round of the native actor if it exists
 

	
 
//     log!(&mut self.logger, "Kicking off native's synchronous round...");
 
//     self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
//         // using if let because of nested ? operator
 
//         // TODO check that there are 1+ branches or NO SOLUTION
 
//         let poly_n = self.kick_off_native(sync_batches)?;
 
//         log!(
 
//             &mut self.logger,
 
//             "PolyN kicked off, and has branches with predicates... {:?}",
 
//             poly_n.branches.keys().collect::<Vec<_>>()
 
//         );
 
//         Some(poly_n)
 
//     } else {
 
//         log!(&mut self.logger, "NO NATIVE COMPONENT");
 
//         None
 
//     };
 

	
 
//     // 6. Kick off the synchronous round of each protocol actor
 
//     //    If just one actor becomes inconsistent now, there can be no solution!
 
//     //    TODO distinguish between completed and not completed poly_p's?
 
//     log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
//     for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
//         let my_subtree_id = Route::PolyP { index };
 
//         let m_ctx = PolyPContext {
 
//             my_subtree_id,
 
//             inner: &mut self,
 
//             solution_storage: &mut self.ephemeral.solution_storage,
 
//         };
 
//         use SyncRunResult as Srr;
 
//         let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
//         log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
//         match blocker {
 
//             Srr::NoBranches => return Err(SyncError::Inconsistent),
 
//             Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
//         }
 
//     }
 
//     log!(&mut self.logger, "All Poly machines have been kicked off!");
 

	
 
//     // 7. `solution_storage` may have new solutions for this controller
 
//     //    handle their discovery. LEADER => announce, otherwise => send to parent
 
//     {
 
//         let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
//         log!(
 
//             &mut self.logger,
 
//             "Got {} controller-local solutions before a single RECV: {:?}",
 
//             peeked.len(),
 
//             peeked
 
//         );
 
//     }
 
//     if self.handle_locals_maybe_decide()? {
 
//         return Ok(());
 
//     }
 

	
 
//     // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error
 
//     log!(&mut self.logger, "`No decision yet`. Time to recv messages");
 
//     self.undelay_all();
 
//     'recv_loop: loop {
 
//         log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline);
 
//         let received = match deadline {
 
//             None => {
 
//                 // we have personally timed out. perform a "long" poll.
 
//                 self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP")
 
//             }
 
//             Some(d) => match self.recv(d)? {
 
//                 // we have not yet timed out. performed a time-limited poll
 
//                 Some(received) => received,
 
//                 None => {
 
//                     // timed out! send a FAILURE message to the sink,
 
//                     // and henceforth don't time out on polling.
 
//                     deadline = None;
 
//                     match self.family.parent_port {
 
//                         None => {
 
//                             // I am the sink! announce failure and return.
 
//                             return self.end_round_with_decision(Decision::Failure);
 
//                         }
 
//                         Some(parent_port) => {
 
//                             // I am not the sink! send a failure message.
 
//                             let announcement = Msg::CommMsg(CommMsg {
 
//                                 round_index: self.round_index,
 
//                                 contents: CommMsgContents::Failure,
 
//                             });
 
//                             log!(
 
//                                 &mut self.logger,
 
//                                 "Forwarding {:?} to parent with port {:?}",
 
//                                 &announcement,
 
//                                 parent_port
 
//                             );
 
//                             self.endpoint_exts
 
//                                 .get_mut(parent_port)
 
//                                 .expect("ss")
 
//                                 .endpoint
 
//                                 .send(announcement.clone())?;
 
//                             continue; // poll some more
 
//                         }
 
//                     }
 
//                 }
 
//             },
 
//         };
 
//         log!(&mut self.logger, "::: message {:?}...", &received);
 
//         let current_content = match received.msg {
 
//             Msg::SetupMsg(s) => {
 
//                 // This occurs in the event the connector was malformed during connect()
 
//                 println!("WASNT EXPECTING {:?}", s);
 
//                 return Err(SyncError::UnexpectedSetupMsg);
 
//             }
 
//             Msg::CommMsg(CommMsg { round_index, .. }) if round_index < self.round_index => {
 
//                 // Old message! Can safely discard
 
//                 log!(&mut self.logger, "...and its OLD! :(");
 
//                 drop(received);
 
//                 continue 'recv_loop;
 
//             }
 
//             Msg::CommMsg(CommMsg { round_index, .. }) if round_index > self.round_index => {
 
//                 // Message from a next round. Keep for later!
 
//                 log!(&mut self.logger, "... DELAY! :(");
 
//                 self.delay(received);
 
//                 continue 'recv_loop;
 
//             }
 
//             Msg::CommMsg(CommMsg { contents, round_index }) => {
 
//                 log!(
 
//                     &mut self.logger,
 
//                     "... its a round-appropriate CommMsg with port {:?}",
 
//                     received.recipient
 
//                 );
 
//                 assert_eq!(round_index, self.round_index);
 
//                 contents
 
//             }
 
//         };
 
//         match current_content {
 
//             CommMsgContents::Failure => match self.family.parent_port {
 
//                 Some(parent_port) => {
 
//                     let announcement = Msg::CommMsg(CommMsg {
 
//                         round_index: self.round_index,
 
//                         contents: CommMsgContents::Failure,
 
//                     });
 
//                     log!(
 
//                         &mut self.logger,
 
//                         "Forwarding {:?} to parent with port {:?}",
 
//                         &announcement,
 
//                         parent_port
 
//                     );
 
//                     self.endpoint_exts
 
//                         .get_mut(parent_port)
 
//                         .expect("ss")
 
//                         .endpoint
 
//                         .send(announcement.clone())?;
 
//                 }
 
//                 None => return self.end_round_with_decision(Decision::Failure),
 
//             },
 
//             CommMsgContents::Elaborate { partial_oracle } => {
 
//                 // Child controller submitted a subtree solution.
 
//                 if !self.family.children_ports.contains(&received.recipient) {
 
//                     return Err(SyncError::ElaborateFromNonChild);
 
//                 }
 
//                 let subtree_id = Route::ChildController { port: received.recipient };
 
//                 log!(
 
//                     &mut self.logger,
 
//                     "Received elaboration from child for subtree {:?}: {:?}",
 
//                     subtree_id,
 
//                     &partial_oracle
 
//                 );
 
//                 self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
//                     &mut self.logger,
 
//                     subtree_id,
 
//                     partial_oracle,
 
//                 );
 
//                 if self.handle_locals_maybe_decide()? {
 
//                     return Ok(());
 
//                 }
 
//             }
 
//             CommMsgContents::Announce { decision } => {
 
//                 if self.family.parent_port != Some(received.recipient) {
 
//                     return Err(SyncError::AnnounceFromNonParent);
 
//                 }
 
//                 log!(
 
//                     &mut self.logger,
 
//                     "Received ANNOUNCEMENT from parent {:?}: {:?}",
 
//                     received.recipient,
 
//                     &decision
 
//                 );
 
//                 return self.end_round_with_decision(decision);
 
//             }
 
//             CommMsgContents::SendPayload { payload_predicate, payload } => {
 
//                 // check that we expect to be able to receive payloads from this sender
 
//                 assert_eq!(
 
//                     Getter,
 
//                     self.endpoint_exts.get(received.recipient).unwrap().info.polarity
 
//                 );
 

	
 
//                 // message for some actor. Feed it to the appropriate actor
 
//                 // and then give them another chance to run.
 
//                 let subtree_id = port_to_holder.get(&received.recipient);
 
//                 log!(
 
//                     &mut self.logger,
 
//                     "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
//                     subtree_id,
 
//                     &payload_predicate,
 
//                     &payload
 
//                 );
 
//                 let channel_id =
 
//                     self.endpoint_exts.get(received.recipient).expect("UEHFU").info.channel_id;
 
//                 if payload_predicate.query(channel_id) != Some(true) {
 
//                     // sender didn't preserve the invariant
 
//                     return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id));
 
//                 }
 
//                 match subtree_id {
 
//                     None => {
 
//                         // this happens when a message is sent to a component that has exited.
 
//                         // It's safe to drop this message;
 
//                         // The sender branch will certainly not be part of the solution
 
//                     }
 
//                     Some(PolyId::N) => {
 
//                         // Message for NativeMachine
 
//                         self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
//                             received.recipient,
 
//                             &mut self.logger,
 
//                             payload,
 
//                             payload_predicate,
 
//                             &mut self.ephemeral.solution_storage,
 
//                         );
 
//                         if self.handle_locals_maybe_decide()? {
 
//                             return Ok(());
 
//                         }
 
//                     }
 
//                     Some(PolyId::P { index }) => {
 
//                         // Message for protocol actor
 
//                         let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
//                         let m_ctx = PolyPContext {
 
//                             my_subtree_id: Route::PolyP { index: *index },
 
//                             inner: &mut self,
 
//                             solution_storage: &mut self.ephemeral.solution_storage,
 
//                         };
 
//                         use SyncRunResult as Srr;
 
//                         let blocker = poly_p.poly_recv_run(
 
//                             m_ctx,
 
//                             &self.protocol_description,
 
//                             received.recipient,
 
//                             payload_predicate,
 
//                             payload,
 
//                         )?;
 
//                         log!(
 
//                             &mut self.logger,
 
//                             "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
//                             subtree_id,
 
//                             blocker
 
//                         );
 
//                         match blocker {
 
//                             Srr::NoBranches => return Err(SyncError::Inconsistent),
 
//                             Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
//                                 {
 
//                                     let peeked = self
 
//                                         .ephemeral
 
//                                         .solution_storage
 
//                                         .peek_new_locals()
 
//                                         .collect::<Vec<_>>();
 
//                                     log!(
 
//                                         &mut self.logger,
 
//                                         "Got {} new controller-local solutions from RECV: {:?}",
 
//                                         peeked.len(),
 
//                                         peeked
 
//                                     );
 
//                                 }
 
//                                 if self.handle_locals_maybe_decide()? {
 
//                                     return Ok(());
 
//                                 }
 
//                             }
 
//                         }
 
//                     }
 
//                 };
 
//             }
 
//         }
 
//     }
 
// }
 
// }
src/runtime/mod.rs
Show inline comments
 
@@ -272,12 +272,50 @@ impl Connector {
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    /// Moves the given native-owned `ports` into a newly created protocol
    /// component instantiated from `identifier`.
    ///
    /// Fails (leaving `self` untouched) when the port count, port ownership,
    /// or port polarities do not match the component's declared signature.
    pub fn add_component(
        &mut self,
        identifier: &[u8],
        ports: &[PortId],
    ) -> Result<(), AddComponentError> {
        // Called by the USER; transfers ports currently owned by the NATIVE.
        use AddComponentError::*;
        // Validate arity against the protocol description.
        let polarities = self.proto_description.component_polarities(identifier)?;
        if polarities.len() != ports.len() {
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
        }
        // Validate that every port is native-owned and has the declared polarity.
        for (&expected_polarity, &port) in polarities.iter().zip(ports.iter()) {
            if !self.native_ports.contains(&port) {
                return Err(UnknownPort(port));
            }
            if *self.port_info.polarities.get(&port).unwrap() != expected_polarity {
                return Err(WrongPortPolarity { port, expected_polarity });
            }
        }
        // All checks passed. Reroute the moved ports to the new component id...
        let new_id = self.id_manager.new_proto_component_id();
        let new_route = Route::LocalComponent(LocalComponentId::Proto(new_id));
        for &port in ports.iter() {
            self.port_info.routes.insert(port, new_route);
        }
        // ...and strip them from the native component's ownership set.
        self.native_ports.retain(|port| !ports.contains(port));
        // Finally, register the new component with its initial state and port set.
        let state = self.proto_description.new_main_component(identifier, ports);
        self.proto_components.insert(
            new_id,
            ProtoComponent { state, ports: ports.iter().copied().collect() },
        );
        Ok(())
    }
 
}
 
impl EndpointManager {
 
    /// Transmits `msg` over the endpoint stored at position `index`.
    /// Panics if `index` is out of bounds for `self.endpoint_exts`.
    fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
        let ext = &mut self.endpoint_exts[index];
        ext.endpoint.send(msg)
    }
 
    fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
src/runtime/setup2.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
struct LogicalChannelInfo {
 
    local_port: PortId,
 
    peer_port: PortId,
 
    local_polarity: Polarity,
 
    endpoint_index: usize,
 
}
 
///////////////
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
    ) -> Self {
 
        let logger = Box::new(StringLogger::new(controller_id));
 
@@ -55,49 +48,12 @@ impl Connector {
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    /// Moves the given native-owned `ports` into a newly created protocol
    /// component instantiated from `identifier`.
    ///
    /// Errors leave `self` unchanged; on success the native component no
    /// longer owns `ports`.
    pub fn add_component(
        &mut self,
        identifier: &[u8],
        ports: &[PortId],
    ) -> Result<(), AddComponentError> {
        // called by the USER. moves ports owned by the NATIVE
        use AddComponentError::*;
        // 1. check if this is OK: arity must match the description...
        let polarities = self.proto_description.component_polarities(identifier)?;
        if polarities.len() != ports.len() {
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
        }
        // ...and each port must be native-owned with the declared polarity.
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
            if !self.native_ports.contains(port) {
                return Err(UnknownPort(*port));
            }
            if expected_polarity != *self.port_info.polarities.get(port).unwrap() {
                return Err(WrongPortPolarity { port: *port, expected_polarity });
            }
        }
        // 2. remove ports from old component & update port->route
        let new_id = self.id_manager.new_proto_component_id();
        for port in ports.iter() {
            self.port_info
                .routes
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
        }
        // BUGFIX: the native must relinquish ownership of the moved ports
        // (matching the mod.rs sibling of this function); otherwise the
        // native could still put/get on ports it no longer routes to.
        self.native_ports.retain(|port| !ports.contains(port));
        // 3. add new component
        self.proto_components.insert(
            new_id,
            ProtoComponent {
                state: self.proto_description.new_main_component(identifier, ports),
                ports: ports.iter().copied().collect(),
            },
        );
        Ok(())
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                Err(())
 
            }
src/runtime/tests.rs
Show inline comments
 
@@ -176,23 +176,23 @@ fn next_batch() {
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_sync() {
 
fn native_self_msg() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
}
 

	
 
#[test]
 
fn native_message_pass() {
 
fn two_natives_msg() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
@@ -207,6 +207,73 @@ fn native_message_pass() {
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
fn trivial_nondet() {
    // One connector, two batches: batch 0 requires a get on `i` (the putter
    // end was dropped via `_`, so presumably it can never fire — TODO confirm),
    // batch 1 is silent. The round must resolve to the silent batch (index 1).
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
    let [_, i] = c.new_port_pair(); // putter end is dropped immediately
    c.connect(Duration::from_secs(1)).unwrap();
    c.get(i).unwrap();
    // getting 0 batch
    c.next_batch().unwrap();
    // silent 1 batch
    assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap());
    // batch 1 won, so nothing was received on `i`
    c.gotten(i).unwrap_err();
}
 

	
 
#[test]
fn connector_pair_nondet() {
    // Two connectors joined over one network channel. The getter offers two
    // batches (0: silent, 1: get); the putter unconditionally puts, so the
    // round must settle on the getter's batch 1.
    let sock_addr = next_test_addr();
    scope(|s| {
        s.spawn(|_| {
            // getter side, active endpoint
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
            c.connect(Duration::from_secs(1)).unwrap();
            c.next_batch().unwrap(); // finish batch 0: silent
            c.get(g).unwrap(); // batch 1: expect a message on g
            assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap());
            c.gotten(g).unwrap();
        });
        s.spawn(|_| {
            // putter side, passive endpoint
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
            c.connect(Duration::from_secs(1)).unwrap();
            c.put(p, (b"hello" as &[_]).into()).unwrap();
            c.sync(Duration::from_secs(1)).unwrap();
        });
    })
    .unwrap();
}
 

	
 
#[test]
fn cannot_use_moved_ports() {
    // After both ends of a channel are moved into a proto component via
    // `add_component`, the native may no longer put/get on them.
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
    let [p, g] = c.new_port_pair();
    c.add_component(b"sync", &[g, p]).unwrap();
    /*
    native p|-->|g sync
    */
    c.connect(Duration::from_secs(1)).unwrap();
    // both must fail: the ports now belong to the `sync` component
    c.put(p, (b"hello" as &[_]).into()).unwrap_err();
    c.get(g).unwrap_err();
}
 

	
 
#[test]
fn sync_sync() {
    // Native puts into a `sync` proto component on one channel and gets the
    // relayed payload back on a second channel, all within a single round.
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
    let [p0, g0] = c.new_port_pair();
    let [p1, g1] = c.new_port_pair();
    c.add_component(b"sync", &[g0, p1]).unwrap();
    /*
    native p0|-->|g0 sync
           g1|<--|p1
    */
    c.connect(Duration::from_secs(1)).unwrap();
    c.put(p0, (b"hello" as &[_]).into()).unwrap();
    c.get(g1).unwrap();
    c.sync(Duration::from_secs(1)).unwrap();
    // the payload must have arrived on g1 during the round
    c.gotten(g1).unwrap();
}
0 comments (0 inline, 0 general)