From 66513c4f112d0206352aa9867ea37ecb8fc433ba 2020-09-22 17:28:43 From: Christopher Esterhuyse Date: 2020-09-22 17:28:43 Subject: [PATCH] more comments and simplification --- diff --git a/src/common.rs b/src/common.rs index 833795dcff0ac6c1f792559c21309d069bcf224e..3112933de493280ae1d39235bbbcb743cfbd8341 100644 --- a/src/common.rs +++ b/src/common.rs @@ -100,8 +100,8 @@ pub(crate) enum SyncBlocker { PutMsg(PortId, Payload), NondetChoice { n: u16 }, } -pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); - +struct DenseDebugHex<'a>(pub &'a [u8]); +struct DebuggableIter + Clone, T: Debug>(pub(crate) I); ///////////////////// IMPL ///////////////////// impl IdParts for Id { fn id_parts(self) -> (ConnectorId, U32Suffix) { @@ -236,3 +236,9 @@ impl Debug for DenseDebugHex<'_> { Ok(()) } } + +impl + Clone, T: Debug> Debug for DebuggableIter { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + f.debug_list().entries(self.0.clone()).finish() + } +} diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 10510935b5ece0a80c312bdfba36912726dfe43a..14c75a604c8017ba623579e7f3f7a2157859ac16 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,39 +2,62 @@ use super::*; use crate::common::*; use core::ops::{Deref, DerefMut}; -//////////////// // Guard protecting an incrementally unfoldable slice of MapTempGuard elements struct MapTempsGuard<'a, K, V>(&'a mut [HashMap]); + // Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true struct MapTempGuard<'a, K, V>(&'a mut HashMap); +// Once the synchronous round has begun, this structure manages the +// native component's speculative branches, one per synchronous batch. struct BranchingNative { branches: HashMap, } + +// Corresponds to one of the native's synchronous batches during the synchronous round. +// ports marked for message receipt correspond to entries of +// (a) `gotten` if they have not received yet, +// (b) `to_get` if they have already received, with the given payload. +// The branch corresponds to a component solution IFF to_get is empty. #[derive(Clone, Debug)] struct NativeBranch { index: usize, gotten: HashMap, to_get: HashSet, } + +// Manages a protocol component's speculative branches for the duration +// of the synchronous round. #[derive(Debug)] struct BranchingProtoComponent { branches: HashMap, } + +// One specualtive branch of a protocol component. +// `ended` IFF this branch has reached SyncBlocker::SyncBlockEnd before. #[derive(Debug, Clone)] struct ProtoComponentBranch { state: ComponentState, inner: ProtoComponentBranchInner, ended: bool, } + +// A structure wrapping a set of three pointers, making it impossible +// to miss that they are being setup for `cyclic_drain`. struct CyclicDrainer<'a, K: Eq + Hash, V> { input: &'a mut HashMap, - inner: CyclicDrainInner<'a, K, V>, + inner: CyclicDrainerInner<'a, K, V>, } -struct CyclicDrainInner<'a, K: Eq + Hash, V> { + +// Inner substructure of the Cyclic drainer to be passed through a callback function. +// See `CyclicDrainer::cyclic_drain`. +struct CyclicDrainerInner<'a, K: Eq + Hash, V> { swap: &'a mut HashMap, output: &'a mut HashMap, } + +// Small convenience trait for extending the stdlib's bool type with +// an optionlike replace method for increasing brevity. trait ReplaceBoolTrue { fn replace_with_true(&mut self) -> bool; } @@ -185,6 +208,9 @@ impl Connector { /// consequently, the given timeout duration represents a duration in which the connector /// will make a best effort to fail the round and return control flow to the caller. pub fn sync(&mut self, timeout: Option) -> Result { + // This method first destructures the connector, and checks for obvious + // failure cases. The bulk of the behavior continues in `connected_sync`, + // to minimize indentation, and enable convient ?-style short circuit syntax. let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), @@ -206,8 +232,11 @@ impl Connector { } } } - // private function. mutates state but returns with round - // result ASAP (allows for convenient error return with ?) + + // Attempts to complete the synchronous round for the given + // communication-phased connector structure. + // Modifies components and ports in `cu` IFF the round succeeds. + #[inline] fn connected_sync( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, @@ -226,8 +255,10 @@ impl Connector { ); log!(@BENCH, cu.logger(), ""); - // 1. run all proto components to Nonsync blockers - // iterate + // Create separate storages for ports and components stored in `cu`, + // while kicking off the branching of components until the set of + // components entering their synchronous block is finalized in `branching_proto_components`. + // This is the last time cu's components and ports are accessed until the round is decided. let mut current_state = cu.current_state.clone(); let mut branching_proto_components = HashMap::::default(); @@ -237,7 +268,9 @@ impl Connector { .map(|(&proto_id, proto)| (proto_id, proto.clone())) .collect(); log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len()); - // drains unrun_components, and populates branching_proto_components. + // initially, the set of components to run is the set of components stored by `cu`, + // but they are eventually drained into `branching_proto_components`. + // Some components exit first, and others are created and put into `unrun_components`. while let Some((proto_component_id, mut component)) = unrun_components.pop() { log!( cu.logger(), @@ -263,10 +296,9 @@ impl Connector { match blocker { B::ComponentExit => drop(component), B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)), - B::SyncBlockStart => { - branching_proto_components - .insert(proto_component_id, BranchingProtoComponent::initial(component)); - } + B::SyncBlockStart => assert!(branching_proto_components + .insert(proto_component_id, BranchingProtoComponent::initial(component)) + .is_none()), // Some(_) returned IFF some component identifier key is overwritten (BAD!) } } log!( @@ -276,39 +308,53 @@ impl Connector { ); log!(@BENCH, cu.logger(), ""); - // Create temp structures needed for the synchronous phase of the round + // Create temporary structures needed for the synchronous phase of the round let mut rctx = RoundCtx { - current_state, + current_state, // already used previously, now moved into RoundCtx solution_storage: { - let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id)); - let c = - branching_proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid)); - let e = comm - .neighborhood - .children - .iter() - .map(|&index| SubtreeId::NetEndpoint { index }); - let subtree_id_iter = n.chain(c).chain(e); + let subtree_id_iter = { + // Create an iterator over the identifiers of this + // connector's childen in the _solution tree_. + // Namely, the native, all locally-managed components, + // and all this connector's children in the _consensus tree_ (other connectors). + let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id)); + let c = branching_proto_components + .keys() + .map(|&cid| SubtreeId::LocalComponent(cid)); + let e = comm + .neighborhood + .children + .iter() + .map(|&index| SubtreeId::NetEndpoint { index }); + n.chain(c).chain(e) + }; log!( cu.logger, "Children in subtree are: {:?}", - subtree_id_iter.clone().collect::>() + DebuggableIter(subtree_id_iter.clone()) ); SolutionStorage::new(subtree_id_iter) }, spec_var_stream: cu.current_state.id_manager.new_spec_var_stream(), - payload_inbox: Default::default(), + payload_inbox: Default::default(), // buffer for in-memory payloads to be handled deadline: timeout.map(|to| Instant::now() + to), }; log!(cu.logger(), "Round context structure initialized"); log!(@BENCH, cu.logger(), ""); - // Explore all native branches eagerly. Find solutions, buffer messages, etc. + // Prepare the branching native component, involving the conversion + // of its synchronous batches (user provided) into speculative branches eagerly. + // As a side effect, send all PUTs with the appropriate predicates. + // Afterwards, each native component's speculative branch finds a local + // solution the moment it's received all the messages it's awaiting. log!( cu.logger(), "Translating {} native batches into branches...", comm.native_batches.len() ); + // Allocate a single speculative variable to distinguish each native branch. + // This enables native components to have distinct branches with identical + // FIRING variables. let native_spec_var = rctx.spec_var_stream.next(); log!(cu.logger(), "Native branch spec var is {:?}", native_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; @@ -316,11 +362,11 @@ impl Connector { comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) { let NativeBatch { to_get, to_put } = native_branch; + // compute the solution predicate to associate with this branch. let predicate = { let mut predicate = Predicate::default(); // all firing ports have SpecVal::FIRING let firing_iter = to_get.iter().chain(to_put.keys()).copied(); - log!( cu.logger(), "New native with firing ports {:?}", @@ -367,6 +413,7 @@ impl Connector { } let branch = NativeBranch { index, gotten: Default::default(), to_get }; if branch.is_ended() { + // empty to_get set => already corresponds with a component solution log!( cu.logger(), "Native submitting solution for batch {} with {:?}", @@ -459,6 +506,7 @@ impl Connector { log!(@MARK, cu.logger(), "decide start"); let mut already_requested_failure = false; if branching_native.branches.is_empty() { + // An unsatisfiable native is the easiest way to detect failure log!(cu.logger(), "Native starts with no branches! Failure!"); match comm.neighborhood.parent { Some(parent) => { @@ -474,6 +522,9 @@ impl Connector { } } } + + // Create a small set of "workspace" hashmaps, to be passed by-reference into various calls. + // This is an optimization, avoiding repeated allocation. let mut pcb_temps_owner = <[HashMap; 3]>::default(); let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); let mut bn_temp_owner = >::default(); @@ -1162,7 +1213,7 @@ impl SyncProtoContext<'_> { self.branch_inner.untaken_choice.take() } } -impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { +impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } @@ -1252,17 +1303,17 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { swap: &'a mut HashMap, output: &'a mut HashMap, ) -> Self { - Self { input, inner: CyclicDrainInner { swap, output } } + Self { input, inner: CyclicDrainerInner { swap, output } } } fn cyclic_drain( self, - mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>, + mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>, ) -> Result<(), E> { - let Self { input, inner: CyclicDrainInner { swap, output } } = self; + let Self { input, inner: CyclicDrainerInner { swap, output } } = self; // assert!(swap.is_empty()); while !input.is_empty() { for (k, v) in input.drain() { - func(k, v, CyclicDrainInner { swap, output })? + func(k, v, CyclicDrainerInner { swap, output })? } std::mem::swap(input, swap); }