Changeset - 8fc9f53cc0f5
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-02-06 14:54:55
christopheresterhuyse@gmail.com
more tests
2 files changed with 392 insertions and 298 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 

	
 
        let mut table_row = HashMap::default();
 
        let mut table_row = HashMap::<Key, _>::default();
 
        // 1. become_mono for Poly actors
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut table_row));
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 
        for (ekey, payload) in table_row {
 
            let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 

	
 
        // convert (Key=>Payload) map to (ChannelId=>Payload) map.
 
        let table_row: HashMap<_, _> = table_row
 
            .into_iter()
 
            .map(|(ekey, msg)| {
 
                let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                (channel_id, msg)
 
            })
 
            .collect();
 
        // log all firing ports
 
        for (channel_id, payload) in table_row {
 
            log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload);
 
        }
 
        // log all silent ports
 
        for channel_id in decision.iter_matching(false) {
 
            log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id);
 
        }
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        Ok(())
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(decision) = maybe_decision {
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    }
 

	
 
    fn kick_off_native(
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ekeys, .. } = self.inner.mono_n.take().unwrap();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            let all_ekeys = ekeys.iter().copied();
 
            let all_channel_ids = all_ekeys.map(ekey_to_channel_id);
 

	
 
            let mut predicate = Predicate::new_trivial();
 

	
 
            // assign TRUE for puts and gets
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch (batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        // TODO! fuse handle_locals_return_decision and end_round_return_decision
 

	
 
        assert!(self.ephemeral.is_clear());
 

	
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len());
 
        self.ephemeral.poly_ps.clear();
 
        // let mut poly_ps: Vec<PolyP> = vec![];
 
        while let Some(mut mono_p) = self.inner.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                inner: &mut self.inner,
 
                // endpoint_exts: &mut self.endpoint_exts,
 
                // mono_ps: &mut self.mono_ps,
 
                // channel_id_stream: &mut self.channel_id_stream,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let ekey_to_holder: HashMap<Key, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.iter().flat_map(|m| m.ekeys.iter().map(move |&e| (e, N)));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            &ekey_to_holder
 
        );
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
            let n = self.inner.mono_n.iter().map(|_| SubtreeId::PolyN);
 
            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
            let c = self
 
                .inner
 
                .family
 
                .children_ekeys
 
                .iter()
 
                .map(|&ekey| SubtreeId::ChildController { ekey });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 

	
 
        // 5. kick off the synchronous round of the native actor if it exists
 

	
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            log!(
 
                &mut self.inner.logger,
 
                "PolyN kicked off, and has branches with predicates... {:?}",
 
                poly_n.branches.keys().collect::<Vec<_>>()
 
            );
 
            Some(poly_n)
 
        } else {
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 

	
 
        // 6. Kick off the synchronous round of each protocol actor
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made
 
        log!(&mut self.inner.logger, "No decision yet. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            let received = self.recv(deadline)?.ok_or_else(|| {
 
                log!(
 
                    &mut self.inner.logger,
 
                    ":( timing out. Solutions storage in state... {:#?}",
 
                    &self.ephemeral.solution_storage
 
                );
 
                SyncErr::Timeout
 
            })?;
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(_) => {
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received);
 
                    // This occurs in the event the connector was malformed during connect()
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its OLD! :(", &received);
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "ecvd message {:?} and its for later. DELAY! :(",
 
                        &received
 
                    );
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "recvd a round-appropriate CommMsg {:?}",
 
                        &contents
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { oracle } => {
 
                    if self.inner.family.parent_ekey != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &oracle
 
                    );
 
                    return self.end_round_with_decision(oracle);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = ekey_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    match subtree_id {
 
                        None => {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let channel_id = self
 
                                .inner
 
                                .endpoint_exts
 
                                .get(received.recipient)
 
                                .expect("UEHFU")
 
                                .info
 
                                .channel_id;
 
                            if payload_predicate.query(channel_id) != Some(true) {
 
                                // sender didn't preserve the invariant
 
                                return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
 
                            }
 
                            let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
                            let m_ctx = PolyPContext {
 
                                my_subtree_id: SubtreeId::PolyP { index: *index },
 
                                inner: &mut self.inner,
src/test/connector.rs
Show inline comments
 
extern crate test_generator;
 

	
 
use super::*;
 

	
 
use std::thread;
 

	
 
use crate::common::*;
 
use crate::runtime::{errors::*, PortBinding::*, *};
 
use crate::runtime::{errors::*, PortBinding::*};
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(7_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into()
 
}
 

	
 
static PDL: &[u8] = b"
 
primitive blocked(in i, out o) {
 
    while(true) synchronous {}
 
}
 
primitive forward(in i, out o) {
 
    while(true) synchronous {
 
        put(o, get(i));
 
    }
 
}
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}
 
composite sync_2(in i, out o) {
 
    channel x -> y;
 
    new sync(i, x);
 
    new sync(y, o);
 
}
 
composite forward_pair(in ia, out oa, in ib, out ob) {
 
    new forward(ia, oa);
 
    new forward(ib, ob);
 
}
 
primitive forward_nonzero(in i, out o) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        if(m[0]==0) put(o, m);
 
    }
 
}
 
primitive token_spout(out o) {
 
    while(true) synchronous {
 
        put(o, create(0));
 
    }
 
}
 
primitive wait_10(out o) {
 
    int i = 0;
 
    while(i < 10) {
 
        synchronous {}
 
        i += 1;
 
    }
 
    synchronous { put(o, create(0)); }
 
}
 
";
 

	
 
#[test]
 
fn incremental() {
 
fn connects_ok() {
 
    // Test if we can connect natives using the given PDL
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    let handles = vec![
 
        thread::spawn(move || {
 
            let controller_id = 0;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
                b"primitive main(out a, out b) {
 
                    synchronous {
 
                        msg m = create(0);
 
                        put(a, m);
 
                    }
 
                }",
 
                b"main",
 
            )
 
            .unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Passive(addrs[1])).unwrap();
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        }),
 
        thread::spawn(move || {
 
            let controller_id = 1;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
                b"primitive main(in a, in b) {
 
                    synchronous {
 
                        get(a);
 
                    }
 
                }",
 
                b"main",
 
            )
 
            .unwrap();
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        }),
 
    ];
 
    for h in handles {
 
        handle(h.join())
 
    }
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn duo_positive() {
 
fn connected_but_silent_natives() {
 
    // Test if we can connect natives and have a trivial sync round
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m);
 
                }
 
                synchronous {
 
                    msg m = create(0);
 
                    put(b, m);
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
                            get(a);
 
                        }
 
                    }
 
                    synchronous {
 
                        if (fires(b)) {
 
                            get(b);
 
                        }
 
                    }
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn duo_negative() {
 
    let timeout = Duration::from_millis(500);
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m); // fires a on second round
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
                            get(a);
 
                        }
 
                    }
 
                    synchronous {
 
                        if (fires(b)) { // never fire a on even round
 
                            get(b);
 
                        }
 
                    }
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
fn self_forward_ok() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Echo!";
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(1));
 
            }
 
        },
 
    ]);
 
}
 
#[test]
 
fn token_spout_ok() {
 
    // Test a deterministic system where the proto
 
    // creates token messages
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 5;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"token_spout").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static FORWARD: &[u8] = b"
 
primitive forward(in i, out o) {
 
    while(true) synchronous {
 
        put(o, get(i));
 
    }
 
}";
 
#[test]
 
fn waiter_ok() {
 
    // Test a stateful proto that blocks port 0 for 10 rounds
 
    // and then sends a single token on the 11th
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"wait_10").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..10 {
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
            x.get(0).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
            assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn connect_natives() {
 
fn self_forward_timeout() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(500);
 
    static MSG: &[u8] = b"Echo!";
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            x.put(0, MSG.to_vec()).unwrap();
 
            // native and forward components cannot find a solution
 
            assert_eq!(Err(SyncErr::Timeout), x.sync(timeout));
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn forward_det() {
 
    // Test if a deterministic protocol and natives can pass one message
 
    /*
 
    Alice -->forward--P|A-->forward--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Hello!";
 
    do_all(&[
 
        &|x| {
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            for _ in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn forward() {
 
fn nondet_proto_det_natives() {
 
    // Test the use of a nondeterministic protocol
 
    // where Alice decides the choice and the others conform
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Message, here!";
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(FORWARD, b"forward").unwrap();
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HELLO!".to_vec();
 
            x.put(0, msg).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let expect = b"HELLO!".to_vec();
 
            x.get(0).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            assert_eq!(expect, x.read_gotten(0).unwrap());
 
            for _i in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static SYNC: &[u8] = b"
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}";
 
#[test]
 
fn native_alt() {
 
fn putter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 3;
 
    static MSG: &[u8] = b"Hidey ho!";
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(SYNC, b"sync").unwrap();
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HI".to_vec();
 
            for _i in 0..N {
 
                // round _i*2: batches: [0=>*]
 
                assert_eq!(0, x.sync(timeout).unwrap());
 

	
 
                // round _i*2+1: batches: [0=>HI]
 
                x.put(0, msg.clone()).unwrap();
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            x.configure(SYNC, b"sync").unwrap();
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>*}, {0=>?}]
 
                x.get(0).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
            let expect = b"HI".to_vec();
 
            for _i in 0..(2 * N) {
 
                // round _i batches:[0=>*, 0=>HI]
 
#[test]
 
fn getter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Hidey ho!";
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>?}, {0=>*}]
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _i in 0..N {
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => assert_eq!(Ok(&expect[..]), x.read_gotten(0)),
 
                    _ => unreachable!(),
 
                }
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static ALTERNATOR_2: &[u8] = b"
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}";
 

	
 
#[test]
 
fn alternator_2() {
 
    // Test a deterministic system which
 
    // alternates sending Sender's messages to A or B
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"message";
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(ALTERNATOR_2, b"alternator_2").unwrap();
 
            x.configure(PDL, b"alternator_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, b"hey".to_vec()).unwrap();
 
                    x.put(0, MSG.to_vec()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            let expecting: &[u8] = b"hey";
 

	
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(expecting), x.read_gotten(0));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 

	
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            let expecting: &[u8] = b"hey";
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(expecting), x.read_gotten(0));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static CHAIN: &[u8] = b"
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}
 
composite sync_2(in i, out o) {
 
    channel x -> y;
 
    new sync(i, x);
 
    new sync(y, o);
 
}";
 

	
 
#[test]
 
// PANIC TODO: eval::1536
 
fn composite_chain() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"Hi, there.";
 
    static MSG: &[u8] = b"Hippity Hoppity";
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(CHAIN, b"sync_2").unwrap();
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(CHAIN, b"sync_2").unwrap();
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static PARITY_ROUTER: &[u8] = b"
 
primitive parity_router(in i, out odd, out even) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        if (m[0]%2==0) {
 
            put(even, m);
 
        } else {
 
            put(odd, m);
 
        }
 
    }
 
}";
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn parity_router() {
 
    /*                    /--|-->Getsodd
 
    Sender -->parity_router
 
                          \--|-->Getseven
 
// PANIC TODO: eval::1605
 
fn exchange() {
 
    /*
 
        /-->forward-->P|A-->forward-->\
 
    Alice                             Bob
 
        \<--forward<--P|A<--forward<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PARITY_ROUTER, b"parity_router").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            // Alice
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(2, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 

	
 
            for i in 0..N {
 
                let msg = vec![i as u8]; // messages [0], [1], [2], ...
 
                x.put(0, msg).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            for _ in 0..N {
 
                x.put(0, b"A->B".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // Getsodd
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            // Bob
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(2, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // round _i batches:[0=>*, 0=>?]
 
                x.put(0, b"B->A".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn filter_messages() {
 
    // Make a protocol whose behavior depends on the contents of messages
 
    // Getter prohibits the receipt of messages of the form [0, ...].
 
    // those messages are silent
 
    /*
 
    Sender -->forward-->P|A-->forward_nonzero--> Receiver
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 1;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for i in (0..3).cycle().take(N) {
 
                let msg = vec![i as u8]; // messages [0], [1], [2], [0], [1] ...
 
                                         // batches: [{0=>*}, {0=>?}]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                x.put(0, msg.clone()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    0 => {
 
                        // not sent
 
                        assert_eq!(&msg, &[0u8]);
 
                    }
 
                    1 => {
 
                        let msg = x.read_gotten(0).unwrap();
 
                        assert!(msg[0] % 2 == 1); // assert msg is odd
 
                        // sent
 
                        assert_ne!(&msg, &[0u8]);
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
        &|x| {
 
            // Getseven
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            // Receiver
 
            x.configure(PDL, b"forward_nonzero").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // round _i batches:[0=>*, 0=>?]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    0 => {
 
                        // nothing received
 
                        assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
                    }
 
                    1 => {
 
                        let msg = x.read_gotten(0).unwrap();
 
                        assert!(msg[0] % 2 == 0); // assert msg is even
 
                        // msg received
 
                        assert_ne!(&[0u8], x.read_gotten(0).unwrap());
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]);
 
}
0 comments (0 inline, 0 general)