From c9ffd6e7ade5ed76cd685f4faae50f67a937b08a 2020-09-23 12:49:03 From: Christopher Esterhuyse Date: 2020-09-23 12:49:03 Subject: [PATCH] more safety assertions, some minor cleanup, and major commenting --- diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 140d03c3b079b308cfb9eb49ba0c38f77ffe7da9..91555a7f9e34e40ad9775a3b6ffa30fa23289cf7 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -61,6 +61,9 @@ struct CyclicDrainerInner<'a, K: Eq + Hash, V> { trait ReplaceBoolTrue { fn replace_with_true(&mut self) -> bool; } + +//////////////// IMPL //////////////////////////// + impl ReplaceBoolTrue for bool { fn replace_with_true(&mut self) -> bool { let was = *self; @@ -68,6 +71,7 @@ impl ReplaceBoolTrue for bool { !was } } + // CuUndecided provides a mostly immutable view into the ConnectorUnphased structure, // making it harder to accidentally mutate its contents in a way that cannot be rolled back. impl CuUndecided for ConnectorUnphased { @@ -84,8 +88,6 @@ impl CuUndecided for ConnectorUnphased { self.native_component_id } } - -//////////////// impl<'a, K, V> MapTempsGuard<'a, K, V> { fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> { MapTempsGuard(self.0) @@ -495,6 +497,12 @@ impl Connector { ret } + // Once the synchronous round has been started, this procedure + // routes and handles payloads, receives control messages from neighboring connectors, + // checks for timeout, and aggregates solutions until a distributed decision is reached. + // The decision is either a solution (success case), or a distributed timeout rollback (failure case) + // The final possible outcome is an unrecoverable error, which results from some fundamental misbehavior, + // a network channel breaking, etc. 
fn sync_reach_decision( cu: &mut impl CuUndecided, comm: &mut ConnectorCommunication, @@ -810,6 +818,7 @@ impl NativeBranch { } } impl BranchingNative { + // Feed the given payload to the native component // May result in discovering new component solutions, // or fork speculative branches if the message's predicate @@ -932,6 +941,7 @@ impl BranchingNative { } } } + // Insert a new speculate branch into the given storage, // MERGING it with an existing branch if their predicate keys clash. fn insert_branch_merging( @@ -960,6 +970,7 @@ impl BranchingNative { } } } + // Given the predicate for the round's solution, collapse this // branching native to an ended branch whose predicate is consistent with it. // return as `RoundEndedNative` the result of a native completing successful round @@ -992,12 +1003,14 @@ impl BranchingNative { } } impl BranchingProtoComponent { + // Create a singleton-branch branching protocol component as // speculation begins, with the given protocol state. fn initial(state: ComponentState) -> Self { let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; Self { branches: hashmap! { Predicate::default() => branch } } } + // run all the given branches (cd.input) to their SyncBlocker, // populating cd.output (by way of CyclicDrainer::cyclic_drain). // This procedure might lose branches, and it might create new branches. @@ -1108,6 +1121,7 @@ impl BranchingProtoComponent { Ok(()) }) } + // Feed this branching protocol component the given message, and // then run all branches until they are once again blocked. fn feed_msg( @@ -1184,6 +1198,7 @@ impl BranchingProtoComponent { log!(cu.logger(), "component settles down with branches: {:?}", branches.keys()); Ok(()) } + // Insert a new speculate branch into the given storage, // MERGING it with an existing branch if their predicate keys clash. 
fn insert_branch_merging( @@ -1223,6 +1238,28 @@ impl BranchingProtoComponent { panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } } +impl ProtoComponentBranch { + // Feed this branch received message. + // It's safe to receive the same message repeatedly, + // but if we receive a message with different contents, + // it's a sign something has gone wrong! keys of type (port, round, predicate) + // should always map to at most one message value! + fn feed_msg(&mut self, getter: PortId, payload: Payload) { + let e = self.inner.inbox.entry(getter); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + // new message + ev.insert(payload); + } + Entry::Occupied(eo) => { + // redundant recv. can happen as a result of a + // component A having two branches X and Y related by + assert_eq!(eo.get(), &payload); + } + } + } +} impl SolutionStorage { // Create a new solution storage, to manage the local solutions for // this connector and all of it's children (subtrees) in the solution tree. @@ -1283,6 +1320,9 @@ impl SolutionStorage { Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local); } } + + // Recursively build local solutions for this connector, + // see `submit_and_digest_subtree_solution` fn elaborate_into_new_local_rec<'a, 'b>( cu: &mut impl CuUndecided, partial: Predicate, @@ -1290,7 +1330,6 @@ impl SolutionStorage { old_local: &'b HashSet, new_local: &'a mut HashSet, ) { - // if let Some(set) = set_visitor.next() { // incomplete solution. 
keep recursively creating combined solutions for pred in set.iter() { @@ -1314,41 +1353,19 @@ impl SolutionStorage { } } } - -impl SyncProtoContext<'_> { - pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.rctx.current_state.spec_var_for(port); - self.predicate.query(var).map(SpecVal::is_firing) - } - pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - // Note that this component has received this port's message 1+ times this round - self.branch_inner.did_put_or_get.insert(port); - self.branch_inner.inbox.get(&port) - } - pub(crate) fn take_choice(&mut self) -> Option { - self.branch_inner.untaken_choice.take() - } -} -impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> { - fn add_input(&mut self, k: K, v: V) { - self.swap.insert(k, v); - } - fn add_output(&mut self, k: K, v: V) { - self.output.insert(k, v); - } -} impl NonsyncProtoContext<'_> { + // Facilitates callback from the component to the connector runtime, + // creating a new component and changing the given port's ownership to that + // of the new component. pub(crate) fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { - // called by a PROTO COMPONENT. moves its own ports. - // 1. sanity check: this component owns these ports - // sanity check + // Sanity check! The moved ports are owned by this component to begin with for port in moved_ports.iter() { assert_eq!( self.proto_component_id, self.current_state.port_info.get(port).unwrap().owner ); } - // 2. create new component + // Create the new component, and schedule it to be run let new_cid = self.current_state.id_manager.new_component_id(); log!( self.logger, @@ -1359,12 +1376,14 @@ impl NonsyncProtoContext<'_> { &moved_ports ); self.unrun_components.push((new_cid, state)); - // 3. update ownership of moved ports + // Update the ownership of the moved ports for port in moved_ports.iter() { self.current_state.port_info.get_mut(port).unwrap().owner = new_cid; } - // 3. 
create a new component } + + // Facilitates callback from the component to the connector runtime, + // creating a new port-pair connected by a memory channel pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component let mut new_cid_fn = || self.current_state.id_manager.new_port_id(); @@ -1397,20 +1416,33 @@ impl NonsyncProtoContext<'_> { [o, i] } } -impl ProtoComponentBranch { - fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let e = self.inner.inbox.entry(getter); - use std::collections::hash_map::Entry; - match e { - Entry::Vacant(ev) => { - // new message - ev.insert(payload); - } - Entry::Occupied(eo) => { - // redundant recv. can happen as a result of a component A having two branches X and Y related by - assert_eq!(eo.get(), &payload); - } +impl SyncProtoContext<'_> { + // The component calls the runtime back, inspecting whether its associated + // predicate has already determined a (speculative) value for the given port's firing variable. + pub(crate) fn is_firing(&mut self, port: PortId) -> Option { + let var = self.rctx.current_state.spec_var_for(port); + self.predicate.query(var).map(SpecVal::is_firing) + } + + // The component calls the runtime back, trying to inspect a port's message + pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + let maybe_msg = self.branch_inner.inbox.get(&port); + if maybe_msg.is_some() { + // Make a note that this component has received + // this port's message 1+ times this round + self.branch_inner.did_put_or_get.insert(port); } + maybe_msg + } + + // NOT CURRENTLY USED + // Once this component has injected a new nondeterministic branch with + // SyncBlocker::NondetChoice, this is how the component retrieves it. + // (Two step process necessary to get around mutable access rules, + // as injection of the nondeterministic choice modifies the + // branch predicate, forks the branch, etc.) 
+ pub(crate) fn take_choice(&mut self) -> Option { + self.branch_inner.untaken_choice.take() } } impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { @@ -1421,15 +1453,16 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { ) -> Self { Self { input, inner: CyclicDrainerInner { swap, output } } } + + // This hides the ugliness of facilitating a memory-safe cyclic drain. + // A "drain" would refer to a procedure that empties the input and populates the output. + // It's "cyclic" because the processing function can also populate the input. + // Making this memory safe requires an additional temporary storage, such that + // the input can safely be drained and populated concurrently. fn cyclic_drain( self, mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>, ) -> Result<(), E> { - // This hides the ugliness of facilitating a memory-safe cyclic drain. - // A "drain" would refer to a procedure that empties the input and populates the output. - // It's "cyclic" because the processing function can also populate the input. - // Making this memory safe requires an additional temporary storage, such that - // the input can safely be drained and populated concurrently. let Self { input, inner: CyclicDrainerInner { swap, output } } = self; while !input.is_empty() { for (k, v) in input.drain() { @@ -1442,3 +1475,14 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { Ok(()) } } +impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> { + // Add this key-value pair to be yielded by the drainer later + fn add_input(&mut self, k: K, v: V) { + self.swap.insert(k, v); + } + + // Add this key-value pair as an output of the drainer + fn add_output(&mut self, k: K, v: V) { + self.output.insert(k, v); + } +}