Changeset - 966ebed86908
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-10-16 12:57:46
christopher.esterhuyse@gmail.com
simplified the cyclic drainer because it was unnecessarily complex. added a unit test. added some future-proofing for corner case protocols
2 files changed with 174 insertions and 151 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -37,30 +37,24 @@ struct BranchingProtoComponent {
 
// `ended` IFF this branch has reached SyncBlocker::SyncBlockEnd before.
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    state: ComponentState,
 
    inner: ProtoComponentBranchInner,
 
    ended: bool,
 
}
 

	
 
// A structure wrapping a set of three pointers, making it impossible
 
// to miss that they are being setup for `cyclic_drain`.
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainerInner<'a, K, V>,
 
}
 

	
 
// Inner substructure of the Cyclic drainer to be passed through a callback function.
 
// See `CyclicDrainer::cyclic_drain`.
 
struct CyclicDrainerInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 

	
 
// Small convenience trait for extending the stdlib's bool type with
 
// an optionlike replace method for increasing brevity.
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 

	
 
//////////////// IMPL ////////////////////////////
 

	
 
@@ -526,25 +520,25 @@ impl Connector {
 
        log!(
 
            cu.logger(),
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { branches } = proto_component;
 
            // must reborrow to constrain the lifetime of pcb_temps to inside the loop
 
            let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut();
 
            let (blocked, _pcb_temps) = pcb_temps.split_first_mut();
 
            // initially, no protocol components have .ended==true
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, swap.0, blocked.0);
 
            let cd = CyclicDrainer { input: branches, swap: swap.0, output: blocked.0 };
 
            BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
            // swap the blocked branches back
 
            std::mem::swap(blocked.0, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger(), "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger(), "Already requested failure");
 
                    }
 
                } else {
 
@@ -789,25 +783,24 @@ impl Connector {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }),
 
        });
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl NativeBranch {
 
    fn is_ended(&self) -> bool {
 
        self.to_get.is_empty()
 
    }
 
}
 
impl BranchingNative {
 

	
 
    // Feed the given payload to the native component
 
    // May result in discovering new component solutions,
 
    // or fork speculative branches if the message's predicate
 
    // is MORE SPECIFIC than the branches of the native
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
@@ -974,145 +967,159 @@ impl BranchingNative {
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundEndedNative { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 

	
 
    // Create a singleton-branch branching protocol component as
 
    // speculation begins, with the given protocol state.
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 

	
 
    // run all the given branches (cd.input) to their SyncBlocker,
 
    // populating cd.output (by way of CyclicDrainer::cyclic_drain).
 
    // populating cd.output by cyclically draining "input" -> "cd."input" / cd.output.
 
    // (to prevent concurrent r/w of one structure, we realize "input" as cd.input for reading and cd.swap for writing)
 
    // This procedure might lose branches, and it might create new branches.
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                rctx,
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            // Run this component's state to the next syncblocker for handling
 
            let blocker = branch.state.sync_run(&mut ctx, cu.proto_description());
 
            log!(
 
                cu.logger(),
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                B::CouldntReadMsg(port) => {
 
                    // sanity check: `CouldntReadMsg` returned IFF the message is unavailable
 
                    assert!(!branch.inner.inbox.contains_key(&port)); 
 
                    // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle.
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
                    let var = rctx.ips.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check: The given port indeed has `Putter` polarity
 
                    assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity);
 
                    // assign FIRING to this port's associated firing variable
 
                    let var = rctx.ips.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        // Discard the branch, as it clearly has contradictory requirements for this value.
 
                        log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
        // let CyclicDrainer { input, swap, output } = cd;
 
        while !cd.input.is_empty() {
 
            'branch_iter: for (mut predicate, mut branch) in cd.input.drain() {
 
                let mut ctx = SyncProtoContext {
 
                    rctx,
 
                    predicate: &predicate,
 
                    branch_inner: &mut branch.inner,
 
                };
 
                // Run this component's state to the next syncblocker for handling
 
                let blocker = branch.state.sync_run(&mut ctx, cu.proto_description());
 
                log!(
 
                    cu.logger(),
 
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                    proto_component_id,
 
                    &predicate,
 
                    &blocker,
 
                );
 
                use SyncBlocker as B;
 
                match blocker {
 
                    B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                    B::CouldntReadMsg(port) => {
 
                        // sanity check: `CouldntReadMsg` returned IFF the message is unavailable
 
                        assert!(!branch.inner.inbox.contains_key(&port));
 
                        // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle.
 
                        Self::insert_branch_merging(cd.output, predicate, branch);
 
                    }
 
                    B::CouldntCheckFiring(port) => {
 
                        // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
                        let var = rctx.ips.port_info.spec_var_for(port);
 
                        assert!(predicate.query(var).is_none());
 
                        // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 

	
 
                        Self::insert_branch_merging(
 
                            cd.swap,
 
                            predicate.clone().inserted(var, SpecVal::SILENT),
 
                            branch.clone(),
 
                        );
 
                        Self::insert_branch_merging(
 
                            cd.swap,
 
                            predicate.inserted(var, SpecVal::FIRING),
 
                            branch,
 
                        );
 
                    }
 
                    B::PutMsg(putter, payload) => {
 
                        // sanity check: The given port indeed has `Putter` polarity
 
                        assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity);
 
                        // assign FIRING to this port's associated firing variable
 
                        let var = rctx.ips.port_info.spec_var_for(putter);
 
                        let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                        if was == Some(SpecVal::SILENT) {
 
                            // Discard the branch, as it clearly has contradictory requirements for this value.
 
                            log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        drop((predicate, branch));
 
                    } else {
 
                        // Note that this port has put this round,
 
                        // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error)
 
                        assert!(branch.inner.did_put_or_get.insert(putter));
 
                        log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            drop((predicate, branch));
 
                        } else {
 
                            // Note that this port has put this round,
 
                            // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error)
 
                            assert!(branch.inner.did_put_or_get.insert(putter));
 
                            log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &predicate, &payload, putter, var);
 
                        // Send the given payload (by buffering it).
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.putter_push(cu, putter, msg);
 
                        // Branch can still make progress. Schedule to be rerun
 
                        drainer.add_input(predicate, branch);
 
                            // Send the given payload (by buffering it).
 
                            let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                            rctx.putter_push(cu, putter, msg);
 
                            // Branch can still make progress. Schedule to be rerun
 

	
 
                            Self::insert_branch_merging(cd.swap, predicate, branch);
 
                        }
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // This branch reached the end of it's synchronous block
 
                    // assign all variables of owned ports that DIDN'T fire to SILENT
 
                    for port in rctx.ips.port_info.ports_owned_by(proto_component_id) {
 
                        let var = rctx.ips.port_info.spec_var_for(*port);
 
                        let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let speculated_to_fire = val == SpecVal::FIRING;
 
                        if actually_exchanged != speculated_to_fire {
 
                            log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}",
 
                    B::SyncBlockEnd => {
 
                        // This branch reached the end of it's synchronous block
 
                        // assign all variables of owned ports that DIDN'T fire to SILENT
 
                        for port in rctx.ips.port_info.ports_owned_by(proto_component_id) {
 
                            let var = rctx.ips.port_info.spec_var_for(*port);
 
                            let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                            let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                            let speculated_to_fire = val == SpecVal::FIRING;
 
                            if actually_exchanged != speculated_to_fire {
 
                                log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}",
 
                                port, var, val, actually_exchanged, speculated_to_fire);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                                // IMPLICIT inconsistency
 
                                drop((predicate, branch));
 
                                continue 'branch_iter;
 
                            }
 
                        }
 
                        // submit solution for this component
 
                        let subtree_id = SubtreeId::LocalComponent(proto_component_id);
 
                        rctx.solution_storage.submit_and_digest_subtree_solution(
 
                            cu,
 
                            subtree_id,
 
                            predicate.clone(),
 
                        );
 
                        branch.ended = true;
 
                        // This branch exits the cyclic drain
 
                        Self::insert_branch_merging(cd.output, predicate, branch);
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(proto_component_id);
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        cu,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // This branch exits the cyclic drain
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::NondetChoice { n } => {
 
                    // This branch requested the creation of a new n-way nondeterministic
 
                    // fork of the branch with a fresh speculative variable.
 
                    // ... allocate a new speculative variable 
 
                    let var = rctx.spec_var_stream.next();
 
                    // ... and for n distinct values, create a new forked branch,
 
                    // and schedule them to be rerun through the cyclic drain.
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.inner.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    B::NondetChoice { n } => {
 
                        // This branch requested the creation of a new n-way nondeterministic
 
                        // fork of the branch with a fresh speculative variable.
 
                        // ... allocate a new speculative variable
 
                        let var = rctx.spec_var_stream.next();
 
                        // ... and for n distinct values, create a new forked branch,
 
                        // and schedule them to be rerun through the cyclic drain.
 
                        for val in SpecVal::iter_domain().take(n as usize) {
 
                            let predicate_n = predicate.clone().inserted(var, val);
 
                            let mut branch_n = branch.clone();
 
                            branch_n.inner.untaken_choice = Some(val.0);
 
                            Self::insert_branch_merging(cd.swap, predicate_n, branch_n);
 
                        }
 
                    }
 
                }
 
            }
 
            Ok(())
 
        })
 
            std::mem::swap(cd.input, cd.swap);
 
        }
 
        Ok(())
 
    }
 

	
 
    // Feed this branching protocol component the given message, and
 
    // then run all branches until they are once again blocked. 
 
    // then run all branches until they are once again blocked.
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(
 
            cu.logger(),
 
            "feeding proto component {:?} getter {:?} {:?}",
 
@@ -1162,25 +1169,25 @@ impl BranchingProtoComponent {
 
                    log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    // the branch that receives the message is unblocked, the original one is blocked
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
        let cd = CyclicDrainer { input: unblocked.0, swap: swap.0, output: blocked.0 };
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, &mut self.branches);
 
        log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys());
 
        Ok(())
 
    }
 

	
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
@@ -1192,28 +1199,28 @@ impl BranchingProtoComponent {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means keeping the existing one in-place, and giving it the UNION of the inboxes
 
                let old = eo.get_mut();
 
                for (k, v) in branch.inner.inbox.drain() {
 
                    old.inner.inbox.insert(k, v);
 
                }
 
                old.ended |= branch.ended;
 
            }
 
        }
 
    }
 

	
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState {
 
        let BranchingProtoComponent { branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return state;
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
@@ -1266,26 +1273,26 @@ impl SolutionStorage {
 
        }
 
    }
 
    // drain old_local to new_local, visiting all new additions to old_local
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            // rely on invariant: empty intersection between old and new local sets
 
            assert!(old_local.insert(local.clone()));
 
            local
 
        })
 
    }
 
    // insert a solution for the given subtree ID,
 
    // AND update new_local to include any solutions that become 
 
    // possible as a result of this new addition  
 
    // AND update new_local to include any solutions that become
 
    // possible as a result of this new addition
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        let Self { subtree_solutions, new_local, old_local, subtree_id_to_index } = self;
 
        let index = subtree_id_to_index[&subtree_id];
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            // This is a newly-added solution! update new_local
 
@@ -1331,28 +1338,25 @@ impl SolutionStorage {
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new component and changing the given port's ownership to that
 
    // of the new component.
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // Sanity check! The moved ports are owned by this component to begin with
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.ips.port_info.map.get(port).unwrap().owner
 
            );
 
            assert_eq!(self.proto_component_id, self.ips.port_info.map.get(port).unwrap().owner);
 
        }
 
        // Create the new component, and schedule it to be run
 
        let new_cid = self.ips.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            new_cid,
 
            &state,
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
@@ -1426,53 +1430,12 @@ impl SyncProtoContext<'_> {
 
    }
 

	
 
    // NOT CURRENTLY USED
 
    // Once this component has injected a new nondeterministic branch with
 
    // SyncBlocker::NondetChoice, this is how the component retrieves it.
 
    // (Two step process necessary to get around mutable access rules,
 
    //  as injection of the nondeterministic choice modifies the
 
    //  branch predicate, forks the branch, etc.)
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainerInner { swap, output } }
 
    }
 

	
 
    // This hides the ugliness of facilitating a memory-safe cyclic drain.
 
    // A "drain" would refer to a procedure that empties the input and populates the output.
 
    // It's "cyclic" because the processing function can also populate the input.
 
    // Making this memory safe requires an additional temporary storage, such that
 
    // the input can safely be drained and populated concurrently.
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainerInner { swap, output } } = self;
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                // func is the user-provided callback function, which consumes an element
 
                // as its drained from the input
 
                func(k, v, CyclicDrainerInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> {
 
    // Add this key-value pair to be yielded by the drainer later
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 

	
 
    // Add this key-value pair as an output of the drainer
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
src/runtime/tests.rs
Show inline comments
 
@@ -1302,12 +1302,72 @@ fn for_msg_byte() {
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn eq_no_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    composite eq(in a, in b, out c) {
 
        channel leftfirsto -> leftfirsti;
 
        new eqinner(a, b, c, leftfirsto, leftfirsti);
 
    }
 
    primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) {
 
        msg ma = null;
 
        msg mb = null;
 
        while(true) synchronous {
 
            if(fires(leftfirsti)) {
 
                // left first! DO USE DUMMY
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 

	
 
                // using dummy!
 
                put(leftfirsto, ma);
 
                get(leftfirsti);
 
            } else {
 
                // right first! DON'T USE DUMMY
 
                mb = get(b);
 
                put(c, mb);
 
                ma = get(a);
 
            }
 
            assert(ma == mb);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..32 {
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(SEC1).unwrap();
 
    }
 
}
0 comments (0 inline, 0 general)