diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index cd6e88d0692245b159b8d32ab094431f125a0a79..37b1dff24e324a9cd07455480885b1a47880bc06 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -15,7 +15,7 @@ use crate::common::*; use error::*; use mio::net::UdpSocket; -/// Each Connector structure is the interface between the user's application and a communication session, +/// The interface between the user's application and a communication session, /// in which the application plays the part of a (native) component. This structure provides the application /// with functionality available to all components: the ability to add new channels (port pairs), and to /// instantiate new components whose definitions are defined in the connector's configured protocol @@ -47,58 +47,75 @@ pub struct DummyLogger; /// A logger that writes the logged lines to a given file. #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); -#[derive(Debug, Clone)] -struct CurrentState { - port_info: HashMap, - id_manager: IdManager, -} + +// Interface between protocol state and the connector runtime BEFORE all components +// ave begun their branching speculation. See ComponentState::nonsync_run. pub(crate) struct NonsyncProtoContext<'a> { current_state: &'a mut CurrentState, logger: &'a mut dyn Logger, - // cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase proto_component_id: ComponentId, // KEY in id->component map } + +// Interface between protocol state and the connector runtime AFTER all components +// have begun their branching speculation. See ComponentState::sync_run. pub(crate) struct SyncProtoContext<'a> { rctx: &'a RoundCtx, branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch predicate: &'a Predicate, // KEY in pred->branch map } + +// The data coupled with a particular protocol component branch, but crucially omitting +// the `ComponentState` such that this may be passed by reference to the state with separate +// access control. #[derive(Default, Debug, Clone)] struct ProtoComponentBranchInner { untaken_choice: Option, did_put_or_get: HashSet, inbox: HashMap, } + +// A speculative variable that lives for the duration of the synchronous round. +// Each is assigned a value in domain `SpecVal`. #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] struct SpecVar(PortId); + +// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, +// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] struct SpecVal(u16); + +// Data associated with a successful synchronous round, retained afterwards such that the +// native component can freely reflect on how it went, reading the messages received at their +// inputs, and reflecting on which of their connector's synchronous batches succeeded. #[derive(Debug)] struct RoundOk { batch_index: usize, gotten: HashMap, } + +// Implementation of a set in terms of a vector (optimized for reading, not writing) #[derive(Default)] struct VecSet { // invariant: ordered, deduplicated vec: Vec, } + +// Allows a connector to remember how to forward payloads towards the component that +// owns their destination port. `LocalComponent` corresponds with messages for components +// managed by the connector itself (hinting for it to look it up in a local structure), +// whereas the other variants direct the connector to forward the messages over the network. #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum Route { LocalComponent, NetEndpoint { index: usize }, UdpEndpoint { index: usize }, } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -enum SubtreeId { - LocalComponent(ComponentId), - NetEndpoint { index: usize }, -} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct MyPortInfo { polarity: Polarity, @@ -235,6 +252,13 @@ struct PortInfo { route: Route, } +#[derive(Debug, Clone)] +struct CurrentState { + port_info: HashMap, + id_manager: IdManager, +} + +// A component's setup-phase-specific data #[derive(Debug)] struct ConnectorCommunication { round_index: usize, @@ -247,36 +271,63 @@ struct ConnectorCommunication { struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, - inner: ConnectorUnphasedInner, -} -#[derive(Debug)] -struct ConnectorUnphasedInner { logger: Box, current_state: CurrentState, native_component_id: ComponentId, } -#[derive(Debug)] -struct ConnectorSetup { - net_endpoint_setups: Vec, - udp_endpoint_setups: Vec, -} + +// A connector's phase-specific data #[derive(Debug)] enum ConnectorPhased { Setup(Box), Communication(Box), } + +// A connector's setup-phase-specific data +#[derive(Debug)] +struct ConnectorSetup { + net_endpoint_setups: Vec, + udp_endpoint_setups: Vec, +} + #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] struct Predicate { assigned: BTreeMap, } + +// Identifies a child of this connector in the _solution tree_. +// Each connector creates its own local solutions for the consensus procedure during `sync`, +// from the solutions of its children. Those children are either locally-managed components, +// (which are leaves in the solution tree), or other connectors reachable through the given +// network endpoint (which are internal nodes in the solution tree). +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +enum SubtreeId { + LocalComponent(ComponentId), + NetEndpoint { index: usize }, +} + +// An accumulation of the connector's knowledge of all (a) the local solutions its children +// in the solution tree have found, and (b) its own solutions derivable from those of its children. +// This structure starts off each round with an empty set, and accumulates solutions as they are found +// by local components, or received over the network in control messages. +// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to +// say that these sets GROW until the round is over, and all solutions are reset. #[derive(Debug)] struct SolutionStorage { + // invariant: old_local U new_local solutions are those that can be created from + // the UNION of one element from each set in `subtree_solution`. + // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. old_local: HashSet, new_local: HashSet, // this pair acts as SubtreeId -> HashSet which is friendlier to iteration subtree_solutions: Vec>, subtree_id_to_index: HashMap, } + +// Stores the transient data of a synchronous round. +// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of +// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, +// and can be undone if the round fails. struct RoundCtx { solution_storage: SolutionStorage, spec_var_stream: SpecVarStream, @@ -284,58 +335,51 @@ struct RoundCtx { deadline: Option, current_state: CurrentState, } + +// A trait intended to limit the access of the ConnectorUnphased structure +// such that we don't accidentally modify any important component/port data +// while the results of the round are undecided. Why? Any actions during Connector::sync +// are _speculative_ until the round is decided, and we need a safe way of rolling +// back any changes. trait CuUndecided { fn logger(&mut self) -> &mut dyn Logger; fn proto_description(&self) -> &ProtocolDescription; fn native_component_id(&self) -> ComponentId; fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); } + +// Represents a set of synchronous port operations that the native component +// has described as an "option" for completing during the synchronous rounds. +// Operations contained here succeed together or not at all. +// A native with N=2+ batches are expressing an N-way nondeterministic choice #[derive(Debug, Default)] struct NativeBatch { // invariant: putters' and getters' polarities respected to_put: HashMap, to_get: HashSet, } + +// Parallels a mio::Token type, but more clearly communicates +// the way it identifies the evented structre it corresponds to. +// See runtime/setup for methods converting between TokenTarget and mio::Token #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] enum TokenTarget { NetEndpoint { index: usize }, UdpEndpoint { index: usize }, Waker, } + +// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, +// such that it can know when to continue polling, and when to block. enum CommRecvOk { TimeoutWithoutNew, NewPayloadMsgs, NewControlMsg { net_index: usize, msg: CommCtrlMsg }, } //////////////// -fn would_block(err: &std::io::Error) -> bool { +fn err_would_block(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::WouldBlock } -impl TokenTarget { - const HALFWAY_INDEX: usize = usize::MAX / 2; - const MAX_INDEX: usize = usize::MAX; - const WAKER_TOKEN: usize = Self::MAX_INDEX; -} -impl From for TokenTarget { - fn from(Token(index): Token) -> Self { - if index == Self::WAKER_TOKEN { - TokenTarget::Waker - } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { - TokenTarget::UdpEndpoint { index: shifted } - } else { - TokenTarget::NetEndpoint { index } - } - } -} -impl Into for TokenTarget { - fn into(self) -> Token { - match self { - TokenTarget::Waker => Token(Self::WAKER_TOKEN), - TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), - TokenTarget::NetEndpoint { index } => Token(index), - } - } -} impl VecSet { fn new(mut vec: Vec) -> Self { vec.sort(); @@ -448,13 +492,13 @@ impl Connector { /// Enables the connector's current logger to be swapped out for another pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger); + std::mem::swap(&mut self.unphased.logger, &mut new_logger); new_logger } /// Access the connector's current logger pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.inner.logger + &mut *self.unphased.logger } /// Create a new synchronous channel, returning its ends as a pair of ports, @@ -464,27 +508,27 @@ impl Connector { pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.inner.current_state.id_manager.new_port_id(); + let mut new_cid = || cu.current_state.id_manager.new_port_id(); let [o, i] = [new_cid(), new_cid()]; - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( o, PortInfo { route: Route::LocalComponent, peer: Some(i), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, polarity: Putter, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( i, PortInfo { route: Route::LocalComponent, peer: Some(o), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, polarity: Getter, }, ); - log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } @@ -515,8 +559,8 @@ impl Connector { return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); } for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; - if info.owner != cu.inner.native_component_id { + let info = cu.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; + if info.owner != cu.native_component_id { return Err(Ace::UnknownPort(port)); } if info.polarity != expected_polarity { @@ -524,12 +568,12 @@ impl Connector { } } // 2. add new component - let new_cid = cu.inner.current_state.id_manager.new_component_id(); + let new_cid = cu.current_state.id_manager.new_component_id(); cu.proto_components .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); // 3. update port ownership for port in ports.iter() { - match cu.inner.current_state.port_info.get_mut(port) { + match cu.current_state.port_info.get_mut(port) { Some(port_info) => port_info.owner = new_cid, None => unreachable!(), }