Changeset - d76b1fe2648f
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-09-22 15:43:19
christopher.esterhuyse@gmail.com
mild cleanup and major fleshing out of doc comments
8 files changed with 193 insertions and 74 deletions:
0 comments (0 inline, 0 general)
README.md
Show inline comments
 
@@ -18,4 +18,16 @@ This library provides connectors as a generalization of sockets for use in commu
 
The `examples` directory contains example usages of connectors for message passing over the internet. The programs within require that the library is compiled as a dylib (see above).
 

	
 
## Notes
 
3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. (WIP) This is only necessary to update it.  
 
\ No newline at end of file
 
3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. This is only necessary to update it.  
 

	
 
## Short User Overview
 
The bulk of the library's functionality is exposed to the user in two types: 
 
1. `protocol::ProtocolDescription` 
 
1. `runtime::Connector` 
 

	
 
The former is created using `parse`. For the most part, the user is not expected to interact much with the structure, only passing it to the connector as a communication session is being set up.
 

	
 
The latter is created with `new`, configured with methods such as `new_net_port` and `add_component`, and connected via `connect`, whereafter it can be used for multi-party communication through methods `put`, `get`, `next_batch`, and `sync`.
 

	
 
## Contributor Overview
 
The details of the implementation are best understood by reading the doc comments, starting from the procedures listed in the section above. It is suggested to first/also refer to the Reowolf project's companion documentation (link TODO) for a higher level overview of the goals and design of the implementation.
 
\ No newline at end of file
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub(crate) use crate::protocol::{ComponentState, ProtocolDescription};
 
pub(crate) use crate::runtime::{error::AddComponentError, NonsyncProtoContext, SyncProtoContext};
 

	
 
pub(crate) use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
@@ -10,7 +8,6 @@ pub(crate) use core::{
 
    ops::Range,
 
    time::Duration,
 
};
 
// pub(crate) use indexmap::IndexSet;
 
pub(crate) use maplit::hashmap;
 
pub(crate) use mio::{
 
    net::{TcpListener, TcpStream},
 
@@ -29,16 +26,17 @@ pub(crate) use Polarity::*;
 
pub(crate) trait IdParts {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix);
 
}
 

	
 
/// Used by various distributed algorithms to identify connectors.
 
pub type ConnectorId = u32;
 

	
 
/// Used in conjunction with the `ConnectorId` type to create identifiers for ports and components
 
pub type U32Suffix = u32;
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
// pub, because it can be acquired via error in the Rust API
 
pub struct ComponentId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 

	
 
/// Generalization of a port/component identifier
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
@@ -48,16 +46,28 @@ pub struct Id {
 
pub struct U32Stream {
 
    next: u32,
 
}
 

	
 
/// Identifier of a component in a session
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct ComponentId(Id); // PUB because it can be returned by errors
 

	
 
/// Identifier of a port in a session
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 

	
 
/// A safely aliasable heap-allocated payload of message bytes
 
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 

	
 
/// "Orientation" of a port, determining whether they can send or receive messages with `put` and `get` respectively.
 
#[repr(C)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
@@ -66,11 +76,15 @@ pub enum Polarity {
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 

	
 
/// "Orientation" of a transport-layer network endpoint, dictating how it's connection procedure should
 
/// be conducted. Corresponds with connect() / accept() familiar to TCP socket programming.
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum NonsyncBlocker {
 
    Inconsistent,
 
@@ -133,6 +147,7 @@ impl From<&[u8]> for Payload {
 
    }
 
}
 
impl Payload {
 
    /// Create a new payload of uninitialized bytes with the given length.
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
@@ -140,18 +155,26 @@ impl Payload {
 
        }
 
        Payload(Arc::new(v))
 
    }
 
    /// Returns the length of the payload's byte sequence
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    /// Allows shared reading of the payload's contents
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        Arc::make_mut(&mut self.0) as _
 

	
 
    /// Allows mutation of the payload's contents.
 
    /// Results in a deep copy in the event this payload is aliased.
 
    pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
 
        Arc::make_mut(&mut self.0)
 
    }
 

	
 
    /// Modifies this payload, concatenating the given immutable payload's contents.
 
    /// Results in a deep copy in the event this payload is aliased.
 
    pub fn concatenate_with(&mut self, other: &Self) {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        let me = self.as_mut_vec();
 
        me.extend(bytes);
 
    }
 
}
src/protocol/eval.rs
Show inline comments
 
@@ -113,7 +113,7 @@ impl Value {
 
                    // It is inconsistent to update with a negative value
 
                    return None;
 
                }
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_vec().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
 
@@ -126,7 +126,7 @@ impl Value {
 
                    // It is inconsistent to update with a negative value or a too large value
 
                    return None;
 
                }
 
                if let Some(slot) = payload.as_mut_slice().get_mut(the_index) {
 
                if let Some(slot) = payload.as_mut_vec().get_mut(the_index) {
 
                    *slot = (*b).try_into().unwrap();
 
                    Some(value.clone())
 
                } else {
src/runtime/communication.rs
Show inline comments
 
@@ -95,28 +95,31 @@ impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
    /// Read the message received by the given port in the previous synchronous round.
 
    pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
 
            Ok(None) => Err(Ge::NoPreviousRound),
 
            Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
        if let ConnectorPhased::Communication(comm) = &self.phased {
 
            match &comm.round_result {
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
                Ok(None) => Err(Ge::NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
            }
 
        } else {
 
            return Err(Ge::NoPreviousRound);
 
        }
 
    }
 
    /// Creates a new, empty synchronous batch for the connector and selects it.
 
    /// Subsequent calls to `put` and `get` with populate the new batch with port operations.
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            comm.native_batches.push(Default::default());
 
            Ok(comm.native_batches.len() - 1)
 
        } else {
 
            Err(WrongStateError)
 
        }
 
    }
 

	
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
@@ -139,6 +142,10 @@ impl Connector {
 
            }
 
        }
 
    }
 

	
 
    /// Add a `put` operation to the connector's currently-selected synchronous batch.
 
    /// Returns an error if the given port is not owned by the native component,
 
    /// has the wrong polarity, or is already included in the batch.
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
@@ -149,6 +156,10 @@ impl Connector {
 
            Ok(())
 
        }
 
    }
 

	
 
    /// Add a `get` operation to the connector's currently-selected synchronous batch.
 
    /// Returns an error if the given port is not owned by the native component,
 
    /// has the wrong polarity, or is already included in the batch.
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
@@ -158,7 +169,21 @@ impl Connector {
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 

	
 
    /// Participate in the completion of the next synchronous round, in which
 
    /// the native component will perform the set of prepared operations of exactly one
 
    /// of the synchronous batches. At the end of the procedure, the synchronous
 
    /// batches will be reset to a singleton set, whose only element is selected, and empty.
 
    /// The caller yields control over to the connector runtime to faciltiate the underlying
 
    /// coordination work until either (a) the round is completed with all components' states
 
    /// updated accordingly, (b) a distributed failure event resets all components'
 
    /// states to what they were prior to the sync call, or (c) the sync procedure encounters
 
    /// an unrecoverable error which ends the call early, and breaks the session and connector's
 
    /// states irreversably.
 
    /// Note that the (b) case necessitates the success of a distributed rollback procedure,
 
    /// which this component may initiate, but cannot guarantee will succeed in time or at all.
 
    /// consequently, the given timeout duration represents a duration in which the connector
 
    /// will make a best effort to fail the round and return control flow to the caller.
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
@@ -1068,28 +1093,6 @@ impl SolutionStorage {
 
            new_local: Default::default(),
 
        }
 
    }
 
    // fn is_clear(&self) -> bool {
 
    //     self.subtree_id_to_index.is_empty()
 
    //         && self.subtree_solutions.is_empty()
 
    //         && self.old_local.is_empty()
 
    //         && self.new_local.is_empty()
 
    // }
 
    // fn clear(&mut self) {
 
    //     self.subtree_id_to_index.clear();
 
    //     self.subtree_solutions.clear();
 
    //     self.old_local.clear();
 
    //     self.new_local.clear();
 
    // }
 
    // fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
    //     self.subtree_id_to_index.clear();
 
    //     self.subtree_solutions.clear();
 
    //     self.old_local.clear();
 
    //     self.new_local.clear();
 
    //     for key in subtree_ids {
 
    //         self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
    //         self.subtree_solutions.push(Default::default())
 
    //     }
 
    // }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
@@ -1168,7 +1171,7 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        // sanity check
 
@@ -1195,7 +1198,7 @@ impl NonsyncProtoContext<'_> {
 
        }
 
        // 3. create a new component
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.current_state.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
src/runtime/error.rs
Show inline comments
 
@@ -15,6 +15,7 @@ pub enum ConnectError {
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    DuplicatePort(PortId),
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
src/runtime/mod.rs
Show inline comments
 
@@ -15,18 +15,36 @@ use crate::common::*;
 
use error::*;
 
use mio::net::UdpSocket;
 

	
 
/// Each Connector structure is the interface between the user's application and a communication session,
 
/// in which the application plays the part of a (native) component. This structure provides the application
 
/// with functionality available to all components: the ability to add new channels (port pairs), and to
 
/// instantiate new components whose definitions are defined in the connector's configured protocol
 
/// description. Native components have the additional ability to add `dangling' ports backed by local/remote
 
/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`.
 
/// This allows sets of applications to cooperate in constructing shared sessions that span the network.
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 

	
 
/// Characterizes a type which can write lines of logging text.
 
/// The implementations provided in the `logging` module are likely to be sufficient,
 
/// but for added flexibility, users are able to implement their own loggers for use
 
/// by connectors.
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 

	
 
/// A logger that appends the logged strings to a growing byte buffer
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 

	
 
/// A trivial logger that always returns None, such that no logging information is ever written.
 
#[derive(Debug)]
 
pub struct DummyLogger;
 

	
 
/// A logger that writes the logged lines to a given file.
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug, Clone)]
 
@@ -390,7 +408,32 @@ impl Drop for Connector {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 

	
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
        match vec.binary_search(port) {
 
            Err(index) => vec.insert(index, *port),
 
            Ok(_) => return Some(*port),
 
        }
 
    }
 
    None
 
}
 
impl Connector {
 
    /// Generate a random connector identifier from the system's source of randomness.
 
    pub fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete,
 
    /// and it is ready to participate in synchronous rounds of communication.
 
    pub fn is_connected(&self) -> bool {
 
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
 
        // consequently, this "phased" business would also include connector variants and this would
 
@@ -402,23 +445,22 @@ impl Connector {
 
            ConnectorPhased::Communication(..) => true,
 
        }
 
    }
 
    pub(crate) fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Enables the connector's current logger to be swapped out for another
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger);
 
        new_logger
 
    }
 

	
 
    /// Access the connector's current logger
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.inner.logger
 
    }
 

	
 
    /// Create a new synchronous channel, returning its ends as a pair of ports,
 
    /// with polarity output, input respectively. Available during either setup/communication phase.
 
    /// # Panics
 
    /// This function panics if the connector's (large) port id space is exhausted.
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
@@ -445,6 +487,17 @@ impl Connector {
 
        log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 

	
 
    /// Instantiates a new component for the connector runtime to manage, and passing
 
    /// the given set of ports from the interface of the native component, to that of the
 
    /// newly created component (passing their ownership).
 
    /// # Errors
 
    /// Error is returned if the moved ports are not owned by the native component,
 
    /// if the given component name is not defined in the connector's protocol,
 
    /// the given sequence of ports contains a duplicate port,
 
    /// or if the component is unfit for instantiation with the given port sequence.
 
    /// # Panics
 
    /// This function panics if the connector's (large) component id space is exhausted.
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
@@ -453,6 +506,9 @@ impl Connector {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let cu = &mut self.unphased;
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
@@ -575,7 +631,7 @@ impl Predicate {
 
            }
 
        }
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
    pub(crate) fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
@@ -585,7 +641,7 @@ impl Predicate {
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
    pub(crate) fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -2,6 +2,17 @@ use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
 
    /// The resulting connector will start in the setup phase, and cannot be used for communication until the
 
    /// `connect` procedure completes.
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
    /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error).
 
    /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed
 
    /// to preserve their configured protocols.
 
    /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an
 
    /// error during `connect`, observed as a peer misbehaving.
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
@@ -90,6 +101,10 @@ impl Connector {
 
            }
 
        }
 
    }
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
@@ -127,6 +142,15 @@ impl Connector {
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
    /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports,
 
    /// but is ready to begin the first communication round.
 
    /// Initially, the connector has a singleton set of _batches_, the only element of which is empty.
 
    /// This single element starts off selected. The selected batch is modified with `put` and `get`,
 
    /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the
 
    /// purpose of these batches.
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
src/runtime/tests.rs
Show inline comments
 
@@ -1191,7 +1191,7 @@ fn xrouter_comp() {
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                msg m = get(a);
 
                if(fires(b)) put(b, get(a));
 
                if(fires(b)) put(b, m);
 
            }
 
        }
 
    }
 
@@ -1214,12 +1214,12 @@ fn xrouter_comp() {
 
        channel r -> s;
 
        channel t -> u;
 

	
 
        new replicator(a, d, f); // ok
 
        new replicator(g, t, h); // ok
 
        new lossy(e, l); // ok
 
        new lossy(i, j); // ok
 
        new replicator(m, b, p); // ok
 
        new replicator(k, n, c); // ok
 
        new replicator(a, d, f);
 
        new replicator(g, t, h);
 
        new lossy(e, l);
 
        new lossy(i, j);
 
        new replicator(m, b, p);
 
        new replicator(k, n, c);
 
        new merger(q, o, r);
 
        new sync_drain(u, s);
 
    }
0 comments (0 inline, 0 general)