Changeset - 6a7d3acfcb5e
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-10-23 13:57:30
christopher.esterhuyse@gmail.com
logging feature exposed for ease of use
6 files changed with 103 insertions and 27 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -26,25 +26,26 @@ backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
crate-type = [
 
	"rlib", # for use as a Rust dependency. 
 
	"cdylib" # for FFI use, typically C.
 
]
 

	
 
[features]
 
default = ["ffi"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
 
session_optimization = [] # see src/runtime/setup.rs
 
no_logging = [] # see src/macros.rs
 
\ No newline at end of file
README.md
Show inline comments
 
# Reowolf 1.0 Implementation
 

	
 
This library provides connectors as a generalization of sockets for use in communication over the internet. This repository is one of the deliverables of the [Reowolf project](https://nlnet.nl/project/Reowolf/) funded by the NLNet foundation.
 

	
 
## Compilation instructions
 
1. Install the latest stable Rust toolchain (`rustup install stable`) using the [rustup](https://rustup.rs/) CLI tool, available for most platforms.
 
1. Have `cargo` (the Rust language package manager) download source dependencies, and compile the library with release-level optimizations with `cargo build --release`: 
 
	- The resulting dylib can be found in `./target/release/`, to be used with the header file: `./reowolf.h`.
 
	- *Note*: A list of immediate ancestor dependencies is visible in `./Cargo.toml`.
 
	- *Note*: Run `cargo test --release` to run unit tests with release-level optimizations.
 
1. By default, will compile with logging turned ON and session optimization turned OFF. Control this by passing the desired set of feature flags in {"no_logging", "session_optimization"} to the compiler. For example, `cargo build --release --features no_logging`.
 

	
 
## Using the library
 
- The library may be used as a Rust dependency by adding it as a git dependency, i.e., by adding line `reowolf_rs = { git = "https://scm.cwi.nl/FM/reowolf" }` to downstream crate's manifest file, `Cargo.toml`.
 
- The library may be used as a dynamically-linked library using its C ABI as the cdylib written to `./target/release` when compiled with release optimizations, in combination to the header file `./reowolf.h`.
 
- When compiled on Linux, the compiled library will include definitions of pseudo-socket procedures declared in `./pseudo-socket.h` when compiled with `cargo build --release --features ffi_pseudo_socket_api`. The added functionality is only needed when requiring that connectors expose a socket-like API.
 

	
 
## Examples
 
The `examples` directory contains example usages of connectors for message passing over the internet. The programs within require that the library is compiled as a dylib (see above).
 

	
 
## Notes
 
3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. This is only necessary to update it.  
 

	
 
## Short User Overview
 
The bulk of the library's functionality is exposed to the user in two types: 
 
1. `protocol::ProtocolDescription` 
 
1. `runtime::Connector` 
 

	
 
The former is created using `parse`. For the most part, the user is not expected to interact much with the structure, only passing it to the connector as a communication session is being set up.
 

	
 
The latter is created with `new`, configured with methods such as `new_net_port` and `add_component`, and connected via `connect`, whereafter it can be used for multi-party communication through methods `put`, `get`, `next_batch`, and `sync`.
 

	
 
## Contributor Overview
 
The details of the implementation are best understood by reading the doc comments, starting from the procedures listed in the section above. It is suggested to first/also refer to the Reowolf project's companion documentation (link TODO) for a higher level overview of the goals and design of the implementation.
 
\ No newline at end of file
src/macros.rs
Show inline comments
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
        #[cfg(not(feature = "no_logging"))]
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -51,48 +51,53 @@ struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
}
 

	
 
// Small convenience trait for extending the stdlib's bool type with
 
// an optionlike replace method for increasing brevity.
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 

	
 
//////////////// IMPL ////////////////////////////
 

	
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure,
 
// making it harder to accidentally mutate its contents in a way that cannot be rolled back.
 
impl CuUndecided for ConnectorUnphased {
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription) {
 
        (&mut *self.logger, &self.proto_description)
 
    }
 
    fn logger_and_protocol_components(
 
        &mut self,
 
    ) -> (&mut dyn Logger, &mut HashMap<ComponentId, ComponentState>) {
 
        (&mut *self.logger, &mut self.proto_components)
 
    }
 
    fn logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.logger
 
    }
 
    fn proto_description(&self) -> &ProtocolDescription {
 
        &self.proto_description
 
    }
 
    fn native_component_id(&self) -> ComponentId {
 
        self.native_component_id
 
    }
 
}
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
    fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> {
 
        MapTempsGuard(self.0)
 
    }
 
    fn split_first_mut(self) -> (MapTempGuard<'a, K, V>, MapTempsGuard<'a, K, V>) {
 
        let (head, tail) = self.0.split_first_mut().expect("Cache exhausted");
 
        (MapTempGuard::new(head), MapTempsGuard(tail))
 
    }
 
}
 
impl<'a, K, V> MapTempGuard<'a, K, V> {
 
    fn new(map: &'a mut HashMap<K, V>) -> Self {
 
        assert!(map.is_empty()); // sanity check
 
        Self(map)
 
    }
 
@@ -253,49 +258,49 @@ impl Connector {
 
            .proto_components
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len());
 
        // initially, the set of components to run is the set of components stored by `cu`,
 
        // but they are eventually drained into `branching_proto_components`.
 
        // Some components exit first, and others are created and put into `unrun_components`.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            log!(
 
                cu.logger(),
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let (logger, proto_description) = cu.logger_and_protocol_description();
 
            let mut ctx = NonsyncProtoContext {
 
                ips: &mut ips,
 
                logger,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
            };
 
            let blocker = component.nonsync_run(&mut ctx, proto_description);
 
            log!(
 
                cu.logger(),
 
                logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => assert!(branching_proto_components
 
                    .insert(proto_component_id, BranchingProtoComponent::initial(component))
 
                    .is_none()), // Some(_) returned IFF some component identifier key is overwritten (BAD!)
 
            }
 
        }
 
        log!(
 
            cu.logger(),
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temporary structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            ips, // already used previously, now moved into RoundCtx
 
            solution_storage: {
 
                let subtree_id_iter = {
 
@@ -431,64 +436,65 @@ impl Connector {
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.logger(),
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // untouched port/component fields of `cu` are NOT overwritten.
 
                // the result is a rollback.
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                let (logger, proto_components) = cu.logger_and_protocol_components();
 
                proto_components.extend(
 
                    // "flatten" branching components, committing the speculation
 
                    // consistent with the predicate decided upon.
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))),
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(logger, &predicate))),
 
                );
 
                // commit changes to ports and id_manager
 
                cu.ips = rctx.ips;
 
                log!(
 
                    cu.logger,
 
                    logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                    proto_components.keys()
 
                );
 
                cu.ips = rctx.ips;
 
                // consume native
 
                let round_ok = branching_native.collapse_with(&mut *cu.logger(), &predicate);
 
                let round_ok = branching_native.collapse_with(cu.logger(), &predicate);
 
                Ok(Some(round_ok))
 
            }
 
        };
 
        log!(cu.logger(), "Sync round ending! Cleaning up");
 
        ret
 
    }
 

	
 
    // Once the synchronous round has been started, this procedure
 
    // routs and handles payloads, receives control messages from neighboring connectors,
 
    // checks for timeout, and aggregates solutions until a distributed decision is reached.
 
    // The decision is either a solution (success case), or a distributed timeout rollback (failure case)
 
    // The final possible outcome is an unrecoverable error, which results from some fundamental misbehavior,
 
    // a network channel breaking, etc.
 
    fn sync_reach_decision(
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        // The round is in progress, and now its just a matter of arriving at a decision.
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            // An unsatisfiable native is the easiest way to detect failure
 
@@ -951,48 +957,49 @@ impl BranchingNative {
 
        self,
 
        logger: &mut dyn Logger,
 
        solution_predicate: &Predicate,
 
    ) -> RoundEndedNative {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            log!(
 
                logger,
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundEndedNative { batch_index: index, gotten };
 
            }
 
        }
 
        log!(logger, "Native had no branches matching pred {:?}", solution_predicate);
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    // Create a singleton-branch branching protocol component as
 
    // speculation begins, with the given protocol state.
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 

	
 
    // run all the given branches (cd.input) to their SyncBlocker,
 
    // populating cd.output by cyclically draining "input" -> "cd."input" / cd.output.
 
    // (to prevent concurrent r/w of one structure, we realize "input" as cd.input for reading and cd.swap for writing)
 
    // This procedure might lose branches, and it might create new branches.
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        // let CyclicDrainer { input, swap, output } = cd;
 
        while !cd.input.is_empty() {
 
            'branch_iter: for (mut predicate, mut branch) in cd.input.drain() {
 
@@ -1193,56 +1200,61 @@ impl BranchingProtoComponent {
 
        predicate: Predicate,
 
        mut branch: ProtoComponentBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means keeping the existing one in-place, and giving it the UNION of the inboxes
 
                let old = eo.get_mut();
 
                for (k, v) in branch.inner.inbox.drain() {
 
                    old.inner.inbox.insert(k, v);
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState {
 
    fn collapse_with(
 
        self,
 
        logger: &mut dyn Logger,
 
        solution_predicate: &Predicate,
 
    ) -> ComponentState {
 
        let BranchingProtoComponent { branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return state;
 
            }
 
        }
 
        log!(logger, "ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl ProtoComponentBranch {
 
    // Feed this branch received message.
 
    // It's safe to receive the same message repeatedly,
 
    // but if we receive a message with different contents,
 
    // it's a sign something has gone wrong! keys of type (port, round, predicate)
 
    // should always map to at most one message value!
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let e = self.inner.inbox.entry(getter);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // new message
 
                ev.insert(payload);
 
            }
 
            Entry::Occupied(eo) => {
 
                // redundant recv. can happen as a result of a
 
                // component A having two branches X and Y related by
 
                assert_eq!(eo.get(), &payload);
 
            }
 
        }
 
    }
src/runtime/mod.rs
Show inline comments
 
@@ -404,48 +404,51 @@ struct SolutionStorage {
 
}
 

	
 
// Stores the transient data of a synchronous round.
 
// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of
 
// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx,
 
// and can be undone if the round fails.
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    ips: IdAndPortState,
 
}
 

	
 
// A trait intended to limit the access of the ConnectorUnphased structure
 
// such that we don't accidentally modify any important component/port data
 
// while the results of the round are undecided. Why? Any actions during Connector::sync
 
// are _speculative_ until the round is decided, and we need a safe way of rolling
 
// back any changes.
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 
    fn logger_and_protocol_components(
 
        &mut self,
 
    ) -> (&mut dyn Logger, &mut HashMap<ComponentId, ComponentState>);
 
}
 

	
 
// Represents a set of synchronous port operations that the native component
 
// has described as an "option" for completing during the synchronous rounds.
 
// Operations contained here succeed together or not at all.
 
// A native with N=2+ batches are expressing an N-way nondeterministic choice
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
// Parallels a mio::Token type, but more clearly communicates
 
// the way it identifies the evented structre it corresponds to.
 
// See runtime/setup for methods converting between TokenTarget and mio::Token
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
// such that it can know when to continue polling, and when to block.
src/runtime/tests.rs
Show inline comments
 
@@ -1291,83 +1291,141 @@ fn for_msg_byte() {
 
        while(i<8) {
 
            msg m = create(1);
 
            m[0] = i;
 
            synchronous() put(o, m);
 
            i++;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn eq_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    primitive eq(in a, in b, out c) {
 
        msg ma = null;
 
        msg mb = null;
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                // left first!
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 
                assert(ma == mb);
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..4 {
 
        // everything is fine with LEFT FIRST
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap();
 

	
 
        // no solution when left is NOT FIRST
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap_err();
 
    }
 
}
 

	
 
#[test]
 
fn eq_no_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    composite eq(in a, in b, out c) {
 
        channel leftfirsto -> leftfirsti;
 
        new eqinner(a, b, c, leftfirsto, leftfirsti);
 
    }
 
    primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) {
 
        msg ma = null;
 
        msg mb = null;
 
        while(true) synchronous {
 
            if(fires(leftfirsti)) {
 
                // left first! DO USE DUMMY
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 

	
 
                // using dummy!
 
                put(leftfirsto, ma);
 
                get(leftfirsti);
 
            } else {
 
                // right first! DON'T USE DUMMY
 
                mb = get(b);
 
                put(c, mb);
 
                ma = get(a);
 
            if(fires(a)) {
 
                // b and c also fire!
 
                if(fires(leftfirsti)) {
 
                    // left first! DO USE DUMMY
 
                    ma = get(a);
 
                    put(c, ma);
 
                    mb = get(b);
 

	
 
                    // using dummy!
 
                    put(leftfirsto, ma);
 
                    get(leftfirsti);
 
                } else {
 
                    // right first! DON'T USE DUMMY
 
                    mb = get(b);
 
                    put(c, mb);
 
                    ma = get(a);
 
                }
 
                assert(ma == mb);
 
            }
 
            assert(ma == mb);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..32 {
 
        // ok when they send
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(SEC1).unwrap();
 
        // ok when they don't
 
        c.sync(SEC1).unwrap();
 
    }
 
}
0 comments (0 inline, 0 general)