diff --git a/README.md b/README.md index f1af66eb79cd31422574eea105daded09be271a6..04902469c056ade24337c53067486e269093d7db 100644 --- a/README.md +++ b/README.md @@ -18,4 +18,16 @@ This library provides connectors as a generalization of sockets for use in commu The `examples` directory contains example usages of connectors for message passing over the internet. The programs within require that the library is compiled as a dylib (see above). ## Notes -3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. (WIP) This is only necessary to update it. \ No newline at end of file +3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. This is only necessary to update it. + +## Short User Overview +The bulk of the library's functionality is exposed to the user in two types: +1. `protocol::ProtocolDescription` +1. `runtime::Connector` + +The former is created using `parse`. For the most part, the user is not expected to interact much with the structure, only passing it to the connector as a communication session is being set up. + +The latter is created with `new`, configured with methods such as `new_net_port` and `add_component`, and connected via `connect`, whereafter it can be used for multi-party communication through methods `put`, `get`, `next_batch`, and `sync`. + +## Contributor Overview +The details of the implementation are best understood by reading the doc comments, starting from the procedures listed in the section above. It is suggested to first/also refer to the Reowolf project's companion documentation (link TODO) for a higher level overview of the goals and design of the implementation. 
\ No newline at end of file diff --git a/src/common.rs b/src/common.rs index dbdbdc02169b6deb0cd7dff6225728b2affb08f1..833795dcff0ac6c1f792559c21309d069bcf224e 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,8 +1,6 @@ ///////////////////// PRELUDE ///////////////////// - pub(crate) use crate::protocol::{ComponentState, ProtocolDescription}; pub(crate) use crate::runtime::{error::AddComponentError, NonsyncProtoContext, SyncProtoContext}; - pub(crate) use core::{ cmp::Ordering, fmt::{Debug, Formatter}, @@ -10,7 +8,6 @@ pub(crate) use core::{ ops::Range, time::Duration, }; -// pub(crate) use indexmap::IndexSet; pub(crate) use maplit::hashmap; pub(crate) use mio::{ net::{TcpListener, TcpStream}, @@ -29,16 +26,17 @@ pub(crate) use Polarity::*; pub(crate) trait IdParts { fn id_parts(self) -> (ConnectorId, U32Suffix); } + +/// Used by various distributed algorithms to identify connectors. pub type ConnectorId = u32; + +/// Used in conjunction with the `ConnectorId` type to create identifiers for ports and components pub type U32Suffix = u32; #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] -// pub, because it can be acquired via error in the Rust API -pub struct ComponentId(Id); -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] + +/// Generalization of a port/component identifier #[repr(C)] pub struct Id { pub(crate) connector_id: ConnectorId, @@ -48,16 +46,28 @@ pub struct Id { pub struct U32Stream { next: u32, } + +/// Identifier of a component in a session +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] +pub struct ComponentId(Id); // PUB because it can be returned by errors + +/// Identifier of a port in a session #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] #[repr(transparent)] pub struct PortId(Id); + +/// A safely aliasable heap-allocated 
payload of message bytes #[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)] pub struct Payload(Arc>); #[derive( Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize, )] + +/// "Orientation" of a port, determining whether it can send or receive messages with `put` and `get` respectively. #[repr(C)] pub enum Polarity { Putter, // output port (from the perspective of the component) @@ -66,11 +76,15 @@ pub enum Polarity { #[derive( Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize, )] + +/// "Orientation" of a transport-layer network endpoint, dictating how its connection procedure should +/// be conducted. Corresponds with connect() / accept() familiar to TCP socket programming. #[repr(C)] pub enum EndpointPolarity { Active, // calls connect() Passive, // calls bind() listen() accept() } + #[derive(Debug, Clone)] pub(crate) enum NonsyncBlocker { Inconsistent, @@ -133,6 +147,7 @@ impl From<&[u8]> for Payload { } } impl Payload { + /// Create a new payload of uninitialized bytes with the given length. pub fn new(len: usize) -> Payload { let mut v = Vec::with_capacity(len); unsafe { @@ -140,18 +155,26 @@ impl Payload { } Payload(Arc::new(v)) } + /// Returns the length of the payload's byte sequence pub fn len(&self) -> usize { self.0.len() } + /// Allows shared reading of the payload's contents pub fn as_slice(&self) -> &[u8] { &self.0 } - pub fn as_mut_slice(&mut self) -> &mut [u8] { - Arc::make_mut(&mut self.0) as _ + + /// Allows mutation of the payload's contents. + /// Results in a deep copy in the event this payload is aliased. + pub fn as_mut_vec(&mut self) -> &mut Vec { + Arc::make_mut(&mut self.0) } + + /// Modifies this payload, concatenating the given immutable payload's contents. + /// Results in a deep copy in the event this payload is aliased. 
pub fn concatenate_with(&mut self, other: &Self) { let bytes = other.as_slice().iter().copied(); - let me = Arc::make_mut(&mut self.0); + let me = self.as_mut_vec(); me.extend(bytes); } } diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 4e77eec893d793d355ee1b192726e82ebbbe8577..b89a1a442a3ae9963dfdf345b2fcb0ef887a9a5b 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -113,7 +113,7 @@ impl Value { // It is inconsistent to update with a negative value return None; } - if let Some(slot) = payload.as_mut_slice().get_mut(the_index) { + if let Some(slot) = payload.as_mut_vec().get_mut(the_index) { *slot = (*b).try_into().unwrap(); Some(value.clone()) } else { @@ -126,7 +126,7 @@ impl Value { // It is inconsistent to update with a negative value or a too large value return None; } - if let Some(slot) = payload.as_mut_slice().get_mut(the_index) { + if let Some(slot) = payload.as_mut_vec().get_mut(the_index) { *slot = (*b).try_into().unwrap(); Some(value.clone()) } else { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 953151bdc93532b35e4ca164763cd350ecd96ba7..b4342f823c424d21082fcd76c5d1cbfa7a5eb750 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -95,28 +95,31 @@ impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> { } } impl Connector { - fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> { - if let ConnectorPhased::Communication(comm) = &mut self.phased { - Some(comm) - } else { - None - } - } - pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { + /// Read the message received by the given port in the previous synchronous round. 
+ pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; - let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?; - match &comm.round_result { - Err(_) => Err(Ge::PreviousSyncFailed), - Ok(None) => Err(Ge::NoPreviousRound), - Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), + if let ConnectorPhased::Communication(comm) = &self.phased { + match &comm.round_result { + Err(_) => Err(Ge::PreviousSyncFailed), + Ok(None) => Err(Ge::NoPreviousRound), + Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), + } + } else { + return Err(Ge::NoPreviousRound); } } + /// Creates a new, empty synchronous batch for the connector and selects it. + /// Subsequent calls to `put` and `get` will populate the new batch with port operations. pub fn next_batch(&mut self) -> Result { // returns index of new batch - let comm = self.get_comm_mut().ok_or(WrongStateError)?; - comm.native_batches.push(Default::default()); - Ok(comm.native_batches.len() - 1) + if let ConnectorPhased::Communication(comm) = &mut self.phased { + comm.native_batches.push(Default::default()); + Ok(comm.native_batches.len() - 1) + } else { + Err(WrongStateError) + } } + fn port_op_access( &mut self, port: PortId, @@ -139,6 +142,10 @@ impl Connector { } } } + + /// Add a `put` operation to the connector's currently-selected synchronous batch. + /// Returns an error if the given port is not owned by the native component, + /// has the wrong polarity, or is already included in the batch. pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError as Poe; let batch = self.port_op_access(port, Putter)?; @@ -149,6 +156,10 @@ impl Connector { Ok(()) } } + + /// Add a `get` operation to the connector's currently-selected synchronous batch. + /// Returns an error if the given port is not owned by the native component, + /// has the wrong polarity, or is already included in the batch. 
pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { use PortOpError as Poe; let batch = self.port_op_access(port, Getter)?; @@ -158,7 +169,21 @@ impl Connector { Err(Poe::MultipleOpsOnPort) } } - // entrypoint for caller. overwrites round result enum, and returns what happened + + /// Participate in the completion of the next synchronous round, in which + /// the native component will perform the set of prepared operations of exactly one + /// of the synchronous batches. At the end of the procedure, the synchronous + /// batches will be reset to a singleton set, whose only element is selected, and empty. + /// The caller yields control over to the connector runtime to facilitate the underlying + /// coordination work until either (a) the round is completed with all components' states + /// updated accordingly, (b) a distributed failure event resets all components' + /// states to what they were prior to the sync call, or (c) the sync procedure encounters + /// an unrecoverable error which ends the call early, and breaks the session and connector's + /// states irreversibly. + /// Note that the (b) case necessitates the success of a distributed rollback procedure, + /// which this component may initiate, but cannot guarantee will succeed in time or at all. + /// Consequently, the given timeout duration represents a duration in which the connector + /// will make a best effort to fail the round and return control flow to the caller. 
pub fn sync(&mut self, timeout: Option) -> Result { let Self { unphased: cu, phased } = self; match phased { @@ -1068,28 +1093,6 @@ impl SolutionStorage { new_local: Default::default(), } } - // fn is_clear(&self) -> bool { - // self.subtree_id_to_index.is_empty() - // && self.subtree_solutions.is_empty() - // && self.old_local.is_empty() - // && self.new_local.is_empty() - // } - // fn clear(&mut self) { - // self.subtree_id_to_index.clear(); - // self.subtree_solutions.clear(); - // self.old_local.clear(); - // self.new_local.clear(); - // } - // fn reset(&mut self, subtree_ids: impl Iterator) { - // self.subtree_id_to_index.clear(); - // self.subtree_solutions.clear(); - // self.old_local.clear(); - // self.new_local.clear(); - // for key in subtree_ids { - // self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); - // self.subtree_solutions.push(Default::default()) - // } - // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { @@ -1168,7 +1171,7 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { } } impl NonsyncProtoContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { + pub(crate) fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { // called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports // sanity check @@ -1195,7 +1198,7 @@ impl NonsyncProtoContext<'_> { } // 3. 
create a new component } - pub fn new_port_pair(&mut self) -> [PortId; 2] { + pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component let mut new_cid_fn = || self.current_state.id_manager.new_port_id(); let [o, i] = [new_cid_fn(), new_cid_fn()]; diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 231fa8a7e0d0794c4cac69f1bb886e6d478407e9..409d56d590d11d1e3fa2256275f917b7b72f6204 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -15,6 +15,7 @@ pub enum ConnectError { } #[derive(Eq, PartialEq, Copy, Clone, Debug)] pub enum AddComponentError { + DuplicatePort(PortId), NoSuchComponent, NonPortTypeParameters, CannotMovePort(PortId), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index d641d186c085938d0413976b1afe4bd1114ee7c7..cd6e88d0692245b159b8d32ab094431f125a0a79 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -15,18 +15,36 @@ use crate::common::*; use error::*; use mio::net::UdpSocket; +/// Each Connector structure is the interface between the user's application and a communication session, +/// in which the application plays the part of a (native) component. This structure provides the application +/// with functionality available to all components: the ability to add new channels (port pairs), and to +/// instantiate new components whose definitions are defined in the connector's configured protocol +/// description. Native components have the additional ability to add `dangling' ports backed by local/remote +/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`. +/// This allows sets of applications to cooperate in constructing shared sessions that span the network. #[derive(Debug)] pub struct Connector { unphased: ConnectorUnphased, phased: ConnectorPhased, } + +/// Characterizes a type which can write lines of logging text. 
+/// The implementations provided in the `logging` module are likely to be sufficient, +/// but for added flexibility, users are able to implement their own loggers for use +/// by connectors. pub trait Logger: Debug + Send + Sync { fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; } + +/// A logger that appends the logged strings to a growing byte buffer #[derive(Debug)] pub struct VecLogger(ConnectorId, Vec); + +/// A trivial logger that always returns None, such that no logging information is ever written. #[derive(Debug)] pub struct DummyLogger; + +/// A logger that writes the logged lines to a given file. #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); #[derive(Debug, Clone)] @@ -390,7 +408,32 @@ impl Drop for Connector { log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!"); } } + +fn duplicate_port(slice: &[PortId]) -> Option { + let mut vec = Vec::with_capacity(slice.len()); + for port in slice.iter() { + match vec.binary_search(port) { + Err(index) => vec.insert(index, *port), + Ok(_) => return Some(*port), + } + } + None +} impl Connector { + /// Generate a random connector identifier from the system's source of randomness. + pub fn random_id() -> ConnectorId { + type Bytes8 = [u8; std::mem::size_of::()]; + unsafe { + let mut bytes = std::mem::MaybeUninit::::uninit(); + // getrandom is the canonical crate for a small, secure rng + getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); + // safe! representations of all valid Byte8 values are valid ConnectorId values + std::mem::transmute::<_, _>(bytes.assume_init()) + } + } + + /// Returns true iff the connector is in connected state, i.e., its setup phase is complete, + /// and it is ready to participate in synchronous rounds of communication. pub fn is_connected(&self) -> bool { // If designed for Rust usage, connectors would be exposed as an enum type from the start. 
// consequently, this "phased" business would also include connector variants and this would @@ -402,23 +445,22 @@ impl Connector { ConnectorPhased::Communication(..) => true, } } - pub(crate) fn random_id() -> ConnectorId { - type Bytes8 = [u8; std::mem::size_of::()]; - unsafe { - let mut bytes = std::mem::MaybeUninit::::uninit(); - // getrandom is the canonical crate for a small, secure rng - getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); - // safe! representations of all valid Byte8 values are valid ConnectorId values - std::mem::transmute::<_, _>(bytes.assume_init()) - } - } + + /// Enables the connector's current logger to be swapped out for another pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger); new_logger } + + /// Access the connector's current logger pub fn get_logger(&mut self) -> &mut dyn Logger { &mut *self.unphased.inner.logger } + + /// Create a new synchronous channel, returning its ends as a pair of ports, + /// with polarity output, input respectively. Available during either setup/communication phase. + /// # Panics + /// This function panics if the connector's (large) port id space is exhausted. pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native @@ -445,6 +487,17 @@ impl Connector { log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } + + /// Instantiates a new component for the connector runtime to manage, and passing + /// the given set of ports from the interface of the native component, to that of the + /// newly created component (passing their ownership). 
+ /// # Errors + /// Error is returned if the moved ports are not owned by the native component, + /// if the given component name is not defined in the connector's protocol, + /// the given sequence of ports contains a duplicate port, + /// or if the component is unfit for instantiation with the given port sequence. + /// # Panics + /// This function panics if the connector's (large) component id space is exhausted. pub fn add_component( &mut self, identifier: &[u8], @@ -453,6 +506,9 @@ impl Connector { // called by the USER. moves ports owned by the NATIVE use AddComponentError as Ace; // 1. check if this is OK + if let Some(port) = duplicate_port(ports) { + return Err(Ace::DuplicatePort(port)); + } let cu = &mut self.unphased; let expected_polarities = cu.proto_description.component_polarities(identifier)?; if expected_polarities.len() != ports.len() { @@ -575,7 +631,7 @@ impl Predicate { } } } - pub fn union_with(&self, other: &Self) -> Option { + pub(crate) fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { match res.assigned.insert(channel_id, assignment_1) { @@ -585,7 +641,7 @@ impl Predicate { } Some(res) } - pub fn query(&self, var: SpecVar) -> Option { + pub(crate) fn query(&self, var: SpecVar) -> Option { self.assigned.get(&var).copied() } } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index c87129d90ebd6abd5ff12099db7087bad6580003..27be90d80ca6bcc5d220bad3cf0b79947c432449 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -2,6 +2,17 @@ use crate::common::*; use crate::runtime::*; impl Connector { + /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing). + /// The resulting connector will start in the setup phase, and cannot be used for communication until the + /// `connect` procedure completes. 
+ /// # Safety + /// The correctness of the system's underlying distributed algorithms requires that no two + /// connectors have the same ID. If the user does not know the identifiers of other connectors in the + /// system, it is advised to generate one using `Connector::random_id` (relying on the exceptionally low probability of a collision). + /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed + /// to preserve their configured protocols. + /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an + /// error during `connect`, observed as a peer misbehaving. pub fn new( mut logger: Box, proto_description: Arc, @@ -90,6 +101,10 @@ impl Connector { } } } + + /// Adds a "dangling" port to the connector in the setup phase, + /// to be formed into a channel during the connect procedure with the given + /// transport layer information. pub fn new_net_port( &mut self, polarity: Polarity, @@ -127,6 +142,15 @@ impl Connector { } } } + + /// Finalizes the connector's setup procedure and forms a distributed system with + /// all other connectors reachable through network channels. This procedure represents + /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports, + /// but is ready to begin the first communication round. + /// Initially, the connector has a singleton set of _batches_, the only element of which is empty. + /// This single element starts off selected. The selected batch is modified with `put` and `get`, + /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the + /// purpose of these batches. 
pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { use ConnectError as Ce; let Self { unphased: cu, phased } = self; diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index b91631a903cee9c397d94f8a981206a56d86070a..c6075cd5eede3701425e3f722a6a229fbef1eada 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1191,7 +1191,7 @@ fn xrouter_comp() { while(true) synchronous { if(fires(a)) { msg m = get(a); - if(fires(b)) put(b, get(a)); + if(fires(b)) put(b, m); } } } @@ -1214,12 +1214,12 @@ fn xrouter_comp() { channel r -> s; channel t -> u; - new replicator(a, d, f); // ok - new replicator(g, t, h); // ok - new lossy(e, l); // ok - new lossy(i, j); // ok - new replicator(m, b, p); // ok - new replicator(k, n, c); // ok + new replicator(a, d, f); + new replicator(g, t, h); + new lossy(e, l); + new lossy(i, j); + new replicator(m, b, p); + new replicator(k, n, c); new merger(q, o, r); new sync_drain(u, s); }