diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 34f8bcd79070695e834f42aa20bc2268c07a6b7f..3b67516160fab59fc5658dcd6e9501127e0e4005 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -51,7 +51,7 @@ pub struct FileLogger(ConnectorId, std::fs::File); // Interface between protocol state and the connector runtime BEFORE all components // ave begun their branching speculation. See ComponentState::nonsync_run. pub(crate) struct NonsyncProtoContext<'a> { - current_state: &'a mut CurrentState, + ips: &'a mut IdAndPortState, logger: &'a mut dyn Logger, unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase proto_component_id: ComponentId, // KEY in id->component map @@ -116,22 +116,22 @@ enum Route { UdpEndpoint { index: usize }, } -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct MyPortInfo { - polarity: Polarity, - port: PortId, - owner: ComponentId, -} +// The outcome of a synchronous round, representing the distributed consensus. +// In the success case, the attached predicate encodes a row in the session's trace table. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] enum Decision { - Failure, + Failure, // some connector timed out! Success(Predicate), } + +// The type of control messages exchanged between connectors over the network #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum Msg { SetupMsg(SetupMsg), CommMsg(CommMsg), } + +// Control messages exchanged during the setup phase only #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum SetupMsg { MyPortInfo(MyPortInfo), @@ -141,6 +141,9 @@ enum SetupMsg { SessionGather { unoptimized_map: HashMap }, SessionScatter { optimized_map: HashMap }, } + +// A data structure encoding the state of a connector, passed around +// during the session optimization procedure. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct SessionInfo { serde_proto_description: SerdeProtocolDescription, @@ -148,28 +151,44 @@ struct SessionInfo { endpoint_incoming_to_getter: Vec, proto_components: HashMap, } + +// Newtype wrapper for an Arc, +// such that it can be (de)serialized for transmission over the network. #[derive(Debug, Clone)] struct SerdeProtocolDescription(Arc); + +// Control message particular to the communication phase. +// as such, it's annotated with a round_index #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct CommMsg { round_index: usize, contents: CommMsgContents, } + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum CommMsgContents { SendPayload(SendPayloadMsg), CommCtrl(CommCtrlMsg), } + +// Connector <-> connector control messages for use in the communication phase #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum CommCtrlMsg { - Suggest { suggestion: Decision }, // SINKWARD - Announce { decision: Decision }, // SINKAWAYS + Suggest { suggestion: Decision }, // child->parent + Announce { decision: Decision }, // parent->child } + +// Speculative payload message, communicating the value for the given +// port's message predecated on the given speculative variable assignments. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct SendPayloadMsg { predicate: Predicate, payload: Payload, } + +// Return result of `Predicate::assignment_union`, communicating the contents +// of the predicate which represents the (consistent) union of their mappings, +// if it exists (no variable mapped distinctly by the input predicates) #[derive(Debug, PartialEq)] enum AssignmentUnionResult { FormerNotLatter, @@ -178,10 +197,16 @@ enum AssignmentUnionResult { New(Predicate), Nonexistant, } + +// One of two endpoints for a control channel with a connector on either end. +// The underlying transport is TCP, so we use an inbox buffer to allow +// discrete payload receipt. struct NetEndpoint { inbox: Vec, stream: TcpStream, } + +// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP #[derive(Debug, Clone)] struct NetEndpointSetup { getter_for_incoming: PortId, @@ -189,17 +214,29 @@ struct NetEndpointSetup { endpoint_polarity: EndpointPolarity, } +// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP #[derive(Debug, Clone)] struct UdpEndpointSetup { getter_for_incoming: PortId, local_addr: SocketAddr, peer_addr: SocketAddr, } + +// NetEndpoint annotated with the ID of the port that receives payload +// messages received through the endpoint. This approach assumes that NetEndpoints +// DO NOT multiplex port->port channels, and so a mapping such as this is possible. +// As a result, the messages themselves don't need to carry the PortID with them. #[derive(Debug)] struct NetEndpointExt { net_endpoint: NetEndpoint, getter_for_incoming: PortId, } + +// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component" +// described in the literature. +// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager), +// It acts as a native component by managing a (speculative) set of payload messages (an outbox, +// protecting the peer on the other side of the network). #[derive(Debug)] struct UdpEndpointExt { sock: UdpSocket, // already bound and connected @@ -207,43 +244,59 @@ struct UdpEndpointExt { outgoing_payloads: HashMap, getter_for_incoming: PortId, } + +// Meta-data for the connector: its role in the consensus tree. #[derive(Debug)] struct Neighborhood { parent: Option, children: VecSet, } + +// Manages the connector's ID, and manages allocations for connector/port IDs. #[derive(Debug, Clone)] struct IdManager { connector_id: ConnectorId, port_suffix_stream: U32Stream, component_suffix_stream: U32Stream, } + +// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. struct UdpInBuffer { byte_vec: Vec, } + +// A generator of speculative variables. Created on-demand during the synchronous round +// by the IdManager. #[derive(Debug)] struct SpecVarStream { connector_id: ConnectorId, port_suffix_stream: U32Stream, } + +// Manages the messy state of the various endpoints, pollers, buffers, etc. #[derive(Debug)] struct EndpointManager { // invariants: - // 1. net and udp endpoints are registered with poll. Poll token computed with TargetToken::into + // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into // 2. Events is empty poll: Poll, events: Events, delayed_messages: Vec<(usize, Msg)>, - undelayed_messages: Vec<(usize, Msg)>, + undelayed_messages: Vec<(usize, Msg)>, // ready to yield net_endpoint_store: EndpointStore, udp_endpoint_store: EndpointStore, udp_in_buffer: UdpInBuffer, } + +// A storage of endpoints, which keeps track of which components have raised +// an event during poll(), signifying that they need to be checked for new incoming data #[derive(Debug)] struct EndpointStore { endpoint_exts: Vec, polled_undrained: VecSet, } + +// The information associated with a port identifier, designed for local storage. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct PortInfo { owner: ComponentId, @@ -252,8 +305,19 @@ struct PortInfo { route: Route, } +// Similar to `PortInfo`, but designed for communication during the setup procedure. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct MyPortInfo { + polarity: Polarity, + port: PortId, + owner: ComponentId, +} + +// A convenient substructure for containing port info and the ID manager. +// Houses the bulk of the connector's persistent state between rounds. +// It turns out several situations require access to both things. #[derive(Debug, Clone)] -struct CurrentState { +struct IdAndPortState { port_info: HashMap, id_manager: IdManager, } @@ -267,12 +331,14 @@ struct ConnectorCommunication { native_batches: Vec, round_result: Result, SyncError>, } + +// A component's data common to both setup and communication phases #[derive(Debug)] struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, logger: Box, - current_state: CurrentState, + ips: IdAndPortState, native_component_id: ComponentId, } @@ -290,6 +356,8 @@ struct ConnectorSetup { udp_endpoint_setups: Vec, } +// A newtype wrapper for a map from speculative variable to speculative value +// A missing mapping corresponds with "unspecified". #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] struct Predicate { assigned: BTreeMap, @@ -333,7 +401,7 @@ struct RoundCtx { spec_var_stream: SpecVarStream, payload_inbox: Vec<(PortId, SendPayloadMsg)>, deadline: Option, - current_state: CurrentState, + ips: IdAndPortState, } // A trait intended to limit the access of the ConnectorUnphased structure @@ -382,6 +450,7 @@ fn err_would_block(err: &std::io::Error) -> bool { } impl VecSet { fn new(mut vec: Vec) -> Self { + // establish the invariant vec.sort(); vec.dedup(); Self { vec } @@ -389,6 +458,7 @@ impl VecSet { fn contains(&self, element: &T) -> bool { self.vec.binary_search(element).is_ok() } + // Insert the given element. Returns whether it was already present. fn insert(&mut self, element: T) -> bool { match self.vec.binary_search(&element) { Ok(_) => false, @@ -405,7 +475,7 @@ impl VecSet { self.vec.pop() } } -impl CurrentState { +impl IdAndPortState { fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { self.port_info .iter() @@ -413,6 +483,9 @@ impl CurrentState { .map(|(port, _)| port) } fn spec_var_for(&self, port: PortId) -> SpecVar { + // Every port maps to a speculative variable + // Two distinct ports map to the same variable + // IFF they are two ends of the same logical channel. let info = self.port_info.get(&port).unwrap(); SpecVar(match info.polarity { Getter => port, @@ -458,7 +531,7 @@ impl Drop for Connector { log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!"); } } - +// Given a slice of ports, return the first, if any, port is present repeatedly fn duplicate_port(slice: &[PortId]) -> Option { let mut vec = Vec::with_capacity(slice.len()); for port in slice.iter() { @@ -514,9 +587,14 @@ impl Connector { pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.current_state.id_manager.new_port_id(); + let mut new_cid = || cu.ips.id_manager.new_port_id(); + // allocate two fresh port identifiers let [o, i] = [new_cid(), new_cid()]; - cu.current_state.port_info.insert( + // store info for each: + // - they are each others' peers + // - they are owned by a local component with id `cid` + // - polarity putter, getter respectively + cu.ips.port_info.insert( o, PortInfo { route: Route::LocalComponent, @@ -525,7 +603,7 @@ impl Connector { polarity: Putter, }, ); - cu.current_state.port_info.insert( + cu.ips.port_info.insert( i, PortInfo { route: Route::LocalComponent, @@ -553,19 +631,18 @@ impl Connector { identifier: &[u8], ports: &[PortId], ) -> Result<(), AddComponentError> { - // called by the USER. moves ports owned by the NATIVE + // Check for error cases first before modifying `cu` use AddComponentError as Ace; - // 1. check if this is OK + let cu = &self.unphased; if let Some(port) = duplicate_port(ports) { return Err(Ace::DuplicatePort(port)); } - let cu = &mut self.unphased; let expected_polarities = cu.proto_description.component_polarities(identifier)?; if expected_polarities.len() != ports.len() { return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); } for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; + let info = cu.ips.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; if info.owner != cu.native_component_id { return Err(Ace::UnknownPort(port)); } @@ -573,13 +650,15 @@ impl Connector { return Err(Ace::WrongPortPolarity { port, expected_polarity }); } } - // 2. add new component - let new_cid = cu.current_state.id_manager.new_component_id(); + // No errors! Time to modify `cu` + // create a new component and identifier + let cu = &mut self.unphased; + let new_cid = cu.ips.id_manager.new_component_id(); cu.proto_components .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); - // 3. update port ownership + // update the ownership of moved ports for port in ports.iter() { - match cu.current_state.port_info.get_mut(port) { + match cu.ips.port_info.get_mut(port) { Some(port_info) => port_info.owner = new_cid, None => unreachable!(), } @@ -598,6 +677,7 @@ impl Predicate { self } + // Return true whether `self` is a subset of `maybe_superset` pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { for (var, val) in self.assigned.iter() { match maybe_superset.assigned.get(var) { @@ -605,41 +685,35 @@ impl Predicate { _ => return false, // var unmapped, or mapped differently } } + // `maybe_superset` mirrored all my assignments! true } - // returns true IFF self.unify would return Equivalent OR FormerNotLatter - // pub fn consistent_with(&self, other: &Self) -> bool { - // let [larger, smaller] = - // if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] }; - - // for (var, val) in smaller.assigned.iter() { - // match larger.assigned.get(var) { - // Some(val2) if val2 != val => return false, - // _ => {} - // } - // } - // true - // } - - /// Given self and other, two predicates, return the predicate whose - /// assignments are the union of those of self and other. + /// Given the two predicates {self, other}, return that whose + /// assignments are the union of those of both. fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { use AssignmentUnionResult as Aur; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; - // lists of assignments in self but not other and vice versa. + // populate lists of assignments in self but not other and vice versa. + // do this by incrementally unfolding the iterators, keeping an eye + // on the ordering between the head elements [s, o]. + // whenever s break, + [None, None] => break, // both iterators are empty [None, Some(x)] => { + // self's iterator is empty. + // all remaning elements are in other but not self o_not_s.push(x); o_not_s.extend(o_it); break; } [Some(x), None] => { + // other's iterator is empty. + // all remaning elements are in self but not other s_not_o.push(x); s_not_o.extend(s_it); break; @@ -656,6 +730,7 @@ impl Predicate { } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value + // No predicate exists which satisfies both! return Aur::Nonexistant; } else { // both predicates assign the variable to the same value @@ -681,6 +756,9 @@ impl Predicate { } } } + + // Compute the union of the assignments of the two given predicates, if it exists. + // It doesn't exist if there is some value which the predicates assign to different values. pub(crate) fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { @@ -695,6 +773,30 @@ impl Predicate { self.assigned.get(&var).copied() } } + +impl RoundCtx { + // remove an arbitrary buffered message, along with the ID of the getter who receives it + fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { + self.payload_inbox.pop() + } + + // buffer a message along with the ID of the getter who receives it + fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { + self.payload_inbox.push((getter, msg)); + } + + // buffer a message along with the ID of the putter who sent it + fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { + if let Some(getter) = self.ips.port_info.get(&putter).unwrap().peer { + log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); + self.getter_push(getter, msg); + } else { + log!(cu.logger(), "Putter {:?} has no known peer!", putter); + panic!("Putter {:?} has no known peer!"); + } + } +} + impl Debug for VecSet { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_set().entries(self.vec.iter()).finish() @@ -778,21 +880,3 @@ impl Debug for UdpInBuffer { write!(f, "UdpInBuffer") } } - -impl RoundCtx { - fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { - self.payload_inbox.pop() - } - fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.payload_inbox.push((getter, msg)); - } - fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = self.current_state.port_info.get(&putter).unwrap().peer { - log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); - self.getter_push(getter, msg); - } else { - log!(cu.logger(), "Putter {:?} has no known peer!", putter); - panic!("Putter {:?} has no known peer!"); - } - } -}