Changeset - ce6bcc0a0c26
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-02-06 11:32:43
christopheresterhuyse@gmail.com
more examples: protocol alternator, natives alternating
7 files changed with 256 insertions and 33 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -276,7 +276,14 @@ impl Controller {
 
        log!(&mut self.inner.logger, "No decision yet. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            let received = self.recv(deadline)?.ok_or(SyncErr::Timeout)?;
 
            let received = self.recv(deadline)?.ok_or_else(|| {
 
                log!(
 
                    &mut self.inner.logger,
 
                    ":( timing out. Solutions storage in state... {:#?}",
 
                    &self.ephemeral.solution_storage
 
                );
 
                SyncErr::Timeout
 
            })?;
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(_) => {
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received);
 
@@ -331,7 +338,6 @@ impl Controller {
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 

	
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
src/runtime/connector.rs
Show inline comments
 
@@ -180,7 +180,7 @@ impl Connector {
 
        }
 
        let mono_n = connected.controller.inner.mono_n.as_ref().expect("controller has no mono_n?");
 
        let result = mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidntGet)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
    }
 
}
src/runtime/errors.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum PortBindErr {
 
    AlreadyConnected,
 
    IndexOutOfBounds,
 
@@ -8,22 +8,22 @@ pub enum PortBindErr {
 
    ParseErr,
 
    AlreadyConfigured,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum ReadGottenErr {
 
    NotConnected,
 
    IndexOutOfBounds,
 
    WrongPolarity,
 
    NoPreviousRound,
 
    DidntGet,
 
    DidNotGet,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum PortOpErr {
 
    IndexOutOfBounds,
 
    NotConnected,
 
    WrongPolarity,
 
    DuplicateOperation,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Clone, PartialEq, Eq)]
 
pub enum ConfigErr {
 
    AlreadyConnected,
 
    ParseErr(String),
 
@@ -31,7 +31,7 @@ pub enum ConfigErr {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum ConnectErr {
 
    PortNotBound { native_index: usize },
 
    NotConfigured,
 
@@ -47,19 +47,19 @@ pub enum ConnectErr {
 
    PassiveConnectFailed(SocketAddr),
 
    BindFailed(SocketAddr),
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum PollDeadlineErr {
 
    PollingFailed,
 
    Timeout,
 
}
 

	
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum EndpointErr {
 
    Disconnected,
 
    MetaProtocolDeviation,
 
}
 

	
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum SyncErr {
 
    NotConnected,
 
    MessengerRecvErr(MessengerRecvErr),
 
@@ -72,11 +72,11 @@ pub enum SyncErr {
 
    EndpointErr(EndpointErr),
 
    EvalErr(EvalErr),
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum EvalErr {
 
    ComponentExitWhileBranching,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum MessengerRecvErr {
 
    PollingFailed,
 
    EndpointErr(EndpointErr),
src/runtime/mod.rs
Show inline comments
 
@@ -172,7 +172,7 @@ struct BranchPContext<'m, 'r> {
 
    inbox: &'r HashMap<Key, Payload>,
 
}
 

	
 
#[derive(Debug, Default)]
 
#[derive(Default)]
 
pub(crate) struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
@@ -229,7 +229,16 @@ trait Messengerlike {
 
}
 

	
 
/////////////////////////////////
 

	
 
impl Debug for SolutionStorage {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        f.pad("Solutions: [")?;
 
        for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
            let sols = &self.subtree_solutions[index];
 
            f.write_fmt(format_args!("{:?} => {:?}, ", subtree_id, sols))?;
 
        }
 
        f.pad("]")
 
    }
 
}
 
impl From<EvalErr> for SyncErr {
 
    fn from(e: EvalErr) -> SyncErr {
 
        SyncErr::EvalErr(e)
src/runtime/setup.rs
Show inline comments
 
@@ -402,7 +402,8 @@ impl Controller {
 
                parent, major
 
            ),
 
        }
 
        log!(logger, "{:?} DONE WITH ECHO", major);
 

	
 
        log!(logger, "{:?} DONE WITH ECHO! Leader has cid={:?}", major, my_leader);
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
src/test/connector.rs
Show inline comments
 
@@ -23,7 +23,6 @@ fn next_addr() -> SocketAddr {
 
fn incremental() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    static PDL: &[u8] = b"";
 
    let handles = vec![
 
        thread::spawn(move || {
 
            let controller_id = 0;
 
@@ -199,24 +198,27 @@ fn duo_negative() {
 
    handle(b.join());
 
}
 

	
 
static FORWARD: &[u8] = b"
 
primitive forward(in i, out o) {
 
    while(true) synchronous {
 
        put(o, get(i));
 
    }
 
}";
 

	
 
#[test]
 
fn connect_natives() {
 
    static CHAIN: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {}
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        &|x| {
 
            x.configure(CHAIN, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(CHAIN, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
@@ -227,18 +229,12 @@ fn connect_natives() {
 

	
 
#[test]
 
fn forward() {
 
    static FORWARD: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {
 
            put(o, get(i));
 
        }
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(FORWARD, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
@@ -248,7 +244,7 @@ fn forward() {
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(FORWARD, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
@@ -260,3 +256,214 @@ fn forward() {
 
        },
 
    ]);
 
}
 

	
 
static SYNC: &[u8] = b"
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}";
 
#[test]
 
fn native_alt() {
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 3;
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HI".to_vec();
 
            for _i in 0..N {
 
                // round _i*2: batches: [0=>*]
 
                assert_eq!(0, x.sync(timeout).unwrap());
 

	
 
                // round _i*2+1: batches: [0=>HI]
 
                x.put(0, msg.clone()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let expect = b"HI".to_vec();
 
            for _i in 0..(2 * N) {
 
                // round _i batches:[0=>*, 0=>HI]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => assert_eq!(Ok(&expect[..]), x.read_gotten(0)),
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]);
 
}
 

	
 
static ALTERNATOR_2: &[u8] = b"
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}";
 

	
 
#[test]
 
fn alternator_2() {
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 5;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(ALTERNATOR_2, b"alternator_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, b"hey".to_vec()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            let expecting: &[u8] = b"hey";
 

	
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(expecting), x.read_gotten(0));
 

	
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            let expecting: &[u8] = b"hey";
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(expecting), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static PARITY_ROUTER: &[u8] = b"
 
primitive parity_router(in i, out odd, out even) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        if (m[0]%2==0) {
 
            put(even, m);
 
        } else {
 
            put(odd, m);
 
        }
 
    }
 
}";
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn parity_router() {
 
    /*                    /--|-->Getsodd
 
    Sender -->parity_router
 
                          \--|-->Getseven
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PARITY_ROUTER, b"parity_router").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for i in 0..N {
 
                let msg = vec![i as u8]; // messages [0], [1], [2], ...
 
                x.put(0, msg).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Getsodd
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // round _i batches:[0=>*, 0=>?]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => {
 
                        let msg = x.read_gotten(0).unwrap();
 
                        assert!(msg[0] % 2 == 1); // assert msg is odd
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
        &|x| {
 
            // Getseven
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // round _i batches:[0=>*, 0=>?]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => {
 
                        let msg = x.read_gotten(0).unwrap();
 
                        assert!(msg[0] % 2 == 0); // assert msg is even
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]);
 
}
src/test/mod.rs
Show inline comments
 
@@ -49,7 +49,7 @@ fn do_all(i: &[&(dyn Fn(&mut Connector) + Sync)]) {
 
    for ((controller_id, connector), res) in
 
        cid_iter.zip(connectors.iter_mut()).zip(results.into_iter())
 
    {
 
        println!("====================\n CID {:?} ...", controller_id);
 
        println!("\n\n====================\n CID {:?} ...", controller_id);
 
        match connector.get_mut_logger() {
 
            Some(logger) => println!("{}", logger),
 
            None => println!("<No Log>"),
0 comments (0 inline, 0 general)