Changeset - c9ffd6e7ade5
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-09-23 12:49:03
christopher.esterhuyse@gmail.com
more safety assertions, some minor cleanup, and major commenting
1 file changed with 91 insertions and 47 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -61,6 +61,9 @@ struct CyclicDrainerInner<'a, K: Eq + Hash, V> {
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 

	
 
//////////////// IMPL ////////////////////////////
 

	
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
@@ -68,6 +71,7 @@ impl ReplaceBoolTrue for bool {
 
        !was
 
    }
 
}
 

	
 
// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure,
 
// making it harder to accidentally mutate its contents in a way that cannot be rolled back.
 
impl CuUndecided for ConnectorUnphased {
 
@@ -84,8 +88,6 @@ impl CuUndecided for ConnectorUnphased {
 
        self.native_component_id
 
    }
 
}
 

	
 
////////////////
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
    fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> {
 
        MapTempsGuard(self.0)
 
@@ -495,6 +497,12 @@ impl Connector {
 
        ret
 
    }
 

	
 
    // Once the synchronous round has been started, this procedure
 
    // routs and handles payloads, receives control messages from neighboring connectors,
 
    // checks for timeout, and aggregates solutions until a distributed decision is reached.
 
    // The decision is either a solution (success case), or a distributed timeout rollback (failure case)
 
    // The final possible outcome is an unrecoverable error, which results from some fundamental misbehavior,
 
    // a network channel breaking, etc.
 
    fn sync_reach_decision(
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
@@ -810,6 +818,7 @@ impl NativeBranch {
 
    }
 
}
 
impl BranchingNative {
 

	
 
    // Feed the given payload to the native component
 
    // May result in discovering new component solutions,
 
    // or fork speculative branches if the message's predicate
 
@@ -932,6 +941,7 @@ impl BranchingNative {
 
            }
 
        }
 
    }
 

	
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
@@ -960,6 +970,7 @@ impl BranchingNative {
 
            }
 
        }
 
    }
 

	
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    // return as `RoundEndedNative` the result of a native completing successful round
 
@@ -992,12 +1003,14 @@ impl BranchingNative {
 
    }
 
}
 
impl BranchingProtoComponent {
 

	
 
    // Create a singleton-branch branching protocol component as
 
    // speculation begins, with the given protocol state.
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 

	
 
    // run all the given branches (cd.input) to their SyncBlocker,
 
    // populating cd.output (by way of CyclicDrainer::cyclic_drain).
 
    // This procedure might lose branches, and it might create new branches.
 
@@ -1108,6 +1121,7 @@ impl BranchingProtoComponent {
 
            Ok(())
 
        })
 
    }
 

	
 
    // Feed this branching protocol component the given message, and
 
    // then run all branches until they are once again blocked. 
 
    fn feed_msg(
 
@@ -1184,6 +1198,7 @@ impl BranchingProtoComponent {
 
        log!(cu.logger(), "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 

	
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
@@ -1223,6 +1238,28 @@ impl BranchingProtoComponent {
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl ProtoComponentBranch {
 
    // Feed this branch received message.
 
    // It's safe to receive the same message repeatedly,
 
    // but if we receive a message with different contents,
 
    // it's a sign something has gone wrong! keys of type (port, round, predicate)
 
    // should always map to at most one message value!
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let e = self.inner.inbox.entry(getter);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // new message
 
                ev.insert(payload);
 
            }
 
            Entry::Occupied(eo) => {
 
                // redundant recv. can happen as a result of a
 
                // component A having two branches X and Y related by
 
                assert_eq!(eo.get(), &payload);
 
            }
 
        }
 
    }
 
}
 
impl SolutionStorage {
 
    // Create a new solution storage, to manage the local solutions for
 
    // this connector and all of it's children (subtrees) in the solution tree.
 
@@ -1283,6 +1320,9 @@ impl SolutionStorage {
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
        }
 
    }
 

	
 
    // Recursively build local solutions for this connector,
 
    // see `submit_and_digest_subtree_solution`
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        cu: &mut impl CuUndecided,
 
        partial: Predicate,
 
@@ -1290,7 +1330,6 @@ impl SolutionStorage {
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        // 
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep recursively creating combined solutions
 
            for pred in set.iter() {
 
@@ -1314,41 +1353,19 @@ impl SolutionStorage {
 
        }
 
    }
 
}
 

	
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.current_state.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        // Note that this component has received this port's message 1+ times this round
 
        self.branch_inner.did_put_or_get.insert(port);
 
        self.branch_inner.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new component and changing the given port's ownership to that
 
    // of the new component.
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        // sanity check
 
        // Sanity check! The moved ports are owned by this component to begin with
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.current_state.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // 2. create new component
 
        // Create the new component, and schedule it to be run
 
        let new_cid = self.current_state.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
@@ -1359,12 +1376,14 @@ impl NonsyncProtoContext<'_> {
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
        // 3. update ownership of moved ports
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
            self.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
 
        }
 
        // 3. create a new component
 
    }
 

	
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new port-pair connected by an memory channel
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.current_state.id_manager.new_port_id();
 
@@ -1397,20 +1416,33 @@ impl NonsyncProtoContext<'_> {
 
        [o, i]
 
    }
 
}
 
impl ProtoComponentBranch {
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let e = self.inner.inbox.entry(getter);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // new message
 
                ev.insert(payload);
 
impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether it's associated
 
    // preidcate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.current_state.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
            Entry::Occupied(eo) => {
 
                // redundant recv. can happen as a result of a component A having two branches X and Y related by
 
                assert_eq!(eo.get(), &payload);
 

	
 
    // The component calls the runtime back, trying to inspect a port's message
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        let maybe_msg = self.branch_inner.inbox.get(&port);
 
        if maybe_msg.is_some() {
 
            // Make a note that this component has received
 
            // this port's message 1+ times this round
 
            self.branch_inner.did_put_or_get.insert(port);
 
        }
 
        maybe_msg
 
    }
 

	
 
    // NOT CURRENTLY USED
 
    // Once this component has injected a new nondeterministic branch with
 
    // SyncBlocker::NondetChoice, this is how the component retrieves it.
 
    // (Two step process necessary to get around mutable access rules,
 
    //  as injection of the nondeterministic choice modifies the
 
    //  branch predicate, forks the branch, etc.)
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
@@ -1421,15 +1453,16 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainerInner { swap, output } }
 
    }
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 

	
 
    // This hides the ugliness of facilitating a memory-safe cyclic drain.
 
    // A "drain" would refer to a procedure that empties the input and populates the output.
 
    // It's "cyclic" because the processing function can also populate the input.
 
    // Making this memory safe requires an additional temporary storage, such that
 
    // the input can safely be drained and populated concurrently.
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainerInner { swap, output } } = self;
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
@@ -1442,3 +1475,14 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
        Ok(())
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> {
 
    // Add this key-value pair to be yielded by the drainer later
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 

	
 
    // Add this key-value pair as an output of the drainer
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
0 comments (0 inline, 0 general)