diff --git a/src/test/connector.rs b/src/test/connector.rs index 6445e15cbfded6634b23f9662347d0fc63fabdec..ad7c1ddc80170f63e055c4d84abd88c412763d38 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -19,17 +19,6 @@ primitive sync(in i, out o) { if (fires(i)) put(o, get(i)); } } -primitive fifo_1(in i, out o) { - msg holding = null; - while(true) synchronous { - if (holding == null && fires(i)) { - holding = get(i); - } else if (holding != null && fires(o)) { - put(o, holding); - holding = null; - } - } -} primitive alternator_2(in i, out a, out b) { while(true) { synchronous { put(a, get(i)); } @@ -50,11 +39,16 @@ primitive exchange(in ai, out ao, in bi, out bo) { } } } -primitive forward_nonzero(in i, out o) { +primitive filter(in i, out ok, out err) { while(true) synchronous { - msg m = get(i); - assert(m[0] != 0); - put(o, m); + if (fires(i)) { + msg m = get(i); + if(m.length > 0) { + put(ok, m); + } else { + put(err, m); + } + } } } primitive token_spout(out o) { @@ -69,6 +63,19 @@ primitive wait_n(int to_wait, out o) { composite wait_10(out o) { new wait_n(10, o); } +primitive fifo_1(msg m, in i, out o) { + while(true) synchronous { + if (m == null && fires(i)) { + m = get(i); + } else if (m != null && fires(o)) { + put(o, m); + m = null; + } + } +} +composite fifo_1_e(in i, out o) { + new fifo_1(null, i, o); +} "; #[test] @@ -388,73 +395,6 @@ fn getter_determines() { ])); } -#[test] -fn fifo_2() { - // Test a deterministic system which - // alternates sending Sender's messages to A or B - /* /--|-->A - Sender -->alternator_2 - \--|-->B - */ - let timeout = Duration::from_millis(1_500); - let addrs = [next_addr(), next_addr()]; - const N: usize = 5; - static MSG: &[u8] = b"message"; - assert!(run_connector_set(&[ - // - &|x| { - // Sender - x.configure(PDL, b"alternator_2").unwrap(); - x.bind_port(0, Native).unwrap(); - x.bind_port(1, Passive(addrs[0])).unwrap(); - x.bind_port(2, Passive(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - - for _ in 0..N { - for _ in 0..2 { - x.put(0, MSG.to_vec()).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - } - } - }, - &|x| { - // A - x.configure(PDL, b"sync").unwrap(); - x.bind_port(0, Active(addrs[0])).unwrap(); - x.bind_port(1, Native).unwrap(); - x.connect(timeout).unwrap(); - for _ in 0..N { - // get msg round - x.get(0).unwrap(); - assert_eq!(Ok(0), x.sync(timeout)); // GET ONE - assert_eq!(Ok(MSG), x.read_gotten(0)); - - // silent round - assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE - assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); - } - }, - &|x| { - // B - x.configure(PDL, b"sync").unwrap(); - x.bind_port(0, Active(addrs[1])).unwrap(); - x.bind_port(1, Native).unwrap(); - x.connect(timeout).unwrap(); - - for _ in 0..N { - // silent round - assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE - assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); - - // get msg round - x.get(0).unwrap(); - assert_eq!(Ok(0), x.sync(timeout)); // GET ONE - assert_eq!(Ok(MSG), x.read_gotten(0)); - } - }, - ])); -} - #[test] fn alternator_2() { // Test a deterministic system which @@ -646,66 +586,94 @@ fn exchange() { } #[test] -fn filter_messages() { - // Make a protocol whose behavior depends on the contents of messages - // Getter prohibits the receipt of messages of the form [0, ...]. - // those messages are silent +fn routing_filter() { + // Make a protocol whose behavior is a function of the contents of + // a message. Here, the putter determines what is sent, and the proto + // determines how it is routed /* - Sender -->forward-->P|A-->forward_nonzero--> Receiver + Sender -->filter-->P|A-->sync--> Receiver */ let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; - const N: usize = 1; + const N: usize = 10; assert!(run_connector_set(&[ // &|x| { // Sender - x.configure(PDL, b"forward").unwrap(); + x.configure(PDL, b"filter").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); + x.bind_port(2, Native).unwrap(); // err channel x.connect(timeout).unwrap(); for i in (0..3).cycle().take(N) { - let msg = vec![i as u8]; // messages [0], [1], [2], [0], [1] ... - // batches: [{0=>*}, {0=>?}] + // messages cycle [], [4], [4,4], ... + let msg: Payload = std::iter::repeat(4).take(i).collect(); + + // batch 0: passes through filter! + x.put(0, msg.clone()).unwrap(); x.next_batch().unwrap(); + + // batch 1: gets returned! x.put(0, msg.clone()).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); + x.get(1).unwrap(); match x.sync(timeout).unwrap() { - 0 => { - // not sent - assert_eq!(&msg, &[0u8]); - } - 1 => { - // sent - assert_ne!(&msg, &[0u8]); - } + 0 => assert_ne!(msg.len(), 0), // ok + 1 => assert_eq!(msg.len(), 0), // err _ => unreachable!(), } } }, &|x| { // Receiver - x.configure(PDL, b"forward_nonzero").unwrap(); + x.configure(PDL, b"sync").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); for _ in 0..N { - // round _i batches:[0=>*, 0=>?] + // empty batch x.next_batch().unwrap(); + + // got a message x.get(0).unwrap(); match x.sync(timeout).unwrap() { - 0 => { - // nothing received - assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); - } - 1 => { - // msg received - assert_ne!(&[0u8], x.read_gotten(0).unwrap()); - } + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => assert_ne!(Ok(&[] as &[u8]), x.read_gotten(0)), _ => unreachable!(), } } }, ])); } + +#[test] +fn fifo_1_e() { + /* + /-->\ + Alice fifo_1 + \<--/ + */ + let timeout = Duration::from_millis(1_500); + const N: usize = 10; + assert!(run_connector_set(&[ + // + &|x| { + // Alice + x.configure(PDL, b"fifo_1_e").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + // put + assert_eq!(Ok(()), x.put(0, b"message~".to_vec())); + assert_eq!(Ok(0), x.sync(timeout)); + + // get + assert_eq!(Ok(()), x.get(1)); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(&[] as &[u8]), x.read_gotten(1)); + } + }, + ])); +}