From ce6bcc0a0c261a3824b462e02598867a95ef03db 2020-02-06 11:32:43 From: Christopher Esterhuyse Date: 2020-02-06 11:32:43 Subject: [PATCH] more examples: protocol alternator, natives alternating --- diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 4f02976bf9624fd2bfc64dba565a5de0b56d3698..7782a5552e063b79ef960d104ff76a3cbd1ac8c6 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -276,7 +276,14 @@ impl Controller { log!(&mut self.inner.logger, "No decision yet. Time to recv messages"); self.undelay_all(); 'recv_loop: loop { - let received = self.recv(deadline)?.ok_or(SyncErr::Timeout)?; + let received = self.recv(deadline)?.ok_or_else(|| { + log!( + &mut self.inner.logger, + ":( timing out. Solutions storage in state... {:#?}", + &self.ephemeral.solution_storage + ); + SyncErr::Timeout + })?; let current_content = match received.msg { Msg::SetupMsg(_) => { log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received); @@ -331,7 +338,6 @@ impl Controller { subtree_id, partial_oracle, ); - if self.handle_locals_maybe_decide()? { return Ok(()); } diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs index cf1c0fc59c38d0d073179b5a5c5cffbdb095b30d..3c892b5852b23354adfc89b5249ce03ae638ec21 100644 --- a/src/runtime/connector.rs +++ b/src/runtime/connector.rs @@ -180,7 +180,7 @@ impl Connector { } let mono_n = connected.controller.inner.mono_n.as_ref().expect("controller has no mono_n?"); let result = mono_n.result.as_ref().ok_or(NoPreviousRound)?; - let payload = result.1.get(&key).ok_or(DidntGet)?; + let payload = result.1.get(&key).ok_or(DidNotGet)?; Ok(payload) } } diff --git a/src/runtime/errors.rs b/src/runtime/errors.rs index 3c3c228f73efad91deaba0cccfb9b30f7b64fe2c..f245a07ec047eebe0389499cbfa006d3f364a667 100644 --- a/src/runtime/errors.rs +++ b/src/runtime/errors.rs @@ -1,6 +1,6 @@ use crate::common::*; -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum PortBindErr { AlreadyConnected, IndexOutOfBounds, @@ -8,22 +8,22 @@ pub enum PortBindErr { ParseErr, AlreadyConfigured, } -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ReadGottenErr { NotConnected, IndexOutOfBounds, WrongPolarity, NoPreviousRound, - DidntGet, + DidNotGet, } -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum PortOpErr { IndexOutOfBounds, NotConnected, WrongPolarity, DuplicateOperation, } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum ConfigErr { AlreadyConnected, ParseErr(String), @@ -31,7 +31,7 @@ pub enum ConfigErr { NoSuchComponent, NonPortTypeParameters, } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ConnectErr { PortNotBound { native_index: usize }, NotConfigured, @@ -47,19 +47,19 @@ pub enum ConnectErr { PassiveConnectFailed(SocketAddr), BindFailed(SocketAddr), } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum PollDeadlineErr { PollingFailed, Timeout, } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum EndpointErr { Disconnected, MetaProtocolDeviation, } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum SyncErr { NotConnected, MessengerRecvErr(MessengerRecvErr), @@ -72,11 +72,11 @@ pub enum SyncErr { EndpointErr(EndpointErr), EvalErr(EvalErr), } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum EvalErr { ComponentExitWhileBranching, } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum MessengerRecvErr { PollingFailed, EndpointErr(EndpointErr), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index c2a47243997c608cba4fc4e3da050b4f935022b3..9916289f2e3a4cee65066324688a763eeeba31a8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -172,7 +172,7 @@ struct BranchPContext<'m, 'r> { inbox: &'r HashMap, } -#[derive(Debug, Default)] +#[derive(Default)] pub(crate) struct SolutionStorage { old_local: HashSet, new_local: HashSet, @@ -229,7 +229,16 @@ trait Messengerlike { } ///////////////////////////////// - +impl Debug for SolutionStorage { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.pad("Solutions: [")?; + for (subtree_id, &index) in self.subtree_id_to_index.iter() { + let sols = &self.subtree_solutions[index]; + f.write_fmt(format_args!("{:?} => {:?}, ", subtree_id, sols))?; + } + f.pad("]") + } +} impl From for SyncErr { fn from(e: EvalErr) -> SyncErr { SyncErr::EvalErr(e) diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 358766d9ae2ec5369f0233d30621017af2aabcf6..e17a14087898ccb8efc6d03946a66a7428e30a6b 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -402,7 +402,8 @@ impl Controller { parent, major ), } - log!(logger, "{:?} DONE WITH ECHO", major); + + log!(logger, "{:?} DONE WITH ECHO! Leader has cid={:?}", major, my_leader); // 3. broadcast leader announcement (except to parent: confirm they are your parent) // in this loop, every node sends 1 message to each neighbor diff --git a/src/test/connector.rs b/src/test/connector.rs index 27dfb30d9c2452278900d666875116f08cbd5b24..9be87cb4124c7845b584d1e713a161cf41c19920 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -23,7 +23,6 @@ fn next_addr() -> SocketAddr { fn incremental() { let timeout = Duration::from_millis(1_500); let addrs = [next_addr(), next_addr()]; - static PDL: &[u8] = b""; let handles = vec![ thread::spawn(move || { let controller_id = 0; @@ -199,24 +198,27 @@ fn duo_negative() { handle(b.join()); } +static FORWARD: &[u8] = b" +primitive forward(in i, out o) { + while(true) synchronous { + put(o, get(i)); + } +}"; + #[test] fn connect_natives() { - static CHAIN: &[u8] = b" - primitive main(in i, out o) { - while(true) synchronous {} - }"; let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; do_all(&[ &|x| { - x.configure(CHAIN, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); }, &|x| { - x.configure(CHAIN, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); @@ -227,18 +229,12 @@ fn connect_natives() { #[test] fn forward() { - static FORWARD: &[u8] = b" - primitive main(in i, out o) { - while(true) synchronous { - put(o, get(i)); - } - }"; let timeout = Duration::from_millis(1_500); let addrs = [next_addr()]; do_all(&[ // &|x| { - x.configure(FORWARD, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Native).unwrap(); x.bind_port(1, Passive(addrs[0])).unwrap(); x.connect(timeout).unwrap(); @@ -248,7 +244,7 @@ fn forward() { assert_eq!(0, x.sync(timeout).unwrap()); }, &|x| { - x.configure(FORWARD, b"main").unwrap(); + x.configure(FORWARD, b"forward").unwrap(); x.bind_port(0, Active(addrs[0])).unwrap(); x.bind_port(1, Native).unwrap(); x.connect(timeout).unwrap(); @@ -260,3 +256,214 @@ fn forward() { }, ]); } + +static SYNC: &[u8] = b" +primitive sync(in i, out o) { + while(true) synchronous { + if (fires(i)) put(o, get(i)); + } +}"; +#[test] +fn native_alt() { + /* + Alice -->sync--A|P-->sync--> Bob + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr()]; + const N: usize = 3; + do_all(&[ + // + &|x| { + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Active(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + + let msg = b"HI".to_vec(); + for _i in 0..N { + // round _i*2: batches: [0=>*] + assert_eq!(0, x.sync(timeout).unwrap()); + + // round _i*2+1: batches: [0=>HI] + x.put(0, msg.clone()).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } + }, + &|x| { + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + let expect = b"HI".to_vec(); + for _i in 0..(2 * N) { + // round _i batches:[0=>*, 0=>HI] + x.next_batch().unwrap(); + x.get(0).unwrap(); + match x.sync(timeout).unwrap() { + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => assert_eq!(Ok(&expect[..]), x.read_gotten(0)), + _ => unreachable!(), + } + } + }, + ]); +} + +static ALTERNATOR_2: &[u8] = b" +primitive alternator_2(in i, out a, out b) { + while(true) { + synchronous { put(a, get(i)); } + synchronous { put(b, get(i)); } + } +}"; + +#[test] +fn alternator_2() { + /* /--|-->A + Sender -->alternator_2 + \--|-->B + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr(), next_addr()]; + const N: usize = 5; + do_all(&[ + // + &|x| { + // Sender + x.configure(ALTERNATOR_2, b"alternator_2").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.bind_port(2, Passive(addrs[1])).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + for _ in 0..2 { + x.put(0, b"hey".to_vec()).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } + } + }, + &|x| { + // A + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + let expecting: &[u8] = b"hey"; + + for _ in 0..N { + // get msg round + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); // GET ONE + assert_eq!(Ok(expecting), x.read_gotten(0)); + + // silent round + assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE + assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); + } + }, + &|x| { + // B + x.configure(SYNC, b"sync").unwrap(); + x.bind_port(0, Active(addrs[1])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + let expecting: &[u8] = b"hey"; + + for _ in 0..N { + // silent round + assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE + assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)); + + // get msg round + x.get(0).unwrap(); + assert_eq!(Ok(0), x.sync(timeout)); // GET ONE + assert_eq!(Ok(expecting), x.read_gotten(0)); + } + }, + ]); +} + +static PARITY_ROUTER: &[u8] = b" +primitive parity_router(in i, out odd, out even) { + while(true) synchronous { + msg m = get(i); + if (m[0]%2==0) { + put(even, m); + } else { + put(odd, m); + } + } +}"; + +#[test] +// THIS DOES NOT YET WORK. TODOS are hit +fn parity_router() { + /* /--|-->Getsodd + Sender -->parity_router + \--|-->Getseven + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr(), next_addr()]; + const N: usize = 1; + do_all(&[ + // + &|x| { + // Sender + x.configure(PARITY_ROUTER, b"parity_router").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.bind_port(2, Passive(addrs[1])).unwrap(); + x.connect(timeout).unwrap(); + + for i in 0..N { + let msg = vec![i as u8]; // messages [0], [1], [2], ... + x.put(0, msg).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + } + }, + &|x| { + // Getsodd + x.configure(FORWARD, b"forward").unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + // round _i batches:[0=>*, 0=>?] + x.next_batch().unwrap(); + x.get(0).unwrap(); + match x.sync(timeout).unwrap() { + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => { + let msg = x.read_gotten(0).unwrap(); + assert!(msg[0] % 2 == 1); // assert msg is odd + } + _ => unreachable!(), + } + } + }, + &|x| { + // Getseven + x.configure(FORWARD, b"forward").unwrap(); + x.bind_port(0, Active(addrs[1])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + for _ in 0..N { + // round _i batches:[0=>*, 0=>?] + x.next_batch().unwrap(); + x.get(0).unwrap(); + match x.sync(timeout).unwrap() { + 0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)), + 1 => { + let msg = x.read_gotten(0).unwrap(); + assert!(msg[0] % 2 == 0); // assert msg is even + } + _ => unreachable!(), + } + } + }, + ]); +} diff --git a/src/test/mod.rs b/src/test/mod.rs index 329b07c7c7edcaede8ca5f0f7b1be29f588454b3..fd552a007526257f304470c5b89aa71aa215b4bd 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -49,7 +49,7 @@ fn do_all(i: &[&(dyn Fn(&mut Connector) + Sync)]) { for ((controller_id, connector), res) in cid_iter.zip(connectors.iter_mut()).zip(results.into_iter()) { - println!("====================\n CID {:?} ...", controller_id); + println!("\n\n====================\n CID {:?} ...", controller_id); match connector.get_mut_logger() { Some(logger) => println!("{}", logger), None => println!(""),