Changeset - d76b1fe2648f
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-09-22 15:43:19
christopher.esterhuyse@gmail.com
mild cleanup and major fleshing out of doc comments
8 files changed with 193 insertions and 74 deletions:
0 comments (0 inline, 0 general)
README.md
Show inline comments
 
# Reowolf 1.0 Implementation
 

	
 
This library provides connectors as a generalization of sockets for use in communication over the internet. This repository is one of the deliverables of the [Reowolf project](https://nlnet.nl/project/Reowolf/) funded by the NLNet foundation.
 

	
 
## Compilation instructions
 
1. Install the latest stable Rust toolchain (`rustup install stable`) using the [rustup](https://rustup.rs/) CLI tool, available for most platforms.
 
1. Have `cargo` (the Rust language package manager) download source dependencies, and compile the library with release-level optimizations with `cargo build --release`: 
 
	- The resulting dylib can be found in `./target/release/`, to be used with the header file: `./reowolf.h`.
 
	- *Note*: A list of immediate ancestor dependencies is visible in `./Cargo.toml`.
 
	- *Note*: Run `cargo test --release` to run unit tests with release-level optimizations.
 

	
 
## Using the library
 
- The library may be used as a Rust dependency by adding it as a git dependency, i.e., by adding line `reowolf_rs = { git = "https://scm.cwi.nl/FM/reowolf" }` to downstream crate's manifest file, `Cargo.toml`.
 
- The library may be used as a dynamically-linked library using its C ABI as the cdylib written to `./target/release` when compiled with release optimizations, in combination to the header file `./reowolf.h`.
 
- When compiled on Linux, the compiled library will include definitions of pseudo-socket procedures declared in `./pseudo-socket.h` when compiled with `cargo build --release --features ffi_pseudo_socket_api`. The added functionality is only needed when requiring that connectors expose a socket-like API.
 

	
 
## Examples
 
The `examples` directory contains example usages of connectors for message passing over the internet. The programs within require that the library is compiled as a dylib (see above).
 

	
 
## Notes
 
3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. (WIP) This is only necessary to update it.  
 
\ No newline at end of file
 
3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. This is only necessary to update it.  
 

	
 
## Short User Overview
 
The bulk of the library's functionality is exposed to the user in two types: 
 
1. `protocol::ProtocolDescription` 
 
1. `runtime::Connector` 
 

	
 
The former is created using `parse`. For the most part, the user is not expected to interact much with the structure, only passing it to the connector as a communication session is being set up.
 

	
 
The latter is created with `new`, configured with methods such as `new_net_port` and `add_component`, and connected via `connect`, whereafter it can be used for multi-party communication through methods `put`, `get`, `next_batch`, and `sync`.
 

	
 
## Contributor Overview
 
The details of the implementation are best understood by reading the doc comments, starting from the procedures listed in the section above. It is suggested to first/also refer to the Reowolf project's companion documentation (link TODO) for a higher level overview of the goals and design of the implementation.
 
\ No newline at end of file
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub(crate) use crate::protocol::{ComponentState, ProtocolDescription};
 
pub(crate) use crate::runtime::{error::AddComponentError, NonsyncProtoContext, SyncProtoContext};
 

	
 
pub(crate) use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
    hash::Hash,
 
    ops::Range,
 
    time::Duration,
 
};
 
// pub(crate) use indexmap::IndexSet;
 
pub(crate) use maplit::hashmap;
 
pub(crate) use mio::{
 
    net::{TcpListener, TcpStream},
 
    Events, Interest, Poll, Token,
 
};
 
pub(crate) use std::{
 
    collections::{BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    io::{Read, Write},
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub(crate) use Polarity::*;
 

	
 
pub(crate) trait IdParts {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix);
 
}
 

	
 
/// Used by various distributed algorithms to identify connectors.
 
pub type ConnectorId = u32;
 

	
 
/// Used in conjunction with the `ConnectorId` type to create identifiers for ports and components
 
pub type U32Suffix = u32;
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
// pub, because it can be acquired via error in the Rust API
 
pub struct ComponentId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 

	
 
/// Generalization of a port/component identifier
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: U32Suffix,
 
}
 
#[derive(Clone, Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 

	
 
/// Identifier of a component in a session
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct ComponentId(Id); // PUB because it can be returned by errors
 

	
 
/// Identifier of a port in a session
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 

	
 
/// A safely aliasable heap-allocated payload of message bytes
 
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 

	
 
/// "Orientation" of a port, determining whether they can send or receive messages with `put` and `get` respectively.
 
#[repr(C)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 

	
 
/// "Orientation" of a transport-layer network endpoint, dictating how it's connection procedure should
 
/// be conducted. Corresponds with connect() / accept() familiar to TCP socket programming.
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) enum SyncBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
    NondetChoice { n: u16 },
 
}
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
 
    }
 
}
 
impl IdParts for PortId {
 
@@ -112,67 +126,76 @@ impl U32Stream {
 
        self.next += 1;
 
        self.next - 1
 
    }
 
    pub(crate) fn n_skipped(mut self, n: u32) -> Self {
 
        self.next = self.next.saturating_add(n);
 
        self
 
    }
 
}
 
impl From<Id> for PortId {
 
    fn from(id: Id) -> PortId {
 
        Self(id)
 
    }
 
}
 
impl From<Id> for ComponentId {
 
    fn from(id: Id) -> Self {
 
        Self(id)
 
    }
 
}
 
impl From<&[u8]> for Payload {
 
    fn from(s: &[u8]) -> Payload {
 
        Payload(Arc::new(s.to_vec()))
 
    }
 
}
 
impl Payload {
 
    /// Create a new payload of uninitialized bytes with the given length.
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(Arc::new(v))
 
    }
 
    /// Returns the length of the payload's byte sequence
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    /// Allows shared reading of the payload's contents
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        Arc::make_mut(&mut self.0) as _
 

	
 
    /// Allows mutation of the payload's contents.
 
    /// Results in a deep copy in the event this payload is aliased.
 
    pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
 
        Arc::make_mut(&mut self.0)
 
    }
 

	
 
    /// Modifies this payload, concatenating the given immutable payload's contents.
 
    /// Results in a deep copy in the event this payload is aliased.
 
    pub fn concatenate_with(&mut self, other: &Self) {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        let me = self.as_mut_vec();
 
        me.extend(bytes);
 
    }
 
}
 
impl serde::Serialize for Payload {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &Vec<u8> = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for Payload {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: Vec<u8> = Vec::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
src/protocol/eval.rs
Show inline comments
 
@@ -92,62 +92,62 @@ impl Value {
 
        let the_index: usize;
 
        match index {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let index = i64::from(index);
 
                if index < 0 || index >= MESSAGE_MAX_LENGTH {
 
                    // It is inconsistent to update out of bounds
 
                    return None;
 
                }
 
                the_index = index.try_into().unwrap();
 
            }
 
            _ => unreachable!(),
 
        }
 
        // The subject must be either a message or an array
 
        // And the value and the subject must be compatible
 
        match (self, value) {
 
            (Value::Message(MessageValue(None)), _) => {
 
                // It is inconsistent to update the null message
 
                None
 
            }
 
            (Value::Message(MessageValue(Some(payload))), Value::Byte(ByteValue(b))) => {
 
                if *b < 0 {
 
                    // It is inconsistent to update with a negative value
 
                    return None;
 
                }
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_vec().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
            (Value::Message(MessageValue(Some(payload))), Value::Short(ShortValue(b))) => {
 
                if *b < 0 || *b > BYTE_MAX as i16 {
 
                    // It is inconsistent to update with a negative value or a too large value
 
                    return None;
 
                }
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_vec().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
                    // It is inconsistent to update out of bounds
 
                    None
 
                }
 
            }
 
            (Value::InputArray(_), Value::Input(_)) => todo!(),
 
            (Value::OutputArray(_), Value::Output(_)) => todo!(),
 
            (Value::MessageArray(_), Value::Message(_)) => todo!(),
 
            (Value::BooleanArray(_), Value::Boolean(_)) => todo!(),
 
            (Value::ByteArray(_), Value::Byte(_)) => todo!(),
 
            (Value::ShortArray(_), Value::Short(_)) => todo!(),
 
            (Value::IntArray(_), Value::Int(_)) => todo!(),
 
            (Value::LongArray(_), Value::Long(_)) => todo!(),
 
            _ => unreachable!(),
 
        }
 
    }
 
    fn get(&self, index: &Value) -> Option<Value> {
 
        // The index must be of integer type, and non-negative
 
        let the_index: usize;
 
        match index {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let index = i64::from(index);
src/runtime/communication.rs
Show inline comments
 
@@ -74,112 +74,137 @@ impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
}
 
impl<'a, K, V> MapTempGuard<'a, K, V> {
 
    fn new(map: &'a mut HashMap<K, V>) -> Self {
 
        assert!(map.is_empty()); // sanity check
 
        Self(map)
 
    }
 
}
 
impl<'a, K, V> Drop for MapTempGuard<'a, K, V> {
 
    fn drop(&mut self) {
 
        assert!(self.0.is_empty()); // sanity check
 
    }
 
}
 
impl<'a, K, V> Deref for MapTempGuard<'a, K, V> {
 
    type Target = HashMap<K, V>;
 
    fn deref(&self) -> &<Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
    /// Read the message received by the given port in the previous synchronous round.
 
    pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
 
            Ok(None) => Err(Ge::NoPreviousRound),
 
            Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
        if let ConnectorPhased::Communication(comm) = &self.phased {
 
            match &comm.round_result {
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
                Ok(None) => Err(Ge::NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
            }
 
        } else {
 
            return Err(Ge::NoPreviousRound);
 
        }
 
    }
 
    /// Creates a new, empty synchronous batch for the connector and selects it.
 
    /// Subsequent calls to `put` and `get` with populate the new batch with port operations.
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            comm.native_batches.push(Default::default());
 
            Ok(comm.native_batches.len() - 1)
 
        } else {
 
            Err(WrongStateError)
 
        }
 
    }
 

	
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
        let info = cu.inner.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        if info.owner != cu.inner.native_component_id {
 
            return Err(Poe::PortUnavailable);
 
        }
 
        if info.polarity != expect_polarity {
 
            return Err(Poe::WrongPolarity);
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 

	
 
    /// Add a `put` operation to the connector's currently-selected synchronous batch.
 
    /// Returns an error if the given port is not owned by the native component,
 
    /// has the wrong polarity, or is already included in the batch.
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 

	
 
    /// Add a `get` operation to the connector's currently-selected synchronous batch.
 
    /// Returns an error if the given port is not owned by the native component,
 
    /// has the wrong polarity, or is already included in the batch.
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 

	
 
    /// Participate in the completion of the next synchronous round, in which
 
    /// the native component will perform the set of prepared operations of exactly one
 
    /// of the synchronous batches. At the end of the procedure, the synchronous
 
    /// batches will be reset to a singleton set, whose only element is selected, and empty.
 
    /// The caller yields control over to the connector runtime to faciltiate the underlying
 
    /// coordination work until either (a) the round is completed with all components' states
 
    /// updated accordingly, (b) a distributed failure event resets all components'
 
    /// states to what they were prior to the sync call, or (c) the sync procedure encounters
 
    /// an unrecoverable error which ends the call early, and breaks the session and connector's
 
    /// states irreversably.
 
    /// Note that the (b) case necessitates the success of a distributed rollback procedure,
 
    /// which this component may initiate, but cannot guarantee will succeed in time or at all.
 
    /// consequently, the given timeout duration represents a duration in which the connector
 
    /// will make a best effort to fail the round and return control flow to the caller.
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.logger(), "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
                }
 
                comm.round_result = Self::connected_sync(cu, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
@@ -1047,70 +1072,48 @@ impl BranchingProtoComponent {
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    // fn is_clear(&self) -> bool {
 
    //     self.subtree_id_to_index.is_empty()
 
    //         && self.subtree_solutions.is_empty()
 
    //         && self.old_local.is_empty()
 
    //         && self.new_local.is_empty()
 
    // }
 
    // fn clear(&mut self) {
 
    //     self.subtree_id_to_index.clear();
 
    //     self.subtree_solutions.clear();
 
    //     self.old_local.clear();
 
    //     self.new_local.clear();
 
    // }
 
    // fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
    //     self.subtree_id_to_index.clear();
 
    //     self.subtree_solutions.clear();
 
    //     self.old_local.clear();
 
    //     self.new_local.clear();
 
    //     for key in subtree_ids {
 
    //         self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
    //         self.subtree_solutions.push(Default::default())
 
    //     }
 
    // }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            // iterator over SETS of solutions, one for every component except `subtree_id` (me)
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
@@ -1147,76 +1150,76 @@ impl SolutionStorage {
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.current_state.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.branch_inner.did_put_or_get.insert(port);
 
        self.branch_inner.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        // sanity check
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.current_state.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // 2. create new component
 
        let new_cid = self.current_state.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            new_cid,
 
            &state,
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
        // 3. update ownership of moved ports
 
        for port in moved_ports.iter() {
 
            self.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
 
        }
 
        // 3. create a new component
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.current_state.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.current_state.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                polarity: Putter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.current_state.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                polarity: Getter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    UdpConnectFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    NetEndpointSetupError(SocketAddr, NetEndpointError),
 
    SetupAlgMisbehavior,
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    DuplicatePort(PortId),
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum UnrecoverableSyncError {
 
    PollFailed,
 
    BrokenNetEndpoint { index: usize },
 
    BrokenUdpEndpoint { index: usize },
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ComponentId),
 
    RoundFailure,
 
    Unrecoverable(UnrecoverableSyncError),
 
}
 
#[derive(Debug, Clone)]
src/runtime/mod.rs
Show inline comments
 
/// cbindgen:ignore
 
mod communication;
 
/// cbindgen:ignore
 
mod endpoints;
 
pub mod error;
 
/// cbindgen:ignore
 
mod logging;
 
/// cbindgen:ignore
 
mod setup;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 
use mio::net::UdpSocket;
 

	
 
/// Each Connector structure is the interface between the user's application and a communication session,
 
/// in which the application plays the part of a (native) component. This structure provides the application
 
/// with functionality available to all components: the ability to add new channels (port pairs), and to
 
/// instantiate new components whose definitions are defined in the connector's configured protocol
 
/// description. Native components have the additional ability to add `dangling' ports backed by local/remote
 
/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`.
 
/// This allows sets of applications to cooperate in constructing shared sessions that span the network.
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 

	
 
/// Characterizes a type which can write lines of logging text.
 
/// The implementations provided in the `logging` module are likely to be sufficient,
 
/// but for added flexibility, users are able to implement their own loggers for use
 
/// by connectors.
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 

	
 
/// A logger that appends the logged strings to a growing byte buffer
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 

	
 
/// A trivial logger that always returns None, such that no logging information is ever written.
 
#[derive(Debug)]
 
pub struct DummyLogger;
 

	
 
/// A logger that writes the logged lines to a given file.
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug, Clone)]
 
struct CurrentState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    id_manager: IdManager,
 
}
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    current_state: &'a mut CurrentState,
 
    logger: &'a mut dyn Logger,
 
    // cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
 
    proto_component_id: ComponentId,                              // KEY in id->component map
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    rctx: &'a RoundCtx,
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                        // KEY in pred->branch map
 
}
 
#[derive(Default, Debug, Clone)]
 
struct ProtoComponentBranchInner {
 
    untaken_choice: Option<u16>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
@@ -369,111 +387,149 @@ impl IdManager {
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N.
 
        // This gap is entirely unnecessary (i.e. 0 is fine)
 
        // It's purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_component_id(&mut self) -> ComponentId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() }
 
            .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 

	
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
        match vec.binary_search(port) {
 
            Err(index) => vec.insert(index, *port),
 
            Ok(_) => return Some(*port),
 
        }
 
    }
 
    None
 
}
 
impl Connector {
 
    /// Generate a random connector identifier from the system's source of randomness.
 
    pub fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete,
 
    /// and it is ready to participate in synchronous rounds of communication.
 
    pub fn is_connected(&self) -> bool {
 
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
 
        // consequently, this "phased" business would also include connector variants and this would
 
        // get a lot closer to the connector impl. itself.
 
        // Instead, the C-oriented implementation doesn't distinguish connector states as types,
 
        // and distinguish them as enum variants instead
 
        match self.phased {
 
            ConnectorPhased::Setup(..) => false,
 
            ConnectorPhased::Communication(..) => true,
 
        }
 
    }
 
    pub(crate) fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Enables the connector's current logger to be swapped out for another
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger);
 
        new_logger
 
    }
 

	
 
    /// Access the connector's current logger
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.inner.logger
 
    }
 

	
 
    /// Create a new synchronous channel, returning its ends as a pair of ports,
 
    /// with polarity output, input respectively. Available during either setup/communication phase.
 
    /// # Panics
 
    /// This function panics if the connector's (large) port id space is exhausted.
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let mut new_cid = || cu.inner.current_state.id_manager.new_port_id();
 
        let [o, i] = [new_cid(), new_cid()];
 
        cu.inner.current_state.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                owner: cu.inner.native_component_id,
 
                polarity: Putter,
 
            },
 
        );
 
        cu.inner.current_state.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                owner: cu.inner.native_component_id,
 
                polarity: Getter,
 
            },
 
        );
 
        log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 

	
 
    /// Instantiates a new component for the connector runtime to manage, and passing
 
    /// the given set of ports from the interface of the native component, to that of the
 
    /// newly created component (passing their ownership).
 
    /// # Errors
 
    /// Error is returned if the moved ports are not owned by the native component,
 
    /// if the given component name is not defined in the connector's protocol,
 
    /// the given sequence of ports contains a duplicate port,
 
    /// or if the component is unfit for instantiation with the given port sequence.
 
    /// # Panics
 
    /// This function panics if the connector's (large) component id space is exhausted.
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let cu = &mut self.unphased;
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.inner.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
            if info.polarity != expected_polarity {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // 2. add new component
 
        let new_cid = cu.inner.current_state.id_manager.new_component_id();
 
        cu.proto_components
 
            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
 
        // 3. update port ownership
 
        for port in ports.iter() {
 
            match cu.inner.current_state.port_info.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
@@ -554,59 +610,59 @@ impl Predicate {
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Aur::Equivalent,       // ... equivalent to both.
 
            [false, true] => Aur::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Aur::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Aur::New(new)
 
            }
 
        }
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
    pub(crate) fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
    pub(crate) fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct Assignment<'a>((&'a SpecVar, &'a SpecVal));
 
        impl Debug for Assignment<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                write!(f, "{:?}={:?}", (self.0).0, (self.0).1)
 
            }
 
        }
 
        f.debug_set().entries(self.assigned.iter().map(Assignment)).finish()
 
    }
 
}
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
 
    /// The resulting connector will start in the setup phase, and cannot be used for communication until the
 
    /// `connect` procedure completes.
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
    /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error).
 
    /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed
 
    /// to preserve their configured protocols.
 
    /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an
 
    /// error during `connect`, observed as a peer misbehaving.
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
        let native_component_id = id_manager.new_component_id();
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                inner: ConnectorUnphasedInner {
 
                    logger,
 
                    native_component_id,
 
                    current_state: CurrentState { id_manager, port_info: Default::default() },
 
                },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 
@@ -69,85 +80,98 @@ impl Connector {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Getter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.inner.current_state.port_info.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let new_pid = cu.inner.current_state.id_manager.new_port_id();
 
                cu.inner.current_state.port_info.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        peer: None,
 
                        owner: cu.inner.native_component_id,
 
                        polarity,
 
                    },
 
                );
 
                log!(
 
                    cu.inner.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: new_pid,
 
                });
 
                Ok(new_pid)
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
    /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports,
 
    /// but is ready to begin the first communication round.
 
    /// Initially, the connector has a singleton set of _batches_, the only element of which is empty.
 
    /// This single element starts off selected. The selected batch is modified with `put` and `get`,
 
    /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the
 
    /// purpose of these batches.
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.inner.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.inner.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.inner.current_state.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.inner.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.inner.current_state.port_info,
src/runtime/tests.rs
Show inline comments
 
@@ -1170,77 +1170,77 @@ fn xrouter_prim() {
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("PRIM {:?}", now.elapsed());
 
}
 
#[test]
 
fn xrouter_comp() {
 
    let test_log_path = Path::new("./logs/xrouter_comp");
 
    let pdl = b"
 
    primitive lossy(in a, out b) {
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                msg m = get(a);
 
                if(fires(b)) put(b, get(a));
 
                if(fires(b)) put(b, m);
 
            }
 
        }
 
    }
 
    primitive sync_drain(in a, in b) {
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                get(a);
 
                get(b);
 
            }
 
        }
 
    }
 
    composite xrouter(in a, out b, out c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 
        channel p -> q;
 
        channel r -> s;
 
        channel t -> u;
 

	
 
        new replicator(a, d, f); // ok
 
        new replicator(g, t, h); // ok
 
        new lossy(e, l); // ok
 
        new lossy(i, j); // ok
 
        new replicator(m, b, p); // ok
 
        new replicator(k, n, c); // ok
 
        new replicator(a, d, f);
 
        new replicator(g, t, h);
 
        new lossy(e, l);
 
        new lossy(i, j);
 
        new replicator(m, b, p);
 
        new replicator(k, n, c);
 
        new merger(q, o, r);
 
        new sync_drain(u, s);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) xrouter2, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"xrouter", &[g0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let now = std::time::Instant::now();
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
0 comments (0 inline, 0 general)