diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 7782a5552e063b79ef960d104ff76a3cbd1ac8c6..26e13100c7018bf8425bca02de578f8810109573 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -4,8 +4,8 @@ use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - - let mut table_row = HashMap::default(); + let mut table_row = HashMap::::default(); + // 1. become_mono for Poly actors self.inner.mono_n = self .ephemeral .poly_n @@ -14,10 +14,20 @@ impl Controller { self.inner.mono_ps.extend( self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)), ); - for (ekey, payload) in table_row { - let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + + // convert (Key=>Payload) map to (ChannelId=>Payload) map. + let table_row: HashMap<_, _> = table_row + .into_iter() + .map(|(ekey, msg)| { + let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + (channel_id, msg) + }) + .collect(); + // log all firing ports + for (channel_id, payload) in table_row { log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload); } + // log all silent ports for channel_id in decision.iter_matching(false) { log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id); } diff --git a/src/test/connector.rs b/src/test/connector.rs index cbb529ab9d8ad2bdd6a75ed1d6fe64933b8af973..577a74810e247bbeaacf70c19768cfa019aea208 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -2,10 +2,8 @@ extern crate test_generator; use super::*; -use std::thread; - use crate::common::*; -use crate::runtime::{errors::*, PortBinding::*, *}; +use crate::runtime::{errors::*, PortBinding::*}; // using a static AtomicU16, shared between all tests in the binary, // allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000.. @@ -19,307 +17,376 @@ fn next_addr() -> SocketAddr { SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into() } +static PDL: &[u8] = b" +primitive blocked(in i, out o) { + while(true) synchronous {} +} +primitive forward(in i, out o) { + while(true) synchronous { + put(o, get(i)); + } +} +primitive sync(in i, out o) { + while(true) synchronous { + if (fires(i)) put(o, get(i)); + } +} +primitive alternator_2(in i, out a, out b) { + while(true) { + synchronous { put(a, get(i)); } + synchronous { put(b, get(i)); } + } +} +composite sync_2(in i, out o) { + channel x -> y; + new sync(i, x); + new sync(y, o); +} +composite forward_pair(in ia, out oa, in ib, out ob) { + new forward(ia, oa); + new forward(ib, ob); +} +primitive forward_nonzero(in i, out o) { + while(true) synchronous { + msg m = get(i); + if(m[0]==0) put(o, m); + } +} +primitive token_spout(out o) { + while(true) synchronous { + put(o, create(0)); + } +} +primitive wait_10(out o) { + int i = 0; + while(i < 10) { + synchronous {} + i += 1; + } + synchronous { put(o, create(0)); } +} +"; + #[test] -fn incremental() { +fn connects_ok() { + // Test if we can connect natives using the given PDL + /* + Alice -->silence--P|A-->silence--> Bob + */ let timeout = Duration::from_millis(1_500); - let addrs = [next_addr(), next_addr()]; - let handles = vec![ - thread::spawn(move || { - let controller_id = 0; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(out a, out b) { - synchronous { - msg m = create(0); - put(a, m); - } - }", - b"main", - ) - .unwrap(); - x.bind_port(0, Passive(addrs[0])).unwrap(); - x.bind_port(1, Passive(addrs[1])).unwrap(); + let addrs = [next_addr()]; + do_all(&[ + &|x| { + // Alice + x.configure(PDL, b"blocked").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - }), - thread::spawn(move || { - let controller_id = 1; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(in a, in b) { - synchronous { - get(a); - } - }", - b"main", - ) - .unwrap(); + }, + &|x| { + // Bob + x.configure(PDL, b"blocked").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); - x.bind_port(1, Active(addrs[1])).unwrap(); + x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - }), - ]; - for h in handles { - handle(h.join()) - } + }, + ]); } #[test] -fn duo_positive() { +fn connected_but_silent_natives() { + // Test if we can connect natives and have a trivial sync round + /* + Alice -->silence--P|A-->silence--> Bob + */ let timeout = Duration::from_millis(1_500); - let addrs = [next_addr(), next_addr()]; - let a = thread::spawn(move || { - let controller_id = 0; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(out a, out b) { - synchronous {} - synchronous {} - synchronous { - msg m = create(0); - put(a, m); - } - synchronous { - msg m = create(0); - put(b, m); - } - }", - b"main", - ) - .unwrap(); - x.bind_port(0, Passive(addrs[0])).unwrap(); - x.bind_port(1, Passive(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(0, x.sync(timeout).unwrap()); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - }); - let b = thread::spawn(move || { - let controller_id = 1; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(in a, in b) { - while (true) { - synchronous { - if (fires(a)) { - get(a); - } - } - synchronous { - if (fires(b)) { - get(b); - } - } - } - }", - b"main", - ) - .unwrap(); - x.bind_port(0, Active(addrs[0])).unwrap(); - x.bind_port(1, Active(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(0, x.sync(timeout).unwrap()); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - }); - handle(a.join()); - handle(b.join()); + let addrs = [next_addr()]; + do_all(&[ + &|x| { + // Alice + x.configure(PDL, b"blocked").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + }, + &|x| { + // Bob + x.configure(PDL, b"blocked").unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + }, + ]); } #[test] -fn duo_negative() { - let timeout = Duration::from_millis(500); - let addrs = [next_addr(), next_addr()]; - let a = thread::spawn(move || { - let controller_id = 0; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(out a, out b) { - synchronous {} - synchronous { - msg m = create(0); - put(a, m); // fires a on second round - } - }", - b"main", - ) - .unwrap(); - x.bind_port(0, Passive(addrs[0])).unwrap(); - x.bind_port(1, Passive(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - let r = x.sync(timeout); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - match r { - Err(SyncErr::Timeout) => {} - x => unreachable!("{:?}", x), - } - }); - let b = thread::spawn(move || { - let controller_id = 1; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(in a, in b) { - while (true) { - synchronous { - if (fires(a)) { - get(a); - } - } - synchronous { - if (fires(b)) { // never fire a on even round - get(b); - } - } - } - }", - b"main", - ) - .unwrap(); - x.bind_port(0, Active(addrs[0])).unwrap(); - x.bind_port(1, Active(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - let r = x.sync(timeout); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - match r { - Err(SyncErr::Timeout) => {} - x => unreachable!("{:?}", x), - } - }); - handle(a.join()); - handle(b.join()); +fn self_forward_ok() { + // Test a deterministic system + // where a native has no network bindings + // and sends messages to itself + /* + /-->\ + Alice forward + \<--/ + */ + let timeout = Duration::from_millis(1_500); + const N: usize = 5; + static MSG: &[u8] = b"Echo!"; + do_all(&[ + // + &|x| { + // Alice + x.configure(PDL, b"forward").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + for _ in 0..N { + x.put(0, MSG.to_vec()).unwrap(); + x.get(1).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(MSG), x.read_gotten(1)); + } + }, + ]); +} +#[test] +fn token_spout_ok() { + // Test a deterministic system where the proto + // creates token messages + /* + Alice<--token_spout + */ + let timeout = Duration::from_millis(1_500); + const N: usize = 5; + do_all(&[ + // + &|x| { + // Alice + x.configure(PDL, b"token_spout").unwrap(); + x.bind_port(0, Native).unwrap(); + x.connect(timeout).unwrap(); + for _ in 0..N { + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0)); + } + }, + ]); } -static FORWARD: &[u8] = b" -primitive forward(in i, out o) { - while(true) synchronous { - put(o, get(i)); - } -}"; +#[test] +fn waiter_ok() { + // Test a stateful proto that blocks port 0 for 10 rounds + // and then sends a single token on the 11th + /* + Alice<--token_spout + */ + let timeout = Duration::from_millis(1_500); + do_all(&[ + // + &|x| { + // Alice + x.configure(PDL, b"wait_10").unwrap(); + x.bind_port(0, Native).unwrap(); + x.connect(timeout).unwrap(); + for _ in 0..10 { + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); + } + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0)); + }, + ]); +} #[test] -fn connect_natives() { +fn self_forward_timeout() { + // Test a deterministic system + // where a native has no network bindings + // and sends messages to itself + /* + /-->\ + Alice forward + \<--/ + */ + let timeout = Duration::from_millis(500); + static MSG: &[u8] = b"Echo!"; + do_all(&[ + // + &|x| { + // Sender + x.configure(PDL, b"forward").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + x.put(0, MSG.to_vec()).unwrap(); + // native and forward components cannot find a solution + assert_eq!(Err(SyncErr::Timeout), x.sync(timeout)); + }, + ]); +} + +#[test] +fn forward_det() { + // Test if a deterministic protocol and natives can pass one message + /* + Alice -->forward--P|A-->forward--> Bob + */ let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; + const N: usize = 5; + static MSG: &[u8] = b"Hello!"; do_all(&[ &|x| { - x.configure(FORWARD, b"forward").unwrap(); + x.configure(PDL, b"forward").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); + for _ in 0..N { + x.put(0, MSG.to_vec()).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + } }, &|x| { - x.configure(FORWARD, b"forward").unwrap(); + x.configure(PDL, b"forward").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); + for _ in 0..N { + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(MSG), x.read_gotten(0)); + } }, ]); } #[test] -fn forward() { +fn nondet_proto_det_natives() { + // Test the use of a nondeterministic protocol + // where Alice decides the choice and the others conform + /* + Alice -->sync--A|P-->sync--> Bob + */ let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; + const N: usize = 5; + static MSG: &[u8] = b"Message, here!"; do_all(&[ - // &|x| { - x.configure(FORWARD, b"forward").unwrap(); + // Alice + x.configure(PDL, b"sync").unwrap(); x.bind_port(0, Native).unwrap(); - x.bind_port(1, Passive(addrs[0])).unwrap(); + x.bind_port(1, Active(addrs[0])).unwrap(); x.connect(timeout).unwrap(); - - let msg = b"HELLO!".to_vec(); - x.put(0, msg).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); + for _i in 0..N { + x.put(0, MSG.to_vec()).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } }, &|x| { - x.configure(FORWARD, b"forward").unwrap(); - x.bind_port(0, Active(addrs[0])).unwrap(); + // Bob + x.configure(PDL, b"sync").unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); - - let expect = b"HELLO!".to_vec(); - x.get(0).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - assert_eq!(expect, x.read_gotten(0).unwrap()); + for _i in 0..N { + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(MSG), x.read_gotten(0)); + } }, ]); } -static SYNC: &[u8] = b" -primitive sync(in i, out o) { - while(true) synchronous { - if (fires(i)) put(o, get(i)); - } -}"; #[test] -fn native_alt() { +fn putter_determines() { + // putter and getter /* Alice -->sync--A|P-->sync--> Bob */ let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; const N: usize = 3; + static MSG: &[u8] = b"Hidey ho!"; do_all(&[ // &|x| { - x.configure(SYNC, b"sync").unwrap(); + // Alice + x.configure(PDL, b"sync").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Active(addrs[0])).unwrap(); x.connect(timeout).unwrap(); - - let msg = b"HI".to_vec(); for _i in 0..N { - // round _i*2: batches: [0=>*] - assert_eq!(0, x.sync(timeout).unwrap()); - - // round _i*2+1: batches: [0=>HI] - x.put(0, msg.clone()).unwrap(); + x.put(0, MSG.to_vec()).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); } }, &|x| { - x.configure(SYNC, b"sync").unwrap(); + // Bob + x.configure(PDL, b"sync").unwrap(); x.bind_port(0, Passive(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); + for _i in 0..N { + // batches [{0=>*}, {0=>?}] + x.get(0).unwrap(); + x.next_batch().unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(MSG), x.read_gotten(0)); + } + }, + ]); +} - let expect = b"HI".to_vec(); - for _i in 0..(2 * N) { - // round _i batches:[0=>*, 0=>HI] +#[test] +fn getter_determines() { + // putter and getter + /* + Alice -->sync--A|P-->sync--> Bob + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr()]; + const N: usize = 5; + static MSG: &[u8] = b"Hidey ho!"; + do_all(&[ + // + &|x| { + // Alice + x.configure(PDL, b"sync").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Active(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + for _i in 0..N { + // batches [{0=>?}, {0=>*}] + x.put(0, MSG.to_vec()).unwrap(); x.next_batch().unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + } + }, + &|x| { + // Bob + x.configure(PDL, b"sync").unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + for _i in 0..N { x.get(0).unwrap(); - match x.sync(timeout).unwrap() { - 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), - 1 => assert_eq!(Ok(&expect[..]), x.read_gotten(0)), - _ => unreachable!(), - } + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(MSG), x.read_gotten(0)); } }, ]); } -static ALTERNATOR_2: &[u8] = b" -primitive alternator_2(in i, out a, out b) { - while(true) { - synchronous { put(a, get(i)); } - synchronous { put(b, get(i)); } - } -}"; - #[test] fn alternator_2() { + // Test a deterministic system which + // alternates sending Sender's messages to A or B /* /--|-->A Sender -->alternator_2 \--|-->B @@ -327,11 +394,12 @@ fn alternator_2() { let timeout = Duration::from_millis(1_500); let addrs = [next_addr(), next_addr()]; const N: usize = 5; + static MSG: &[u8] = b"message"; do_all(&[ // &|x| { // Sender - x.configure(ALTERNATOR_2, b"alternator_2").unwrap(); + x.configure(PDL, b"alternator_2").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); x.bind_port(2, Passive(addrs[1])).unwrap(); @@ -339,24 +407,22 @@ fn alternator_2() { for _ in 0..N { for _ in 0..2 { - x.put(0, b"hey".to_vec()).unwrap(); + x.put(0, MSG.to_vec()).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); } } }, &|x| { // A - x.configure(SYNC, b"sync").unwrap(); + x.configure(PDL, b"sync").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); - let expecting: &[u8] = b"hey"; - for _ in 0..N { // get msg round x.get(0).unwrap(); assert_eq!(Ok(0), x.sync(timeout)); // GET ONE - assert_eq!(Ok(expecting), x.read_gotten(0)); + assert_eq!(Ok(MSG), x.read_gotten(0)); // silent round assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE @@ -365,11 +431,10 @@ fn alternator_2() { }, &|x| { // B - x.configure(SYNC, b"sync").unwrap(); + x.configure(PDL, b"sync").unwrap(); x.bind_port(0, Active(addrs[1])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); - let expecting: &[u8] = b"hey"; for _ in 0..N { // silent round @@ -379,38 +444,28 @@ fn alternator_2() { // get msg round x.get(0).unwrap(); assert_eq!(Ok(0), x.sync(timeout)); // GET ONE - assert_eq!(Ok(expecting), x.read_gotten(0)); + assert_eq!(Ok(MSG), x.read_gotten(0)); } }, ]); } -static CHAIN: &[u8] = b" -primitive sync(in i, out o) { - while(true) synchronous { - if (fires(i)) put(o, get(i)); - } -} -composite sync_2(in i, out o) { - channel x -> y; - new sync(i, x); - new sync(y, o); -}"; - #[test] +// PANIC TODO: eval::1536 fn composite_chain() { + // Check if composition works. Forward messages through long chains /* Alice -->sync-->sync-->A|P-->sync-->sync--> Bob */ let timeout = Duration::from_millis(1_500); let addrs = [next_addr(), next_addr()]; const N: usize = 1; - static MSG: &[u8] = b"Hi, there."; + static MSG: &[u8] = b"Hippity Hoppity"; do_all(&[ // &|x| { // Alice - x.configure(CHAIN, b"sync_2").unwrap(); + x.configure(PDL, b"sync_2").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Active(addrs[0])).unwrap(); x.connect(timeout).unwrap(); @@ -421,7 +476,7 @@ fn composite_chain() { }, &|x| { // Bob - x.configure(CHAIN, b"sync_2").unwrap(); + x.configure(PDL, b"sync_2").unwrap(); x.bind_port(0, Passive(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); @@ -435,24 +490,13 @@ fn composite_chain() { ]); } -static PARITY_ROUTER: &[u8] = b" -primitive parity_router(in i, out odd, out even) { - while(true) synchronous { - msg m = get(i); - if (m[0]%2==0) { - put(even, m); - } else { - put(odd, m); - } - } -}"; - #[test] -// THIS DOES NOT YET WORK. TODOS are hit -fn parity_router() { - /* /--|-->Getsodd - Sender -->parity_router - \--|-->Getseven +// PANIC TODO: eval::1605 +fn exchange() { + /* + /-->forward-->P|A-->forward-->\ + Alice Bob + \<--forward<--P|A<--forward<--/ */ let timeout = Duration::from_millis(1_500); let addrs = [next_addr(), next_addr()]; @@ -460,56 +504,96 @@ fn parity_router() { do_all(&[ // &|x| { - // Sender - x.configure(PARITY_ROUTER, b"parity_router").unwrap(); - x.bind_port(0, Native).unwrap(); - x.bind_port(1, Passive(addrs[0])).unwrap(); - x.bind_port(2, Passive(addrs[1])).unwrap(); + // Alice + x.configure(PDL, b"forward_pair").unwrap(); + x.bind_port(0, Native).unwrap(); // native in + x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out + x.bind_port(2, Passive(addrs[1])).unwrap(); // peer in + x.bind_port(3, Native).unwrap(); // native out x.connect(timeout).unwrap(); - - for i in 0..N { - let msg = vec![i as u8]; // messages [0], [1], [2], ... - x.put(0, msg).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); + for _ in 0..N { + x.put(0, b"A->B".to_vec()).unwrap(); + x.get(1).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(0)); } }, &|x| { - // Getsodd - x.configure(FORWARD, b"forward").unwrap(); - x.bind_port(0, Active(addrs[0])).unwrap(); - x.bind_port(1, Native).unwrap(); + // Bob + x.configure(PDL, b"forward_pair").unwrap(); + x.bind_port(0, Native).unwrap(); // native in + x.bind_port(1, Active(addrs[1])).unwrap(); // peer out + x.bind_port(2, Active(addrs[0])).unwrap(); // peer in + x.bind_port(3, Native).unwrap(); // native out x.connect(timeout).unwrap(); - for _ in 0..N { - // round _i batches:[0=>*, 0=>?] + x.put(0, b"B->A".to_vec()).unwrap(); + x.get(1).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(0)); + } + }, + ]); +} + +#[test] +// THIS DOES NOT YET WORK. TODOS are hit +fn filter_messages() { + // Make a protocol whose behavior depends on the contents of messages + // Getter prohibits the receipt of messages of the form [0, ...]. + // those messages are silent + /* + Sender -->forward-->P|A-->forward_nonzero--> Receiver + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr()]; + const N: usize = 1; + do_all(&[ + // + &|x| { + // Sender + x.configure(PDL, b"forward").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + + for i in (0..3).cycle().take(N) { + let msg = vec![i as u8]; // messages [0], [1], [2], [0], [1] ... + // batches: [{0=>*}, {0=>?}] x.next_batch().unwrap(); - x.get(0).unwrap(); + x.put(0, msg.clone()).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); match x.sync(timeout).unwrap() { - 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 0 => { + // not sent + assert_eq!(&msg, &[0u8]); + } 1 => { - let msg = x.read_gotten(0).unwrap(); - assert!(msg[0] % 2 == 1); // assert msg is odd + // sent + assert_ne!(&msg, &[0u8]); } _ => unreachable!(), } } }, &|x| { - // Getseven - x.configure(FORWARD, b"forward").unwrap(); - x.bind_port(0, Active(addrs[1])).unwrap(); + // Receiver + x.configure(PDL, b"forward_nonzero").unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); - for _ in 0..N { // round _i batches:[0=>*, 0=>?] x.next_batch().unwrap(); x.get(0).unwrap(); match x.sync(timeout).unwrap() { - 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 0 => { + // nothing received + assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); + } 1 => { - let msg = x.read_gotten(0).unwrap(); - assert!(msg[0] % 2 == 0); // assert msg is even + // msg received + assert_ne!(&[0u8], x.read_gotten(0).unwrap()); } _ => unreachable!(), }