diff --git a/src/runtime_old/mod.rs b/src/runtime_old/mod.rs deleted file mode 100644 index 95bc3e85c6f4df6c9a221764f5eb8ad46745159e..0000000000000000000000000000000000000000 --- a/src/runtime_old/mod.rs +++ /dev/null @@ -1,903 +0,0 @@ -/// cbindgen:ignore -mod communication; -/// cbindgen:ignore -mod endpoints; -pub mod error; -/// cbindgen:ignore -mod logging; -/// cbindgen:ignore -mod setup; - -#[cfg(test)] -mod tests; - -use crate::common::*; -use error::*; -use mio::net::UdpSocket; - -/// The interface between the user's application and a communication session, -/// in which the application plays the part of a (native) component. This structure provides the application -/// with functionality available to all components: the ability to add new channels (port pairs), and to -/// instantiate new components whose definitions are defined in the connector's configured protocol -/// description. Native components have the additional ability to add `dangling' ports backed by local/remote -/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`. -/// This allows sets of applications to cooperate in constructing shared sessions that span the network. -#[derive(Debug)] -pub struct Connector { - unphased: ConnectorUnphased, - phased: ConnectorPhased, -} - -/// Characterizes a type which can write lines of logging text. -/// The implementations provided in the `logging` module are likely to be sufficient, -/// but for added flexibility, users are able to implement their own loggers for use -/// by connectors. -pub trait Logger: Debug + Send + Sync { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; -} - -/// A logger that appends the logged strings to a growing byte buffer -#[derive(Debug)] -pub struct VecLogger(ConnectorId, Vec); - -/// A trivial logger that always returns None, such that no logging information is ever written. -#[derive(Debug)] -pub struct DummyLogger; - -/// A logger that writes the logged lines to a given file. -#[derive(Debug)] -pub struct FileLogger(ConnectorId, std::fs::File); - -// Interface between protocol state and the connector runtime BEFORE all components -// ave begun their branching speculation. See ComponentState::nonsync_run. -pub(crate) struct NonsyncProtoContext<'a> { - ips: &'a mut IdAndPortState, - logger: &'a mut dyn Logger, - unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase - proto_component_id: ComponentId, // KEY in id->component map -} - -// Interface between protocol state and the connector runtime AFTER all components -// have begun their branching speculation. See ComponentState::sync_run. -pub(crate) struct SyncProtoContext<'a> { - rctx: &'a RoundCtx, - branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch - predicate: &'a Predicate, // KEY in pred->branch map -} - -// The data coupled with a particular protocol component branch, but crucially omitting -// the `ComponentState` such that this may be passed by reference to the state with separate -// access control. -#[derive(Default, Debug, Clone)] -struct ProtoComponentBranchInner { - did_put_or_get: HashSet, - inbox: HashMap, -} - -// A speculative variable that lives for the duration of the synchronous round. -// Each is assigned a value in domain `SpecVal`. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVar(PortId); - -// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, -// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVal(u16); - -// Data associated with a successful synchronous round, retained afterwards such that the -// native component can freely reflect on how it went, reading the messages received at their -// inputs, and reflecting on which of their connector's synchronous batches succeeded. -#[derive(Debug)] -struct RoundEndedNative { - batch_index: usize, - gotten: HashMap, -} - -// Implementation of a set in terms of a vector (optimized for reading, not writing) -#[derive(Default)] -struct VecSet { - // invariant: ordered, deduplicated - vec: Vec, -} - -// Allows a connector to remember how to forward payloads towards the component that -// owns their destination port. `LocalComponent` corresponds with messages for components -// managed by the connector itself (hinting for it to look it up in a local structure), -// whereas the other variants direct the connector to forward the messages over the network. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum Route { - LocalComponent, - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, -} - -// The outcome of a synchronous round, representing the distributed consensus. -// In the success case, the attached predicate encodes a row in the session's trace table. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -enum Decision { - Failure, // some connector timed out! - Success(Predicate), -} - -// The type of control messages exchanged between connectors over the network -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum Msg { - SetupMsg(SetupMsg), - CommMsg(CommMsg), -} - -// Control messages exchanged during the setup phase only -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum SetupMsg { - MyPortInfo(MyPortInfo), - LeaderWave { wave_leader: ConnectorId }, - LeaderAnnounce { tree_leader: ConnectorId }, - YouAreMyParent, -} - -// Control message particular to the communication phase. -// as such, it's annotated with a round_index -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct CommMsg { - round_index: usize, - contents: CommMsgContents, -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommMsgContents { - SendPayload(SendPayloadMsg), - CommCtrl(CommCtrlMsg), -} - -// Connector <-> connector control messages for use in the communication phase -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommCtrlMsg { - Suggest { suggestion: Decision }, // child->parent - Announce { decision: Decision }, // parent->child -} - -// Speculative payload message, communicating the value for the given -// port's message predecated on the given speculative variable assignments. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct SendPayloadMsg { - predicate: Predicate, - payload: Payload, -} - -// Return result of `Predicate::assignment_union`, communicating the contents -// of the predicate which represents the (consistent) union of their mappings, -// if it exists (no variable mapped distinctly by the input predicates) -#[derive(Debug, PartialEq)] -enum AssignmentUnionResult { - FormerNotLatter, - LatterNotFormer, - Equivalent, - New(Predicate), - Nonexistant, -} - -// One of two endpoints for a control channel with a connector on either end. -// The underlying transport is TCP, so we use an inbox buffer to allow -// discrete payload receipt. -struct NetEndpoint { - inbox: Vec, - stream: TcpStream, -} - -// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct NetEndpointSetup { - getter_for_incoming: PortId, - sock_addr: SocketAddr, - endpoint_polarity: EndpointPolarity, -} - -// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct UdpEndpointSetup { - getter_for_incoming: PortId, - local_addr: SocketAddr, - peer_addr: SocketAddr, -} - -// NetEndpoint annotated with the ID of the port that receives payload -// messages received through the endpoint. This approach assumes that NetEndpoints -// DO NOT multiplex port->port channels, and so a mapping such as this is possible. -// As a result, the messages themselves don't need to carry the PortID with them. -#[derive(Debug)] -struct NetEndpointExt { - net_endpoint: NetEndpoint, - getter_for_incoming: PortId, -} - -// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component" -// described in the literature. -// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager), -// It acts as a native component by managing a (speculative) set of payload messages (an outbox, -// protecting the peer on the other side of the network). -#[derive(Debug)] -struct UdpEndpointExt { - sock: UdpSocket, // already bound and connected - received_this_round: bool, - outgoing_payloads: HashMap, - getter_for_incoming: PortId, -} - -// Meta-data for the connector: its role in the consensus tree. -#[derive(Debug)] -struct Neighborhood { - parent: Option, - children: VecSet, -} - -// Manages the connector's ID, and manages allocations for connector/port IDs. -#[derive(Debug, Clone)] -struct IdManager { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, - component_suffix_stream: U32Stream, -} - -// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. -struct IoByteBuffer { - byte_vec: Vec, -} - -// A generator of speculative variables. Created on-demand during the synchronous round -// by the IdManager. -#[derive(Debug)] -struct SpecVarStream { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, -} - -// Manages the messy state of the various endpoints, pollers, buffers, etc. -#[derive(Debug)] -struct EndpointManager { - // invariants: - // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into - // 2. Events is empty - poll: Poll, - events: Events, - delayed_messages: Vec<(usize, Msg)>, - undelayed_messages: Vec<(usize, Msg)>, // ready to yield - net_endpoint_store: EndpointStore, - udp_endpoint_store: EndpointStore, - io_byte_buffer: IoByteBuffer, -} - -// A storage of endpoints, which keeps track of which components have raised -// an event during poll(), signifying that they need to be checked for new incoming data -#[derive(Debug)] -struct EndpointStore { - endpoint_exts: Vec, - polled_undrained: VecSet, -} - -// The information associated with a port identifier, designed for local storage. -#[derive(Clone, Debug)] -struct PortInfo { - owner: ComponentId, - peer: Option, - polarity: Polarity, - route: Route, -} - -// Similar to `PortInfo`, but designed for communication during the setup procedure. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct MyPortInfo { - polarity: Polarity, - port: PortId, - owner: ComponentId, -} - -// Newtype around port info map, allowing the implementation of some -// useful methods -#[derive(Default, Debug, Clone)] -struct PortInfoMap { - // invariant: self.invariant_preserved() - // `owned` is redundant information, allowing for fast lookup - // of a component's owned ports (which occurs during the sync round a lot) - map: HashMap, - owned: HashMap>, -} - -// A convenient substructure for containing port info and the ID manager. -// Houses the bulk of the connector's persistent state between rounds. -// It turns out several situations require access to both things. -#[derive(Debug, Clone)] -struct IdAndPortState { - port_info: PortInfoMap, - id_manager: IdManager, -} - -// A component's setup-phase-specific data -#[derive(Debug)] -struct ConnectorCommunication { - round_index: usize, - endpoint_manager: EndpointManager, - neighborhood: Neighborhood, - native_batches: Vec, - round_result: Result, SyncError>, -} - -// A component's data common to both setup and communication phases -#[derive(Debug)] -struct ConnectorUnphased { - proto_description: Arc, - proto_components: HashMap, - logger: Box, - ips: IdAndPortState, - native_component_id: ComponentId, -} - -// A connector's phase-specific data -#[derive(Debug)] -enum ConnectorPhased { - Setup(Box), - Communication(Box), -} - -// A connector's setup-phase-specific data -#[derive(Debug)] -struct ConnectorSetup { - net_endpoint_setups: Vec, - udp_endpoint_setups: Vec, -} - -// A newtype wrapper for a map from speculative variable to speculative value -// A missing mapping corresponds with "unspecified". -#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -struct Predicate { - assigned: BTreeMap, -} - -// Identifies a child of this connector in the _solution tree_. -// Each connector creates its own local solutions for the consensus procedure during `sync`, -// from the solutions of its children. Those children are either locally-managed components, -// (which are leaves in the solution tree), or other connectors reachable through the given -// network endpoint (which are internal nodes in the solution tree). -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum SubtreeId { - LocalComponent(ComponentId), - NetEndpoint { index: usize }, -} - -// An accumulation of the connector's knowledge of all (a) the local solutions its children -// in the solution tree have found, and (b) its own solutions derivable from those of its children. -// This structure starts off each round with an empty set, and accumulates solutions as they are found -// by local components, or received over the network in control messages. -// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to -// say that these sets GROW until the round is over, and all solutions are reset. -#[derive(Debug)] -struct SolutionStorage { - // invariant: old_local U new_local solutions are those that can be created from - // the UNION of one element from each set in `subtree_solution`. - // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. - old_local: HashSet, // already sent to this connector's parent OR decided - new_local: HashSet, // not yet sent to this connector's parent OR decided - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} - -// Stores the transient data of a synchronous round. -// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of -// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, -// and can be undone if the round fails. -struct RoundCtx { - solution_storage: SolutionStorage, - spec_var_stream: SpecVarStream, - payload_inbox: Vec<(PortId, SendPayloadMsg)>, - deadline: Option, - ips: IdAndPortState, -} - -// A trait intended to limit the access of the ConnectorUnphased structure -// such that we don't accidentally modify any important component/port data -// while the results of the round are undecided. Why? Any actions during Connector::sync -// are _speculative_ until the round is decided, and we need a safe way of rolling -// back any changes. -trait CuUndecided { - fn logger(&mut self) -> &mut dyn Logger; - fn proto_description(&self) -> &ProtocolDescription; - fn native_component_id(&self) -> ComponentId; - fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); - fn logger_and_protocol_components( - &mut self, - ) -> (&mut dyn Logger, &mut HashMap); -} - -// Represents a set of synchronous port operations that the native component -// has described as an "option" for completing during the synchronous rounds. -// Operations contained here succeed together or not at all. -// A native with N=2+ batches are expressing an N-way nondeterministic choice -#[derive(Debug, Default)] -struct NativeBatch { - // invariant: putters' and getters' polarities respected - to_put: HashMap, - to_get: HashSet, -} - -// Parallels a mio::Token type, but more clearly communicates -// the way it identifies the evented structre it corresponds to. -// See runtime/setup for methods converting between TokenTarget and mio::Token -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -enum TokenTarget { - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, -} - -// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, -// such that it can know when to continue polling, and when to block. -enum CommRecvOk { - TimeoutWithoutNew, - NewPayloadMsgs, - NewControlMsg { net_index: usize, msg: CommCtrlMsg }, -} -//////////////// -fn err_would_block(err: &std::io::Error) -> bool { - err.kind() == std::io::ErrorKind::WouldBlock -} -impl VecSet { - fn new(mut vec: Vec) -> Self { - // establish the invariant - vec.sort(); - vec.dedup(); - Self { vec } - } - fn contains(&self, element: &T) -> bool { - self.vec.binary_search(element).is_ok() - } - // Insert the given element. Returns whether it was already present. - fn insert(&mut self, element: T) -> bool { - match self.vec.binary_search(&element) { - Ok(_) => false, - Err(index) => { - self.vec.insert(index, element); - true - } - } - } - fn iter(&self) -> std::slice::Iter { - self.vec.iter() - } - fn pop(&mut self) -> Option { - self.vec.pop() - } -} -impl PortInfoMap { - fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { - self.owned.get(&owner).into_iter().flat_map(HashSet::iter) - } - fn spec_var_for(&self, port: PortId) -> SpecVar { - // Every port maps to a speculative variable - // Two distinct ports map to the same variable - // IFF they are two ends of the same logical channel. - let info = self.map.get(&port).unwrap(); - SpecVar(match info.polarity { - Getter => port, - Putter => info.peer.unwrap(), - }) - } - fn invariant_preserved(&self) -> bool { - // for every port P with some owner O, - // P is in O's owned set - for (port, info) in self.map.iter() { - match self.owned.get(&info.owner) { - Some(set) if set.contains(port) => {} - _ => { - println!("{:#?}\n WITH port {:?}", self, port); - return false; - } - } - } - // for every port P owned by every owner O, - // P's owner is O - for (&owner, set) in self.owned.iter() { - for port in set { - match self.map.get(port) { - Some(info) if info.owner == owner => {} - _ => { - println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port); - return false; - } - } - } - } - true - } -} -impl SpecVarStream { - fn next(&mut self) -> SpecVar { - let phantom_port: PortId = - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } - .into(); - SpecVar(phantom_port) - } -} -impl IdManager { - fn new(connector_id: ConnectorId) -> Self { - Self { - connector_id, - port_suffix_stream: Default::default(), - component_suffix_stream: Default::default(), - } - } - fn new_spec_var_stream(&self) -> SpecVarStream { - // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N. - // This gap is entirely unnecessary (i.e. 0 is fine) - // It's purpose is only to make SpecVars easier to spot in logs. - // E.g. spot the spec var: { v0_0, v1_2, v1_103 } - const SKIP_N: u32 = 100; - let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N); - SpecVarStream { connector_id: self.connector_id, port_suffix_stream } - } - fn new_port_id(&mut self) -> PortId { - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() - } - fn new_component_id(&mut self) -> ComponentId { - Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() } - .into() - } -} -impl Drop for Connector { - fn drop(&mut self) { - log!(self.unphased.logger(), "Connector dropping. Goodbye!"); - } -} -// Given a slice of ports, return the first, if any, port is present repeatedly -fn duplicate_port(slice: &[PortId]) -> Option { - let mut vec = Vec::with_capacity(slice.len()); - for port in slice.iter() { - match vec.binary_search(port) { - Err(index) => vec.insert(index, *port), - Ok(_) => return Some(*port), - } - } - None -} -impl Connector { - /// Generate a random connector identifier from the system's source of randomness. - pub fn random_id() -> ConnectorId { - type Bytes8 = [u8; std::mem::size_of::()]; - unsafe { - let mut bytes = std::mem::MaybeUninit::::uninit(); - // getrandom is the canonical crate for a small, secure rng - getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); - // safe! representations of all valid Byte8 values are valid ConnectorId values - std::mem::transmute::<_, _>(bytes.assume_init()) - } - } - - /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete, - /// and it is ready to participate in synchronous rounds of communication. - pub fn is_connected(&self) -> bool { - // If designed for Rust usage, connectors would be exposed as an enum type from the start. - // consequently, this "phased" business would also include connector variants and this would - // get a lot closer to the connector impl. itself. - // Instead, the C-oriented implementation doesn't distinguish connector states as types, - // and distinguish them as enum variants instead - match self.phased { - ConnectorPhased::Setup(..) => false, - ConnectorPhased::Communication(..) => true, - } - } - - /// Enables the connector's current logger to be swapped out for another - pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.logger, &mut new_logger); - new_logger - } - - /// Access the connector's current logger - pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.logger - } - - /// Create a new synchronous channel, returning its ends as a pair of ports, - /// with polarity output, input respectively. Available during either setup/communication phase. - /// # Panics - /// This function panics if the connector's (large) port id space is exhausted. - pub fn new_port_pair(&mut self) -> [PortId; 2] { - let cu = &mut self.unphased; - // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.ips.id_manager.new_port_id(); - // allocate two fresh port identifiers - let [o, i] = [new_cid(), new_cid()]; - // store info for each: - // - they are each others' peers - // - they are owned by a local component with id `cid` - // - polarity putter, getter respectively - cu.ips.port_info.map.insert( - o, - PortInfo { - route: Route::LocalComponent, - peer: Some(i), - owner: cu.native_component_id, - polarity: Putter, - }, - ); - cu.ips.port_info.map.insert( - i, - PortInfo { - route: Route::LocalComponent, - peer: Some(o), - owner: cu.native_component_id, - polarity: Getter, - }, - ); - cu.ips - .port_info - .owned - .entry(cu.native_component_id) - .or_default() - .extend([o, i].iter().copied()); - - log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); - [o, i] - } - - /// Instantiates a new component for the connector runtime to manage, and passing - /// the given set of ports from the interface of the native component, to that of the - /// newly created component (passing their ownership). - /// # Errors - /// Error is returned if the moved ports are not owned by the native component, - /// if the given component name is not defined in the connector's protocol, - /// the given sequence of ports contains a duplicate port, - /// or if the component is unfit for instantiation with the given port sequence. - /// # Panics - /// This function panics if the connector's (large) component id space is exhausted. - pub fn add_component( - &mut self, - module_name: &[u8], - identifier: &[u8], - ports: &[PortId], - ) -> Result<(), AddComponentError> { - // Check for error cases first before modifying `cu` - use AddComponentError as Ace; - let cu = &self.unphased; - if let Some(port) = duplicate_port(ports) { - return Err(Ace::DuplicatePort(port)); - } - let expected_polarities = cu.proto_description.component_polarities(module_name, identifier)?; - if expected_polarities.len() != ports.len() { - return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); - } - for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?; - if info.owner != cu.native_component_id { - return Err(Ace::UnknownPort(port)); - } - if info.polarity != expected_polarity { - return Err(Ace::WrongPortPolarity { port, expected_polarity }); - } - } - // No errors! Time to modify `cu` - // create a new component and identifier - let Connector { phased, unphased: cu } = self; - let new_cid = cu.ips.id_manager.new_component_id(); - cu.proto_components.insert(new_cid, cu.proto_description.new_component(module_name, identifier, ports)); - // update the ownership of moved ports - for port in ports.iter() { - match cu.ips.port_info.map.get_mut(port) { - Some(port_info) => port_info.owner = new_cid, - None => unreachable!(), - } - } - if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) { - set.retain(|x| !ports.contains(x)); - } - let moved_port_set: HashSet = ports.iter().copied().collect(); - if let ConnectorPhased::Communication(comm) = phased { - // Preserve invariant: batches only reason about native's ports. - // Remove batch puts/gets for moved ports. - for batch in comm.native_batches.iter_mut() { - batch.to_put.retain(|port, _| !moved_port_set.contains(port)); - batch.to_get.retain(|port| !moved_port_set.contains(port)); - } - } - cu.ips.port_info.owned.insert(new_cid, moved_port_set); - Ok(()) - } -} -impl Predicate { - #[inline] - pub fn singleton(k: SpecVar, v: SpecVal) -> Self { - Self::default().inserted(k, v) - } - #[inline] - pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { - self.assigned.insert(k, v); - self - } - - // Return true whether `self` is a subset of `maybe_superset` - pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { - for (var, val) in self.assigned.iter() { - match maybe_superset.assigned.get(var) { - Some(val2) if val2 == val => {} - _ => return false, // var unmapped, or mapped differently - } - } - // `maybe_superset` mirrored all my assignments! - true - } - - /// Given the two predicates {self, other}, return that whose - /// assignments are the union of those of both. - fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { - use AssignmentUnionResult as Aur; - // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. - let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; - let [mut s, mut o] = [s_it.next(), o_it.next()]; - // populate lists of assignments in self but not other and vice versa. - // do this by incrementally unfolding the iterators, keeping an eye - // on the ordering between the head elements [s, o]. - // whenever s break, // both iterators are empty - [None, Some(x)] => { - // self's iterator is empty. - // all remaning elements are in other but not self - o_not_s.push(x); - o_not_s.extend(o_it); - break; - } - [Some(x), None] => { - // other's iterator is empty. - // all remaning elements are in self but not other - s_not_o.push(x); - s_not_o.extend(s_it); - break; - } - [Some((sid, sb)), Some((oid, ob))] => { - if sid < oid { - // o is missing this element - s_not_o.push((sid, sb)); - s = s_it.next(); - } else if sid > oid { - // s is missing this element - o_not_s.push((oid, ob)); - o = o_it.next(); - } else if sb != ob { - assert_eq!(sid, oid); - // both predicates assign the variable but differ on the value - // No predicate exists which satisfies both! - return Aur::Nonexistant; - } else { - // both predicates assign the variable to the same value - s = s_it.next(); - o = o_it.next(); - } - } - } - } - // Observed zero inconsistencies. A unified predicate exists... - match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Aur::Equivalent, // ... equivalent to both. - [false, true] => Aur::FormerNotLatter, // ... equivalent to self. - [true, false] => Aur::LatterNotFormer, // ... equivalent to other. - [false, false] => { - // ... which is the union of the predicates' assignments but - // is equivalent to neither self nor other. - let mut new = self.clone(); - for (&id, &b) in o_not_s { - new.assigned.insert(id, b); - } - Aur::New(new) - } - } - } - - // Compute the union of the assignments of the two given predicates, if it exists. - // It doesn't exist if there is some value which the predicates assign to different values. - pub(crate) fn union_with(&self, other: &Self) -> Option { - let mut res = self.clone(); - for (&channel_id, &assignment_1) in other.assigned.iter() { - match res.assigned.insert(channel_id, assignment_1) { - Some(assignment_2) if assignment_1 != assignment_2 => return None, - _ => {} - } - } - Some(res) - } - pub(crate) fn query(&self, var: SpecVar) -> Option { - self.assigned.get(&var).copied() - } -} - -impl RoundCtx { - // remove an arbitrary buffered message, along with the ID of the getter who receives it - fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { - self.payload_inbox.pop() - } - - // buffer a message along with the ID of the getter who receives it - fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.payload_inbox.push((getter, msg)); - } - - // buffer a message along with the ID of the putter who sent it - fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer { - log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); - self.getter_push(getter, msg); - } else { - log!(cu.logger(), "Putter {:?} has no known peer!", putter); - panic!("Putter {:?} has no known peer!", putter); - } - } -} - -impl Debug for VecSet { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_set().entries(self.vec.iter()).finish() - } -} -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct Assignment<'a>((&'a SpecVar, &'a SpecVal)); - impl Debug for Assignment<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{:?}={:?}", (self.0).0, (self.0).1) - } - } - f.debug_set().entries(self.assigned.iter().map(Assignment)).finish() - } -} -impl IdParts for SpecVar { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - self.0.id_parts() - } -} -impl Debug for SpecVar { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let (a, b) = self.id_parts(); - write!(f, "v{}_{}", a, b) - } -} -impl SpecVal { - const FIRING: Self = SpecVal(1); - const SILENT: Self = SpecVal(0); - fn is_firing(self) -> bool { - self == Self::FIRING - // all else treated as SILENT - } - fn iter_domain() -> impl Iterator { - (0..).map(SpecVal) - } -} -impl Debug for SpecVal { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - self.0.fmt(f) - } -} -impl Default for IoByteBuffer { - fn default() -> Self { - let mut byte_vec = Vec::with_capacity(Self::CAPACITY); - unsafe { - // safe! this vector is guaranteed to have sufficient capacity - byte_vec.set_len(Self::CAPACITY); - } - Self { byte_vec } - } -} -impl IoByteBuffer { - const CAPACITY: usize = u16::MAX as usize + 1000; - fn as_mut_slice(&mut self) -> &mut [u8] { - self.byte_vec.as_mut_slice() - } -} - -impl Debug for IoByteBuffer { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "IoByteBuffer") - } -}