diff --git a/src/test/connector.rs b/src/test/connector.rs index 27dfb30d9c2452278900d666875116f08cbd5b24..9be87cb4124c7845b584d1e713a161cf41c19920 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -23,7 +23,6 @@ fn next_addr() -> SocketAddr { fn incremental() { let timeout = Duration::from_millis(1_500); let addrs = [next_addr(), next_addr()]; - static PDL: &[u8] = b""; let handles = vec![ thread::spawn(move || { let controller_id = 0; @@ -199,24 +198,27 @@ fn duo_negative() { handle(b.join()); } +static FORWARD: &[u8] = b" +primitive forward(in i, out o) { + while(true) synchronous { + put(o, get(i)); + } +}"; + #[test] fn connect_natives() { - static CHAIN: &[u8] = b" - primitive main(in i, out o) { - while(true) synchronous {} - }"; let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; do_all(&[ &|x| { - x.configure(CHAIN, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); }, &|x| { - x.configure(CHAIN, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); @@ -227,18 +229,12 @@ fn connect_natives() { #[test] fn forward() { - static FORWARD: &[u8] = b" - primitive main(in i, out o) { - while(true) synchronous { - put(o, get(i)); - } - }"; let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; do_all(&[ // &|x| { - x.configure(FORWARD, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); x.connect(timeout).unwrap(); @@ -248,7 +244,7 @@ fn forward() { assert_eq!(0, x.sync(timeout).unwrap()); }, &|x| { - x.configure(FORWARD, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); @@ -260,3 +256,214 @@ fn forward() { }, ]); } + +static SYNC: &[u8] = b" +primitive sync(in i, out o) { + while(true) synchronous { + if (fires(i)) put(o, get(i)); + } +}"; +#[test] +fn native_alt() { + /* + Alice -->sync--A|P-->sync--> Bob + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr()]; + const N: usize = 3; + do_all(&[ + // + &|x| { + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Active(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + + let msg = b"HI".to_vec(); + for _i in 0..N { + // round _i*2: batches: [0=>*] + assert_eq!(0, x.sync(timeout).unwrap()); + + // round _i*2+1: batches: [0=>HI] + x.put(0, msg.clone()).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } + }, + &|x| { + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + let expect = b"HI".to_vec(); + for _i in 0..(2 * N) { + // round _i batches:[0=>*, 0=>HI] + x.next_batch().unwrap(); + x.get(0).unwrap(); + match x.sync(timeout).unwrap() { + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => assert_eq!(Ok(&expect[..]), x.read_gotten(0)), + _ => unreachable!(), + } + } + }, + ]); +} + +static ALTERNATOR_2: &[u8] = b" +primitive alternator_2(in i, out a, out b) { + while(true) { + synchronous { put(a, get(i)); } + synchronous { put(b, get(i)); } + } +}"; + +#[test] +fn alternator_2() { + /* /--|-->A + Sender -->alternator_2 + \--|-->B + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr(), next_addr()]; + const N: usize = 5; + do_all(&[ + // + &|x| { + // Sender + x.configure(ALTERNATOR_2, b"alternator_2").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.bind_port(2, Passive(addrs[1])).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + for _ in 0..2 { + x.put(0, b"hey".to_vec()).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } + } + }, + &|x| { + // A + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + let expecting: &[u8] = b"hey"; + + for _ in 0..N { + // get msg round + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); // GET ONE + assert_eq!(Ok(expecting), x.read_gotten(0)); + + // silent round + assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE + assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); + } + }, + &|x| { + // B + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Active(addrs[1])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + let expecting: &[u8] = b"hey"; + + for _ in 0..N { + // silent round + assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE + assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); + + // get msg round + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); // GET ONE + assert_eq!(Ok(expecting), x.read_gotten(0)); + } + }, + ]); +} + +static PARITY_ROUTER: &[u8] = b" +primitive parity_router(in i, out odd, out even) { + while(true) synchronous { + msg m = get(i); + if (m[0]%2==0) { + put(even, m); + } else { + put(odd, m); + } + } +}"; + +#[test] +// THIS DOES NOT YET WORK. TODOS are hit +fn parity_router() { + /* /--|-->Getsodd + Sender -->parity_router + \--|-->Getseven + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr(), next_addr()]; + const N: usize = 1; + do_all(&[ + // + &|x| { + // Sender + x.configure(PARITY_ROUTER, b"parity_router").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.bind_port(2, Passive(addrs[1])).unwrap(); + x.connect(timeout).unwrap(); + + for i in 0..N { + let msg = vec![i as u8]; // messages [0], [1], [2], ... + x.put(0, msg).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } + }, + &|x| { + // Getsodd + x.configure(FORWARD, b"forward").unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + // round _i batches:[0=>*, 0=>?] + x.next_batch().unwrap(); + x.get(0).unwrap(); + match x.sync(timeout).unwrap() { + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => { + let msg = x.read_gotten(0).unwrap(); + assert!(msg[0] % 2 == 1); // assert msg is odd + } + _ => unreachable!(), + } + } + }, + &|x| { + // Getseven + x.configure(FORWARD, b"forward").unwrap(); + x.bind_port(0, Active(addrs[1])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + // round _i batches:[0=>*, 0=>?] + x.next_batch().unwrap(); + x.get(0).unwrap(); + match x.sync(timeout).unwrap() { + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => { + let msg = x.read_gotten(0).unwrap(); + assert!(msg[0] % 2 == 0); // assert msg is even + } + _ => unreachable!(), + } + } + }, + ]); +}