Changeset - 49b9c766fe5a
[Not reviewed]
3 5 3
Christopher Esterhuyse - 5 years ago 2020-02-10 11:35:57
christopheresterhuyse@gmail.com
tracking new channel ekeys
11 files changed with 124 insertions and 76 deletions:
0 comments (0 inline, 0 general)
examples/0_forward/alice
Show inline comments
 
new file 100755
 
binary diff not shown
examples/0_forward/alice.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../check.c"
 

	
 
int main() {
 
	// ALICE
 
	
 
	char* pdl ="\
 
	primitive forward(in i, out o) {\
 
		while(true) synchronous {\
 
			put(o, get(i));\
 
		}\
 
	}";
 
int main() { // ALICE
 
	
 
	char* pdl =
 
	"primitive forward(in i, out o) {"
 
	"	while(true) synchronous {"
 
	"		put(o, get(i));"
 
	"	}"
 
	"}"
 
	;
 
	
 
	char msg_buf[128];
 
	memset(msg_buf, 0, 128);
 
	
 
	printf("input a message to send:");
 
	if (fgets(msg_buf, 128-1, stdin) == NULL) {
 
		printf("LINE READ BAD\n");
 
		return 1;
 
	}
 

	
 
	check("fgets", fgets(msg_buf, 128-1, stdin) == NULL);
 
	int msg_len = strlen(msg_buf);
 
	msg_buf[msg_len-1] = 0;
 
	printf("sending msg `%s`\n", msg_buf);
 
	
 
	Connector* c = connector_new();
 
	if (connector_configure(c, pdl, "forward")) {
 
		printf("CONFIG FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	if (connector_bind_native(c, 0)) {
 
		printf("BIND0 FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	if (connector_bind_passive(c, 1, "127.0.0.1:7000")) {
 
		printf("BIND1 FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	printf("connecting... \n");
 
	if (connector_connect(c, 10000)) {
 
		printf("CONNECT FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_passive(c, 1, "127.0.0.1:7000"));
 
	check("connect", connector_connect(c, 10000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		if (connector_put(c, 0, msg_buf, msg_len)) {
 
			printf("CONNECT PUT: %s\n", connector_error_peek());
 
			return 1;
 
		}
 
		if (connector_sync(c, 10000)) {
 
			printf("SYNC FAILED: %s\n", connector_error_peek());
 
			return 1;
 
		}
 
		check("put ", connector_put(c, 0, msg_buf, msg_len));
 
		check("sync", connector_sync(c, 10000));
 
		printf("SEND OK\n");
 
	}
 
	
 
	printf("OK\n");
 
	connector_destroy(c);
 
	return 0;
 
}
 
\ No newline at end of file
examples/0_forward/alice.exe
Show inline comments
 
deleted file
 
binary diff not shown
examples/0_forward/bob
Show inline comments
 
new file 100755
 
binary diff not shown
examples/0_forward/bob.c
Show inline comments
 
#include <stdio.h>
 
#include "../../reowolf.h"
 
#include "../check.c"
 

	
 
int main() {
 
	
 
	char* pdl ="\
 
	primitive forward(in i, out o) {\
 
		while(true) synchronous {\
 
			put(o, get(i));\
 
		}\
 
	}";
 
	
 
	// BOB
 
	Connector* c = connector_new();
 
	if (connector_configure(c, pdl, "forward")) {
 
		printf("CONFIG FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	if (connector_bind_active(c, 0, "127.0.0.1:7000")) {
 
		printf("BIND0 FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	if (connector_bind_native(c, 1)) {
 
		printf("BIND1 FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	printf("connecting... \n");
 
	if (connector_connect(c, 10000)) {
 
		printf("CONNECT FAILED: %s\n", connector_error_peek());
 
		return 1;
 
	}
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_native(c, 1));
 
	check("connect", connector_connect(c, 10000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		if (connector_get(c, 0)) {
 
			printf("CONNECT GET: %s\n", connector_error_peek());
 
			return 1;
 
		}
 
		if (connector_sync(c, 10000)) {
 
			printf("SYNC FAILED: %s\n", connector_error_peek());
 
			return 1;
 
		}
 
		check("get ", connector_get(c, 0));
 
		check("sync", connector_sync(c, 10000));
 

	
 
		int msg_len;
 
		const unsigned char * msg;
 
		if (connector_gotten(c, 0, &msg, &msg_len)) {
 
			printf("READ FAILED: %s\n", connector_error_peek());
 
			return 1;
 
		}
 
		check("read", connector_gotten(c, 0, &msg, &msg_len));
 

	
 
		printf("received: `%s`\n", msg);
 
	}
 
	
 
	printf("OK\n");
 
	connector_destroy(c);
 
	return 0;
 
}
 
\ No newline at end of file
examples/0_forward/bob.exe
Show inline comments
 
deleted file
 
binary diff not shown
examples/0_forward/make.sh
Show inline comments
 
modified file chmod 100644 => 100755
 
echo "MAKING!"
 
gcc main.c -L ../../target/release -lreowolf_rs -o main
 
echo "DONE!"
 
#!/bin/sh
 

	
 
LIB_PATH="../../target/release"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH alice.c -o alice
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH bob.c -o bob
examples/0_forward/reowolf_rs.dll
Show inline comments
 
deleted file
 
binary diff not shown
examples/check.c
Show inline comments
 
new file 100644
 
void check(const char* phase, int err) {
 
	if (err) {
 
		printf("ERR %d in phase `%s`. Err was `%s`\nEXITING!\n",
 
			err, phase, connector_error_peek());
 
		exit(1);
 
	}
 
}
 
\ No newline at end of file
src/runtime/communication.rs
Show inline comments
 
@@ -331,331 +331,333 @@ impl Controller {
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { oracle } => {
 
                    if self.inner.family.parent_ekey != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &oracle
 
                    );
 
                    return self.end_round_with_decision(oracle);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = ekey_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    match subtree_id {
 
                        None => {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let channel_id = self
 
                                .inner
 
                                .endpoint_exts
 
                                .get(received.recipient)
 
                                .expect("UEHFU")
 
                                .info
 
                                .channel_id;
 
                            if payload_predicate.query(channel_id) != Some(true) {
 
                                // sender didn't preserve the invariant
 
                                return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
 
                            }
 
                            let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
                            let m_ctx = PolyPContext {
 
                                my_subtree_id: SubtreeId::PolyP { index: *index },
 
                                inner: &mut self.inner,
 
                                solution_storage: &mut self.ephemeral.solution_storage,
 
                            };
 
                            use SyncRunResult as Srr;
 
                            let blocker = poly_p.poly_recv_run(
 
                                m_ctx,
 
                                &self.protocol_description,
 
                                received.recipient,
 
                                payload_predicate,
 
                                payload,
 
                            )?;
 
                            log!(
 
                                &mut self.inner.logger,
 
                                "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
                                subtree_id,
 
                                blocker
 
                            );
 
                            match blocker {
 
                                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
                                    {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                }
 
            }
 
        }
 
    }
 
}
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.ekey_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        self.ekey_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
 
                    state: self.state,
 
                    inbox: Default::default(),
 
                    outbox: Default::default(),
 
                }
 
            },
 
            ekeys: self.ekeys,
 
        }
 
    }
 
}
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ekeys {:?}!",
 
            &moved_ekeys,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
 
            self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys });
 
        } else {
 
            panic!("MachineP attempting to move alien ekey!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Key; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 
        let kp = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: a,
 
        });
 
        let kg = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: b,
 
        });
 
        self.ekeys.insert(kp);
 
        self.ekeys.insert(kg);
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
    }
 
    fn new_random(&mut self) -> u64 {
 
        type Bytes8 = [u8; std::mem::size_of::<u64>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        let val = unsafe { std::mem::transmute::<Bytes8, _>(bytes) };
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_random. returning val {:?}!",
 
            val,
 
        );
 
        val
 
    }
 
}
 

	
 
impl SolutionStorage {
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 

	
 
    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
        self.new_local.iter()
 
    }
 

	
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut String,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut String,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 

	
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let val = self.inbox.get(&ekey);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
}
src/test/connector.rs
Show inline comments
 
extern crate test_generator;
 

	
 
use super::*;
 

	
 
use crate::common::*;
 
use crate::runtime::{errors::*, PortBinding::*};
 

	
 
static PDL: &[u8] = b"
 
primitive blocked(in i, out o) {
 
    while(true) synchronous {}
 
}
 
primitive forward(in i, out o) {
 
    while(true) synchronous {
 
        put(o, get(i));
 
    }
 
}
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}
 
primitive fifo_1(in i, out o) {
 
    msg holding = null;
 
    while(true) synchronous {
 
        if (holding == null && fires(i)) {
 
            holding = get(i);
 
        } else if (holding != null && fires(o)) {
 
            put(o, holding);
 
            holding = null;
 
        }
 
    }
 
}
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}
 
composite sync_2(in i, out o) {
 
    channel x -> y;
 
    new sync(i, x);
 
    new sync(y, o);
 
}
 
composite forward_pair(in ia, out oa, in ib, out ob) {
 
    new forward(ia, oa);
 
    new forward(ib, ob);
 
}
 
primitive forward_nonzero(in i, out o) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        assert(m[0] != 0);
 
        put(o, m);
 
    }
 
}
 
primitive token_spout(out o) {
 
    while(true) synchronous {
 
        put(o, create(0));
 
    }
 
}
 
primitive wait_10(out o) {
 
    int i = 0;
 
    while(i < 10) {
 
        synchronous {}
 
        i += 1;
 
    }
 
primitive wait_n(int to_wait, out o) {
 
    while(to_wait > 0) synchronous() to_wait -= 1;
 
    synchronous { put(o, create(0)); }
 
}
 
composite wait_10(out o) {
 
    new wait_n(10, o);
 
}
 
";
 

	
 
#[test]
 
fn connects_ok() {
 
    // Test if we can connect natives using the given PDL
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connected_but_silent_natives() {
 
    // Test if we can connect natives and have a trivial sync round
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn self_forward_ok() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Echo!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 
#[test]
 
fn token_spout_ok() {
 
    // Test a deterministic system where the proto
 
    // creates token messages
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 5;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"token_spout").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn waiter_ok() {
 
    // Test a stateful proto that blocks port 0 for 10 rounds
 
    // and then sends a single token on the 11th
 
    /*
 
    Alice<--token_spout
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"wait_10").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..10 {
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
            x.get(0).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
            assert_eq!(Ok(&[] as &[u8]), x.read_gotten(0));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn self_forward_timeout() {
 
    // Test a deterministic system
 
    // where a native has no network bindings
 
    // and sends messages to itself
 
    /*
 
        /-->\
 
    Alice   forward
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(500);
 
    static MSG: &[u8] = b"Echo!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            x.put(0, MSG.to_vec()).unwrap();
 
            // native and forward components cannot find a solution
 
            assert_eq!(Err(SyncErr::Timeout), x.sync(timeout));
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn forward_det() {
 
    // Test if a deterministic protocol and natives can pass one message
 
    /*
 
    Alice -->forward--P|A-->forward--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Hello!";
 

	
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn nondet_proto_det_natives() {
 
    // Test the use of a nondeterministic protocol
 
    // where Alice decides the choice and the others conform
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Message, here!";
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn putter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 3;
 
    static MSG: &[u8] = b"Hidey ho!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>*}, {0=>?}]
 
                x.get(0).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn getter_determines() {
 
    // putter and getter
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"Hidey ho!";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _i in 0..N {
 
                // batches [{0=>?}, {0=>*}]
 
                x.put(0, MSG.to_vec()).unwrap();
 
                x.next_batch().unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _i in 0..N {
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn fifo_2() {
 
    // Test a deterministic system which
 
    // alternates sending Sender's messages to A or B
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"message";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"alternator_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, MSG.to_vec()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 

	
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn alternator_2() {
 
    // Test a deterministic system which
 
    // alternates sending Sender's messages to A or B
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 5;
 
    static MSG: &[u8] = b"message";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"alternator_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, MSG.to_vec()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 

	
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1536
 
fn composite_chain() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"Hippity Hoppity";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1605
 
fn exchange() {
 
    /*
 
        /-->forward-->P|A-->forward-->\
 
    Alice                             Bob
 
        \<--forward<--P|A<--forward<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(2, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, b"A->B".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(2, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, b"B->A".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn filter_messages() {
 
    // Make a protocol whose behavior depends on the contents of messages
 
    // Getter prohibits the receipt of messages of the form [0, ...].
 
    // those messages are silent
 
    /*
 
    Sender -->forward-->P|A-->forward_nonzero--> Receiver
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for i in (0..3).cycle().take(N) {
 
                let msg = vec![i as u8]; // messages [0], [1], [2], [0], [1] ...
 
                                         // batches: [{0=>*}, {0=>?}]
 
                x.next_batch().unwrap();
 
                x.put(0, msg.clone()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
                match x.sync(timeout).unwrap() {
 
                    0 => {
 
                        // not sent
 
                        assert_eq!(&msg, &[0u8]);
 
                    }
 
                    1 => {
 
                        // sent
 
                        assert_ne!(&msg, &[0u8]);
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
0 comments (0 inline, 0 general)