Changeset - 33da6b69e9a2
[Not reviewed]
examples/0_forward/alice
Show inline comments
 
deleted file
 
binary diff not shown
examples/0_forward/bob
Show inline comments
 
deleted file
 
binary diff not shown
examples/0_three_forward/amy.c
Show inline comments
 
file renamed from examples/0_forward/alice.c to examples/0_three_forward/amy.c
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../check.c"
 
#include "../utility.c"
 

	
 
int main() { // ALICE
 
int main() { // AMY
 
	
 
	char* pdl =
 
	"primitive forward(in i, out o) {"
 
	"	while(true) synchronous {"
 
	"		put(o, get(i));"
 
	"	}"
 
	"}"
 
	;
 
	
 
	char msg_buf[128];
 
	memset(msg_buf, 0, 128);
 
	
 
	printf("input a message to send:");
 

	
 
	check("fgets", fgets(msg_buf, 128-1, stdin) == NULL);
 
	int msg_len = strlen(msg_buf);
 
	msg_buf[msg_len-1] = 0;
 
	printf("sending msg `%s`\n", msg_buf);
 
	printf("will send msg `%s`\n", msg_buf);
 
	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_passive(c, 1, "127.0.0.1:7000"));
 
	check("connect", connector_connect(c, 10000));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("put ", connector_put(c, 0, msg_buf, msg_len));
 
		check("sync", connector_sync(c, 10000));
 
		printf("SEND OK\n");
 
		check("sync", connector_sync(c, 1000));
 
		printf("Sent one message!\n");
 
	}
 
	
 
	printf("OK\n");
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	return 0;
 
}
 
\ No newline at end of file
examples/0_three_forward/amy.exe
Show inline comments
 
new file 100644
 
binary diff not shown
examples/0_three_forward/bob.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // BOB!
 
	
 
	char* pdl =
 
	"primitive forward(in i, out o) {"
 
	"	while(true) synchronous {"
 
	"		put(o, get(i));"
 
	"	}"
 
	"}"
 
	;
 
	
 
	// BOB
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_native(c, 1));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("get ", connector_get(c, 0));
 
		check("sync", connector_sync(c, 1000));
 

	
 
		int msg_len;
 
		const unsigned char * msg;
 
		check("read", connector_gotten(c, 0, &msg, &msg_len));
 

	
 
		printf("Received one message `%s`!\n", msg);
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	return 0;
 
}
 
\ No newline at end of file
examples/0_three_forward/bob.exe
Show inline comments
 
new file 100644
 
binary diff not shown
examples/0_three_forward/make.sh
Show inline comments
 
new file 100644
 
#!/bin/sh
 

	
 
LIB_PATH="../../target/release"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH amy.c -o ./amy
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH bob.c -o ./bob
examples/1_load_pdl/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // AMY
 
	char * pdl = buffer_pdl("forward.pdl");
 
	
 
	char msg_buf[128];
 
	memset(msg_buf, 0, 128);
 
	
 
	printf("input a message to send:");
 

	
 
	check("fgets", fgets(msg_buf, 128-1, stdin) == NULL);
 
	int msg_len = strlen(msg_buf);
 
	msg_buf[msg_len-1] = 0;
 
	printf("will send msg `%s`\n", msg_buf);
 
	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_passive(c, 1, "127.0.0.1:7000"));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("put ", connector_put(c, 0, msg_buf, msg_len));
 
		check("sync", connector_sync(c, 1000));
 
		printf("Sent one message!\n");
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/1_load_pdl/amy.exe
Show inline comments
 
new file 100644
 
binary diff not shown
examples/1_load_pdl/bob.c
Show inline comments
 
file renamed from examples/0_forward/bob.c to examples/1_load_pdl/bob.c
 
#include <stdio.h>
 
#include "../../reowolf.h"
 
#include "../check.c"
 
#include "../utility.c"
 

	
 
int main() {
 
	
 
	char* pdl ="\
 
	primitive forward(in i, out o) {\
 
		while(true) synchronous {\
 
			put(o, get(i));\
 
		}\
 
	}";
 
int main() { // BOB!
 
	char * pdl = buffer_pdl("forward.pdl");
 
	
 
	// BOB
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward"));
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_native(c, 1));
 
	check("connect", connector_connect(c, 10000));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 
	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		check("get ", connector_get(c, 0));
 
		check("sync", connector_sync(c, 10000));
 
		check("sync", connector_sync(c, 1000));
 

	
 
		int msg_len;
 
		const unsigned char * msg;
 
		check("read", connector_gotten(c, 0, &msg, &msg_len));
 

	
 
		printf("received: `%s`\n", msg);
 
		printf("Received one message `%s`!\n", msg);
 
	}
 
	
 
	printf("OK\n");
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/1_load_pdl/bob.exe
Show inline comments
 
new file 100644
 
binary diff not shown
examples/1_load_pdl/make.sh
Show inline comments
 
modified file chmod 100755 => 100644
 
file renamed from examples/0_forward/make.sh to examples/1_load_pdl/make.sh
 
#!/bin/sh
 

	
 
LIB_PATH="../../target/release"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH alice.c -o alice
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH amy.c -o amy
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH bob.c -o bob
examples/2_atomic_swap/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // AMY
 
	char * pdl = buffer_pdl("swap.pdl");
 
	
 
	Connector* c = connector_new();
 
	printf("configuring...\n");
 

	
 
	check("config ", connector_configure(c, pdl, "forward_two"));
 
	check("bind 0 ", connector_bind_native(c, 0));
 
	check("bind 1 ", connector_bind_native(c, 1));
 
	check("bind 2 ", connector_bind_passive(c, 2, "127.0.0.1:7000"));
 
	check("bind 3 ", connector_bind_passive(c, 3, "127.0.0.1:7001"));
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 

	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		printf("\nround %d\n", i);
 
		
 
		check("put ", connector_put(c, 0, "one", 3));
 
		check("put ", connector_put(c, 1, "two", 3));
 
		check("sync", connector_sync(c, 1000));
 
		
 
		printf("Sent both messages!\n");
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/2_atomic_swap/amy.exe
Show inline comments
 
new file 100644
 
binary diff not shown
examples/2_atomic_swap/bob.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main() { // BOB!
 
	char * pdl = buffer_pdl("forward.pdl");
 
	Connector* c = connector_new();
 

	
 
	printf("configuring...\n");
 
	check("config ", connector_configure(c, pdl, "forward_two"));
 

	
 
	check("bind 0 ", connector_bind_active(c, 0, "127.0.0.1:7000"));
 
	check("bind 1 ", connector_bind_active(c, 1, "127.0.0.1:7001"));
 
	check("bind 2 ", connector_bind_native(c, 2));
 
	check("bind 3 ", connector_bind_native(c, 3));
 

	
 
	printf("connecting...\n");
 
	check("connect", connector_connect(c, 5000));
 

	
 
	int msg_len;
 
	const unsigned char * msg;
 

	
 
	int i;
 
	for (i = 0; i < 3; i++) {
 
		printf("\nround %d\n", i);
 
		
 
		check("get ", connector_get(c, 0));
 
		check("get ", connector_get(c, 1));
 
		check("sync", connector_sync(c, 1000));
 
		
 
		check("read one", connector_gotten(c, 0, &msg, &msg_len));
 
		printf("Got message one: `%.*s`\n", msg_len, msg);
 
		
 
		check("read two", connector_gotten(c, 1, &msg, &msg_len));
 
		printf("Got message two: `%.*s`\n", msg_len, msg);
 
	}
 
	
 
	printf("destroying...\n");
 
	connector_destroy(c);
 
	printf("exiting...\n");
 
	free(pdl);
 
	return 0;
 
}
 
\ No newline at end of file
examples/2_atomic_swap/bob.exe
Show inline comments
 
new file 100644
 
binary diff not shown
examples/2_atomic_swap/make.sh
Show inline comments
 
new file 100644
 
#!/bin/sh
 

	
 
LIB_PATH="../"
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH amy.c -o amy
 
gcc -L $LIB_PATH -lreowolf_rs -Wl,-R$LIB_PATH bob.c -o bob
examples/REMINDER dll is a symlink.txt
Show inline comments
 
new file 100644
examples/check.c
Show inline comments
 
deleted file
examples/forward.pdl
Show inline comments
 
new file 100644
 
primitive forward(in i, out o) {
 
	while(true) synchronous {
 
		put(o, get(i));
 
	}
 
}
 

	
 
primitive forward_two(in ia, in ib, out oa, out ob) {
 
	while(true) synchronous {
 
		put(oa, get(ia));
 
		put(ob, get(ib));
 
	}
 
}
 

	
examples/reowolf_rs.dll
Show inline comments
 
new file 120000
 
../target/release/reowolf_rs.dll
 
\ No newline at end of file
examples/utility.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <errno.h>
 

	
 
void check(const char* phase, int err) {
 
	if (err) {
 
		printf("ERR %d in phase `%s`. Err was `%s`\nEXITING!\n",
 
			err, phase, connector_error_peek());
 
		exit(1);
 
	}
 
}
 

	
 
// allocates a buffer!
 
char * buffer_pdl(char * filename) {
 
	FILE *f = fopen("forward.pdl", "rb");
 
	if (f == NULL) {
 
		printf("Opening pdl file returned errno %d!\n", errno);
 
		exit(1);
 
	}
 
	fseek(f, 0, SEEK_END);
 
	long fsize = ftell(f);
 
	fseek(f, 0, SEEK_SET);
 
	char *pdl = malloc(fsize + 1);
 
	fread(pdl, 1, fsize, f);
 
	fclose(f);
 
	pdl[fsize] = 0;
 
	return pdl;
 
}
 
\ No newline at end of file
huang
Show inline comments
 
deleted file
src/runtime/actors.rs
Show inline comments
 
@@ -44,196 +44,214 @@ impl PolyP {
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        let to_run: Vec<_> = self.incomplete.drain().collect();
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn poly_run_these_branches(
 
        &mut self,
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
 
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
            use PolyBlocker as Sb;
 
            let blocker = branch.state.sync_run(&mut r_ctx, protocol_description);
 
            log!(
 
                &mut r_ctx.m_ctx.inner.logger,
 
                "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                r_ctx.m_ctx.my_subtree_id,
 
                &predicate,
 
                &blocker
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    log!(
 
                        &mut r_ctx.m_ctx.inner.logger,
 
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        r_ctx.m_ctx.my_subtree_id,
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
 
                        branch.blocking_on = Some(ekey);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    // split the branch!
 
                    let branch_f = branch.clone();
 
                    let mut predicate_f = predicate.clone();
 
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
 
                        panic!("OI HANS QUERY FIRST!");
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner;
 
                    log!(
 
                        logger,
 
                        "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...",
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                    );
 
                    // come up with the predicate for this local solution
 

	
 
                    for ekey in self.ekeys.iter() {
 
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
 
                        let fired =
 
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
 
                        match predicate.query(channel_id) {
 
                            Some(true) => {
 
                                if !fired {
 
                                    // This branch should have fired but didn't!
 
                                    log!(
 
                                        logger,
 
                                        "~ ... ... should have fired {:?} and didn't! pruning!",
 
                                        channel_id,
 
                                    );
 
                                    continue 'to_run_loop;
 
                                }
 
                            }
 
                            Some(false) => assert!(!fired),
 
                            Some(false) => {
 
                                if fired {
 
                                    println!(
 
                                        "pred {:#?} in {:#?} out {:#?}",
 
                                        &predicate,
 
                                        branch.inbox.get(ekey),
 
                                        branch.outbox.get(ekey)
 
                                    );
 
                                    panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id)
 
                                }
 
                            }
 
                            None => {
 
                                predicate.replace_assignment(channel_id, false);
 
                                assert!(!fired)
 
                                if fired {
 
                                    println!(
 
                                        "pred {:#?} in {:#?} out {:#?}",
 
                                        &predicate,
 
                                        branch.inbox.get(ekey),
 
                                        branch.outbox.get(ekey)
 
                                    );
 
                                    panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id)
 
                                }
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "~ ... ... and finished just fine!",);
 
                    m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut m_ctx.inner.logger,
 
                        m_ctx.my_subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    self.complete.insert(predicate, branch);
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
                        branch.outbox.insert(ekey, payload.clone());
 
                        let msg = CommMsgContents::SendPayload {
 
                            payload_predicate: predicate.clone(),
 
                            payload,
 
                        }
 
                        .into_msg(m_ctx.inner.round_index);
 
                        log!(
 
                            &mut m_ctx.inner.logger,
 
                            "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!",
 
                            &msg,
 
                            ekey,
 
                            (info.channel_id.controller_id, info.channel_id.channel_index),
 
                        );
 
                        endpoint.send(msg)?;
 
                        to_run.push((predicate, branch));
 
                    }
 
                    // ELSE DROP
 
                }
 
            }
 
        }
 
        // all in self.incomplete most recently returned Blocker::CouldntReadMsg
 
        Ok(if self.incomplete.is_empty() {
 
            if self.complete.is_empty() {
 
                Srr::NoBranches
 
            } else {
 
                Srr::AllBranchesComplete
 
            }
 
        } else {
 
            Srr::BlockingForRecv
 
        })
 
    }
 

	
 
    pub(crate) fn poly_recv_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        ekey: Key,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        // try exact match
 

	
 
        let to_run = if self.complete.contains_key(&payload_predicate) {
 
            // exact match with stopped machine
 

	
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched stopped machine exactly! nothing to do here",
 
            );
 
            vec![]
 
        } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) {
 
            // exact match with running machine
 

	
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched running machine exactly! pred is {:?}",
 
                &payload_predicate
 
            );
 
            branch.inbox.insert(ekey, payload);
 
            if branch.blocking_on == Some(ekey) {
 
                branch.blocking_on = None;
 
                vec![(payload_predicate, branch)]
 
            } else {
 
                vec![]
 
            }
 
        } else {
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 
            );
 
            let mut incomplete2 = HashMap::<_, _>::default();
 
            let to_run = self
 
                .incomplete
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
src/test/connector.rs
Show inline comments
 
@@ -516,192 +516,236 @@ fn connector_composite_chain_a() {
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_composite_chain_b() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"SSS";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_exchange() {
 
    /*
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(3, Active(addrs[0])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_both() {
 
    /* ------->   -----P|A---->   ------->
 
      / /--->\ \ / /---P|A-->\ \ / /--->\ \
 
    Alice    exchange       exchange     Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in a
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out a
 
            x.bind_port(2, Native).unwrap(); // native in b
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer out b
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"one".to_vec()));
 
                assert_eq!(Ok(()), x.put(1, b"two".to_vec()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap(); // peer in a
 
            x.bind_port(1, Native).unwrap(); // native out b
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer in b
 
            x.bind_port(3, Native).unwrap(); // native out a
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.get(0));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"one" as &[u8]), x.read_gotten(0));
 
                assert_eq!(Ok(b"two" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_routing_filter() {
 
    // Make a protocol whose behavior is a function of the contents of
 
    // a message. Here, the putter determines what is sent, and the proto
 
    // determines how it is routed
 
    /*
 
    Sender -->filter-->P|A-->sync--> Receiver
 
    */
 
    let timeout = Duration::from_millis(3_000);
 
    let addrs = [next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"filter").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Native).unwrap(); // err channel
 
            x.connect(timeout).unwrap();
 

	
 
            for i in (0..3).cycle().take(N) {
 
                // messages cycle [], [4], [4,4], ...
 
                let msg: Payload = std::iter::repeat(4).take(i).collect();
 

	
 
                // batch 0: passes through filter!
 
                x.put(0, msg.clone()).unwrap();
 
                x.next_batch().unwrap();
 

	
 
                // batch 1: gets returned!
 
                x.put(0, msg.clone()).unwrap();
 
                x.get(1).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_ne!(msg.len(), 0), // ok
 
                    1 => assert_eq!(msg.len(), 0), // err
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
        &|x| {
 
            // Receiver
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // empty batch
 
                x.next_batch().unwrap();
 

	
 
                // got a message
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => assert_ne!(Ok(&[] as &[u8]), x.read_gotten(0)),
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_fifo_1_e() {
 
    /*
 
        /-->\
 
    Alice   fifo_1
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 10;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"fifo_1_e").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // put
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 

	
 
                // get
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"message~" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
#[should_panic]
 
fn connector_causal_loop() {
0 comments (0 inline, 0 general)