Changeset - 2982ea49738a
[Not reviewed]
MH - 4 years ago 2021-11-12 14:05:56
contact@maxhenger.nl
rename 'synchronous' statement to 'sync'
45 files changed with 113 insertions and 113 deletions:
0 comments (0 inline, 0 general)
examples/bench_04/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"    while(true) sync {}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_05/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, port_pairs, proto_components;
 
	port_pairs = atoi(argv[1]);
 
	proto_components = atoi(argv[2]);
 
	printf("port_pairs %d, proto_components: %d\n", port_pairs, proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"    while(true) sync {}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_5.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<port_pairs; i++) {
 
		connector_add_port_pair(c, NULL, NULL);
 
	}
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_09/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive presync_work() {   "
 
	"    int i = 0;               "
 
	"    while(true) {            "
 
	"        i = 0;               "
 
	"        while(i < 2)  i++;   "
 
	"        synchronous {}       "
 
	"        sync {}       "
 
	"    }                        "
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "presync_work";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_11/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j, forwards, num_options, correct_index;
 
	forwards = atoi(argv[1]);
 
	num_options = atoi(argv[2]);
 
	printf("forwards %d, num_options %d\n",
 
		forwards, num_options);
 
	unsigned char pdl[] = 
 
	"primitive recv_zero(in a) {  "
 
	"    while(true) synchronous {"
 
	"    while(true) sync {"
 
	"        msg m = get(a);      "
 
	"        assert(m[0] == 0);   "
 
	"    }                        "
 
	"}                            "
 
	; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_11.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		// create a forward to tail of chain
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		// thread a forward component onto native_tail
 
		char ident[] = "forward";
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	// add "recv_zero" on end of chain
 
	char ident[] = "recv_zero";
 
	connector_add_component(c, ident, sizeof(ident)-1, &native_getter, 1);
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	char msg = 0;
 
	for (i=0; i<1000; i++) {
 
		correct_index = i%num_options;
 
		for(j=0; j<num_options; j++) {
 
			msg = j==correct_index ? 0 : 1;
 
			connector_put_bytes(c, native_putter, &msg, 1);
 
			if(j+1 < num_options) {
 
				connector_next_batch(c);
 
			}
 
		}	
 
		connector_sync(c, -1);	
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_23/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i;
 

	
 
	// unsigned char pdl[] = "\
 
	// primitive xrouter(in a, out b, out c) {\
 
 //        while(true) synchronous {\
 
 //        while(true) sync {\
 
 //            if(fires(a)) {\
 
 //                if(fires(b)) put(b, get(a));\
 
 //                else         put(c, get(a));\
 
 //            }\
 
 //        }\
 
 //    }"
 
 //    ;
 
	unsigned char pdl[] = "\
 
	primitive lossy(in a, out b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                msg m = get(a);\
 
                if(fires(b)) put(b, m);\
 
            }\
 
        }\
 
    }\
 
    primitive sync_drain(in a, in b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                get(a);\
 
                get(b);\
 
            }\
 
        }\
 
    }\
 
    composite xrouter(in a, out b, out c) {\
 
        channel d -> e;\
 
        channel f -> g;\
 
        channel h -> i;\
 
        channel j -> k;\
 
        channel l -> m;\
 
        channel n -> o;\
 
        channel p -> q;\
 
        channel r -> s;\
 
        channel t -> u;\
 
        new replicator(a, d, f);\
 
        new replicator(g, t, h);\
 
        new lossy(e, l);\
 
        new lossy(i, j);\
 
        new replicator(m, b, p);\
 
        new replicator(k, n, c);\
 
        new merger(q, o, r);\
 
        new sync_drain(u, s);\
 
    }"
 
    ;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, 0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	PortId ports[6];
 
	for(i=0; i<3; i++) {
 
		connector_add_port_pair(c, &ports[2*i], &ports[2*i+1]);
 
	}
 
	// [native~~~~~~~~~~]
 
	//  0  1  2  3  4  5
 
	//  |  ^  |  ^  |  ^  
 
	//  `--`  `--`  `--`  
 
	char ident[] = "xrouter";
 
	connector_add_component(
 
		c,
 
		ident,
 
		sizeof(ident)-1,
 
		(PortId[]) { ports[1], ports[2], ports[4] },
 
		3);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	// [native~~~~~~~~~~]
 
	//  0        3     5
 
	//  V        ^     ^  
 
	//  1        2     4  
 
	// [xrouter~~~~~~~~~]
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 
	
 
	int msg_len = 1000;
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 

	
 
	{
 
		clock_t begin = clock();
 
		for (i=0; i<100000; i++) {
 
			connector_put_bytes(c, ports[0], msg, msg_len);
 
			connector_get(c, ports[3]);
 
			connector_sync(c, -1);
 
		}
 
		clock_t end = clock();
 
		double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
		printf("First: %f\n", time_spent);
 
	}
 
	{
 
		clock_t begin = clock();
 
		for (i=0; i<100000; i++) {
 
			connector_put_bytes(c, ports[0], msg, msg_len);
 
			connector_get(c, ports[5]);
 
			connector_sync(c, -1);
 
		}
 
		clock_t end = clock();
 
		double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
		printf("Second: %f\n", time_spent);
 
	}
 
	{
 
		clock_t begin = clock();
 
		for (i=0; i<100000; i++) {
 
			connector_put_bytes(c, ports[0], msg, msg_len);
 
			connector_get(c, ports[3 + (i%2)*2]);
 
			connector_sync(c, -1);
 
		}
 
		clock_t end = clock();
 
		double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
		printf("Alternating: %f\n", time_spent);
 
	}
 
	free(msg);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_24/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j;
 

	
 
	unsigned char pdl[] = "\
 
	primitive fifo1_init(msg m, in a, out b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(m != null && fires(b)) {\
 
                put(b, m);\
 
                m = null;\
 
            } else if (m == null && fires(a)) {\
 
                m = get(a);\
 
            }\
 
        }\
 
    }\
 
    composite fifo1_full(in a, out b) {\
 
        new fifo1_init(create(0), a, b);\
 
    }\
 
    composite fifo1(in a, out b) {\
 
        new fifo1_init(null, a, b);\
 
    }\
 
    composite sequencer3(out a, out b, out c) {\
 
        channel d -> e;\
 
        channel f -> g;\
 
        channel h -> i;\
 
        channel j -> k;\
 
        channel l -> m;\
 
        channel n -> o;\
 
        new fifo1_full(o, d);\
 
        new replicator(e, f, a);\
 
        new fifo1(g, h);\
 
        new replicator(i, j, b);\
 
        new fifo1(k, l);\
 
        new replicator(m, n, c);\
 
    }"
 
    ;
 
	// unsigned char pdl[] = "\
 
	// primitive sequencer3(out a, out b, out c) {\
 
 //        int i = 0;\
 
 //        while(true) synchronous {\
 
 //        while(true) sync {\
 
 //            out to = a;\
 
 //            if     (i==1) to = b;\
 
 //            else if(i==2) to = c;\
 
 //            if(fires(to)) {\
 
 //                put(to, create(0));\
 
 //                i = (i + 1)%3;\
 
 //            }\
 
 //        }\
 
 //    }"
 
    ;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, 0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	PortId putters[3], getters[3];
 
	for(i=0; i<3; i++) {
 
		connector_add_port_pair(c, &putters[i], &getters[i]);
 
	}
 
	char ident[] = "sequencer3";
 
	connector_add_component(c, ident, sizeof(ident)-1, putters, 3);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 

	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000/3; i++) {
 
		for (j=0; j<3; j++) {
 
			connector_get(c, getters[j]);
 
			connector_sync(c, -1);
 
		}
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_27/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "\
 
    primitive xrouter(in a, out b, out c) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                if(fires(b)) put(b, get(a));\
 
                else         put(c, get(a));\
 
            }\
 
        }\
 
    }\
 
    ";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, 0);
 
    PortId ports[8];
 
    if(optimized=='y') {
 
        connector_add_port_pair(c, &ports[0], &ports[1]);
 
        connector_add_port_pair(c, &ports[2], &ports[7]); // 3,4,5,6 uninitialized
 
        connector_add_component(c, "sync", 4, ports+1, 2);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    } else {
 
        for(i=0; i<4; i++) {
 
            connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]);
 
        }
 
        connector_add_component(c, "xrouter", 7, (PortId[]) {ports[1],ports[2],ports[4]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        connector_add_component(c, "merger" , 6, (PortId[]) {ports[3],ports[5],ports[6]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        connector_put_bytes(c, ports[0], msg, msg_len);
 
        connector_get(c, ports[7]);
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/eg_protocols.pdl
Show inline comments
 
primitive pres_2(in i, out o) {
 
  synchronous {
 
  sync {
 
    put(o, get(i));
 
  }
 
}
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous {
 
  while(true) sync {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 

	
 
primitive alt_round_merger(in a, in b, out c){
 
  while(true) {
 
    synchronous{ put(c, get(a)); }
 
    synchronous{ put(c, get(b)); }
 
    sync { put(c, get(a)); }
 
    sync { put(c, get(b)); }
 
  }	
 
}
src/protocol/parser/token_parsing.rs
Show inline comments
 
use crate::collections::ScopedSection;
 
use crate::protocol::ast::*;
 
use crate::protocol::input_source::{
 
    InputSource as InputSource,
 
    InputPosition as InputPosition,
 
    InputSpan,
 
    ParseError,
 
};
 
use super::tokens::*;
 
use super::symbol_table::*;
 
use super::{Module, PassCtx};
 

	
 
// Keywords
 
pub(crate) const KW_LET:       &'static [u8] = b"let";
 
pub(crate) const KW_AS:        &'static [u8] = b"as";
 
pub(crate) const KW_STRUCT:    &'static [u8] = b"struct";
 
pub(crate) const KW_ENUM:      &'static [u8] = b"enum";
 
pub(crate) const KW_UNION:     &'static [u8] = b"union";
 
pub(crate) const KW_FUNCTION:  &'static [u8] = b"func";
 
pub(crate) const KW_PRIMITIVE: &'static [u8] = b"primitive";
 
pub(crate) const KW_COMPOSITE: &'static [u8] = b"composite";
 
pub(crate) const KW_IMPORT:    &'static [u8] = b"import";
 

	
 
// Keywords - literals
 
pub(crate) const KW_LIT_TRUE:  &'static [u8] = b"true";
 
pub(crate) const KW_LIT_FALSE: &'static [u8] = b"false";
 
pub(crate) const KW_LIT_NULL:  &'static [u8] = b"null";
 

	
 
// Keywords - function(like)s
 
pub(crate) const KW_CAST:        &'static [u8] = b"cast";
 
pub(crate) const KW_FUNC_GET:    &'static [u8] = b"get";
 
pub(crate) const KW_FUNC_PUT:    &'static [u8] = b"put";
 
pub(crate) const KW_FUNC_FIRES:  &'static [u8] = b"fires";
 
pub(crate) const KW_FUNC_CREATE: &'static [u8] = b"create";
 
pub(crate) const KW_FUNC_LENGTH: &'static [u8] = b"length";
 
pub(crate) const KW_FUNC_ASSERT: &'static [u8] = b"assert";
 
pub(crate) const KW_FUNC_PRINT:  &'static [u8] = b"print";
 

	
 
// Keywords - statements
 
pub(crate) const KW_STMT_CHANNEL:  &'static [u8] = b"channel";
 
pub(crate) const KW_STMT_IF:       &'static [u8] = b"if";
 
pub(crate) const KW_STMT_ELSE:     &'static [u8] = b"else";
 
pub(crate) const KW_STMT_WHILE:    &'static [u8] = b"while";
 
pub(crate) const KW_STMT_BREAK:    &'static [u8] = b"break";
 
pub(crate) const KW_STMT_CONTINUE: &'static [u8] = b"continue";
 
pub(crate) const KW_STMT_GOTO:     &'static [u8] = b"goto";
 
pub(crate) const KW_STMT_RETURN:   &'static [u8] = b"return";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"synchronous";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"sync";
 
pub(crate) const KW_STMT_NEW:      &'static [u8] = b"new";
 

	
 
// Keywords - types
 
// Since types are needed for returning diagnostic information to the user, the
 
// string variants are put here as well.
 
pub(crate) const KW_TYPE_IN_PORT_STR:  &'static str = "in";
 
pub(crate) const KW_TYPE_OUT_PORT_STR: &'static str = "out";
 
pub(crate) const KW_TYPE_MESSAGE_STR:  &'static str = "msg";
 
pub(crate) const KW_TYPE_BOOL_STR:     &'static str = "bool";
 
pub(crate) const KW_TYPE_UINT8_STR:    &'static str = "u8";
 
pub(crate) const KW_TYPE_UINT16_STR:   &'static str = "u16";
 
pub(crate) const KW_TYPE_UINT32_STR:   &'static str = "u32";
 
pub(crate) const KW_TYPE_UINT64_STR:   &'static str = "u64";
 
pub(crate) const KW_TYPE_SINT8_STR:    &'static str = "s8";
 
pub(crate) const KW_TYPE_SINT16_STR:   &'static str = "s16";
 
pub(crate) const KW_TYPE_SINT32_STR:   &'static str = "s32";
 
pub(crate) const KW_TYPE_SINT64_STR:   &'static str = "s64";
 
pub(crate) const KW_TYPE_CHAR_STR:     &'static str = "char";
 
pub(crate) const KW_TYPE_STRING_STR:   &'static str = "string";
 
pub(crate) const KW_TYPE_INFERRED_STR: &'static str = "auto";
 

	
 
pub(crate) const KW_TYPE_IN_PORT:  &'static [u8] = KW_TYPE_IN_PORT_STR.as_bytes();
 
pub(crate) const KW_TYPE_OUT_PORT: &'static [u8] = KW_TYPE_OUT_PORT_STR.as_bytes();
 
pub(crate) const KW_TYPE_MESSAGE:  &'static [u8] = KW_TYPE_MESSAGE_STR.as_bytes();
 
pub(crate) const KW_TYPE_BOOL:     &'static [u8] = KW_TYPE_BOOL_STR.as_bytes();
 
pub(crate) const KW_TYPE_UINT8:    &'static [u8] = KW_TYPE_UINT8_STR.as_bytes();
 
pub(crate) const KW_TYPE_UINT16:   &'static [u8] = KW_TYPE_UINT16_STR.as_bytes();
 
pub(crate) const KW_TYPE_UINT32:   &'static [u8] = KW_TYPE_UINT32_STR.as_bytes();
 
pub(crate) const KW_TYPE_UINT64:   &'static [u8] = KW_TYPE_UINT64_STR.as_bytes();
 
pub(crate) const KW_TYPE_SINT8:    &'static [u8] = KW_TYPE_SINT8_STR.as_bytes();
 
pub(crate) const KW_TYPE_SINT16:   &'static [u8] = KW_TYPE_SINT16_STR.as_bytes();
 
pub(crate) const KW_TYPE_SINT32:   &'static [u8] = KW_TYPE_SINT32_STR.as_bytes();
 
pub(crate) const KW_TYPE_SINT64:   &'static [u8] = KW_TYPE_SINT64_STR.as_bytes();
 
pub(crate) const KW_TYPE_CHAR:     &'static [u8] = KW_TYPE_CHAR_STR.as_bytes();
 
pub(crate) const KW_TYPE_STRING:   &'static [u8] = KW_TYPE_STRING_STR.as_bytes();
 
pub(crate) const KW_TYPE_INFERRED: &'static [u8] = KW_TYPE_INFERRED_STR.as_bytes();
 

	
 
/// A special trait for when consuming comma-separated things such that we can
 
/// push them onto a `Vec` and onto a `ScopedSection`. As we monomorph for
 
/// very specific comma-separated cases I don't expect polymorph bloat.
 
/// Also, I really don't like this solution.
 
pub(crate) trait Extendable {
 
    type Value;
 

	
 
    fn push(&mut self, v: Self::Value);
 
}
 

	
 
impl<T> Extendable for Vec<T> {
 
    type Value = T;
 

	
 
    #[inline]
 
    fn push(&mut self, v: Self::Value) {
 
        (self as &mut Vec<T>).push(v);
 
    }
 
}
 

	
 
impl<T: Sized> Extendable for ScopedSection<T> {
 
    type Value = T;
 

	
 
    #[inline]
 
    fn push(&mut self, v: Self::Value) {
 
        (self as &mut ScopedSection<T>).push(v);
 
    }
 
}
 

	
 
/// Consumes a domain-name identifier: identifiers separated by a dot. For
 
/// simplification of later parsing and span identification the domain-name may
 
/// contain whitespace, but must reside on the same line.
 
pub(crate) fn consume_domain_ident<'a>(
 
    source: &'a InputSource, iter: &mut TokenIter
 
) -> Result<(&'a [u8], InputSpan), ParseError> {
 
    let (_, mut span) = consume_ident(source, iter)?;
 
    while let Some(TokenKind::Dot) = iter.next() {
 
        iter.consume();
 
        let (_, new_span) = consume_ident(source, iter)?;
 
        span.end = new_span.end;
 
    }
 

	
 
    // Not strictly necessary, but probably a reasonable restriction: this
 
    // simplifies parsing of module naming and imports.
 
    if span.begin.line != span.end.line {
 
        return Err(ParseError::new_error_str_at_span(source, span, "module names may not span multiple lines"));
 
    }
 

	
 
    // If module name consists of a single identifier, then it may not match any
 
    // of the reserved keywords
 
    let section = source.section_at_pos(span.begin, span.end);
 
    if is_reserved_keyword(section) {
 
        return Err(ParseError::new_error_str_at_span(source, span, "encountered reserved keyword"));
 
    }
 

	
 
    Ok((source.section_at_pos(span.begin, span.end), span))
 
}
 

	
 
/// Consumes a specific expected token. Be careful to only call this with tokens
 
/// that do not have a variable length.
 
pub(crate) fn consume_token(source: &InputSource, iter: &mut TokenIter, expected: TokenKind) -> Result<InputSpan, ParseError> {
 
    if Some(expected) != iter.next() {
 
        return Err(ParseError::new_error_at_pos(
 
            source, iter.last_valid_pos(),
 
            format!("expected '{}'", expected.token_chars())
 
        ));
 
    }
 
    let span = iter.next_span();
 
    iter.consume();
 
    Ok(span)
 
}
 

	
 
/// Consumes a comma separated list until the closing delimiter is encountered
 
pub(crate) fn consume_comma_separated_until<T, F, E>(
 
    close_delim: TokenKind, source: &InputSource, iter: &mut TokenIter, ctx: &mut PassCtx,
 
    mut consumer_fn: F, target: &mut E, item_name_and_article: &'static str,
 
    close_pos: Option<&mut InputPosition>
 
) -> Result<(), ParseError>
 
    where F: FnMut(&InputSource, &mut TokenIter, &mut PassCtx) -> Result<T, ParseError>,
 
          E: Extendable<Value=T>
 
{
 
    let mut had_comma = true;
 
    let mut next;
 
    loop {
 
        next = iter.next();
 
        if Some(close_delim) == next {
 
            if let Some(close_pos) = close_pos {
 
                // If requested return the position of the closing delimiter
 
                let (_, new_close_pos) = iter.next_positions();
 
                *close_pos = new_close_pos;
 
            }
 
            iter.consume();
 
            break;
 
        } else if !had_comma || next.is_none() {
 
            return Err(ParseError::new_error_at_pos(
 
                source, iter.last_valid_pos(),
 
                format!("expected a '{}', or {}", close_delim.token_chars(), item_name_and_article)
 
            ));
 
        }
 

	
 
        let new_item = consumer_fn(source, iter, ctx)?;
 
        target.push(new_item);
 

	
 
        next = iter.next();
 
        had_comma = next == Some(TokenKind::Comma);
 
        if had_comma {
 
            iter.consume();
 
        }
 
    }
 

	
 
    Ok(())
 
}
 

	
 
/// Consumes a comma-separated list of items if the opening delimiting token is
 
/// encountered. If not, then the iterator will remain at its current position.
 
/// Note that the potential cases may be:
 
/// - No opening delimiter encountered, then we return `false`.
 
/// - Both opening and closing delimiter encountered, but no items.
 
/// - Opening and closing delimiter encountered, and items were processed.
 
/// - Found an opening delimiter, but processing an item failed.
 
pub(crate) fn maybe_consume_comma_separated<T, F, E>(
 
    open_delim: TokenKind, close_delim: TokenKind, source: &InputSource, iter: &mut TokenIter, ctx: &mut PassCtx,
 
    consumer_fn: F, target: &mut E, item_name_and_article: &'static str,
 
    close_pos: Option<&mut InputPosition>
 
) -> Result<bool, ParseError>
 
    where F: FnMut(&InputSource, &mut TokenIter, &mut PassCtx) -> Result<T, ParseError>,
 
          E: Extendable<Value=T>
 
{
 
    if Some(open_delim) != iter.next() {
 
        return Ok(false);
 
    }
 

	
 
    // Opening delimiter encountered, so must parse the comma-separated list.
 
    iter.consume();
 
    consume_comma_separated_until(close_delim, source, iter, ctx, consumer_fn, target, item_name_and_article, close_pos)?;
 

	
 
    Ok(true)
 
}
 

	
 
pub(crate) fn maybe_consume_comma_separated_spilled<F: FnMut(&InputSource, &mut TokenIter, &mut PassCtx) -> Result<(), ParseError>>(
 
    open_delim: TokenKind, close_delim: TokenKind, source: &InputSource,
 
    iter: &mut TokenIter, ctx: &mut PassCtx,
 
    mut consumer_fn: F, item_name_and_article: &'static str
 
) -> Result<bool, ParseError> {
 
    let mut next = iter.next();
 
    if Some(open_delim) != next {
 
        return Ok(false);
 
    }
 

	
 
    iter.consume();
 
    let mut had_comma = true;
 
    loop {
 
        next = iter.next();
 
        if Some(close_delim) == next {
 
            iter.consume();
 
            break;
 
        } else if !had_comma {
 
            return Err(ParseError::new_error_at_pos(
 
                source, iter.last_valid_pos(),
 
                format!("expected a '{}', or {}", close_delim.token_chars(), item_name_and_article)
 
            ));
 
        }
 

	
 
        consumer_fn(source, iter, ctx)?;
 
        next = iter.next();
 
        had_comma = next == Some(TokenKind::Comma);
 
        if had_comma {
 
            iter.consume();
 
        }
 
    }
 

	
 
    Ok(true)
 
}
 

	
 
/// Consumes a comma-separated list and expected the opening and closing
 
/// characters to be present. The returned array may still be empty
 
pub(crate) fn consume_comma_separated<T, F, E>(
 
    open_delim: TokenKind, close_delim: TokenKind, source: &InputSource,
 
    iter: &mut TokenIter, ctx: &mut PassCtx,
 
    consumer_fn: F, target: &mut E, item_name_and_article: &'static str,
 
    list_name_and_article: &'static str, close_pos: Option<&mut InputPosition>
 
) -> Result<(), ParseError>
 
    where F: FnMut(&InputSource, &mut TokenIter, &mut PassCtx) -> Result<T, ParseError>,
 
          E: Extendable<Value=T>
 
{
 
    let first_pos = iter.last_valid_pos();
 
    match maybe_consume_comma_separated(
 
        open_delim, close_delim, source, iter, ctx, consumer_fn, target,
 
        item_name_and_article, close_pos
 
    ) {
 
        Ok(true) => Ok(()),
 
        Ok(false) => {
 
            return Err(ParseError::new_error_at_pos(
 
                source, first_pos,
 
                format!("expected {}", list_name_and_article)
 
            ));
 
        },
 
        Err(err) => Err(err)
 
    }
 
}
 

	
 
/// Consumes an integer literal, may be binary, octal, hexadecimal or decimal,
 
/// and may have separating '_'-characters.
 
/// TODO: @Cleanup, @Performance
 
pub(crate) fn consume_integer_literal(source: &InputSource, iter: &mut TokenIter, buffer: &mut String) -> Result<(u64, InputSpan), ParseError> {
 
    if Some(TokenKind::Integer) != iter.next() {
 
        return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected an integer literal"));
 
    }
 
    let integer_span = iter.next_span();
 
    iter.consume();
 

	
 
    let integer_text = source.section_at_span(integer_span);
 

	
 
    // Determine radix and offset from prefix
 
    let (radix, input_offset, radix_name) =
 
        if integer_text.starts_with(b"0b") || integer_text.starts_with(b"0B") {
 
            // Binary number
 
            (2, 2, "binary")
 
        } else if integer_text.starts_with(b"0o") || integer_text.starts_with(b"0O") {
 
            // Octal number
 
            (8, 2, "octal")
 
        } else if integer_text.starts_with(b"0x") || integer_text.starts_with(b"0X") {
 
            // Hexadecimal number
 
            (16, 2, "hexadecimal")
 
        } else {
 
            (10, 0, "decimal")
 
        };
 

	
 
    // Take out any of the separating '_' characters
 
    buffer.clear();
 
    for char_idx in input_offset..integer_text.len() {
 
        let char = integer_text[char_idx];
 
        if char == b'_' {
 
            continue;
 
        }
 

	
 
        if !((char >= b'0' && char <= b'9') || (char >= b'A' && char <= b'F') || (char >= b'a' || char <= b'f')) {
 
            return Err(ParseError::new_error_at_span(
 
                source, integer_span,
 
                format!("incorrectly formatted {} number", radix_name)
 
            ));
 
        }
 
        buffer.push(char::from(char));
 
    }
 

	
 
    // Use the cleaned up string to convert to integer
 
    match u64::from_str_radix(&buffer, radix) {
 
        Ok(number) => Ok((number, integer_span)),
 
        Err(_) => Err(ParseError::new_error_at_span(
 
            source, integer_span,
 
            format!("incorrectly formatted {} number", radix_name)
 
        )),
 
    }
 
}
 

	
 
/// Consumes a character literal. We currently support a limited number of
 
/// backslash-escaped characters
 
pub(crate) fn consume_character_literal(
 
    source: &InputSource, iter: &mut TokenIter
 
) -> Result<(char, InputSpan), ParseError> {
 
    if Some(TokenKind::Character) != iter.next() {
 
        return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected a character literal"));
 
    }
 
    let span = iter.next_span();
 
    iter.consume();
 

	
 
    let char_text = source.section_at_span(span);
 
    if !char_text.is_ascii() {
 
        return Err(ParseError::new_error_str_at_span(
 
            source, span, "expected an ASCII character literal"
 
        ));
 
    }
 

	
 
    match char_text.len() {
 
        0 => return Err(ParseError::new_error_str_at_span(source, span, "too little characters in character literal")),
 
        1 => {
 
            // We already know the text is ascii, so just throw an error if we have the escape
 
            // character.
 
            if char_text[0] == b'\\' {
 
                return Err(ParseError::new_error_str_at_span(source, span, "escape character without subsequent character"));
 
            }
 
            return Ok((char_text[0] as char, span));
 
        },
 
        2 => {
 
            if char_text[0] == b'\\' {
 
                let result = parse_escaped_character(source, span, char_text[1])?;
 
                return Ok((result, span))
 
            }
 
        },
 
        _ => {}
 
    }
 

	
 
    return Err(ParseError::new_error_str_at_span(source, span, "too many characters in character literal"))
 
}
 

	
 
/// Consumes a string literal. We currently support a limited number of
 
/// backslash-escaped characters. Note that the result is stored in the
 
/// buffer.
 
pub(crate) fn consume_string_literal(
 
    source: &InputSource, iter: &mut TokenIter, buffer: &mut String
 
) -> Result<InputSpan, ParseError> {
 
    if Some(TokenKind::String) != iter.next() {
 
        return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected a string literal"));
 
    }
 

	
 
    buffer.clear();
 
    let span = iter.next_span();
 
    iter.consume();
 

	
 
    let text = source.section_at_span(span);
 
    if !text.is_ascii() {
 
        return Err(ParseError::new_error_str_at_span(source, span, "expected an ASCII string literal"));
 
    }
 

	
 
    debug_assert_eq!(text[0], b'"'); // here as kind of a reminder: the span includes the bounding quotation marks
 
    debug_assert_eq!(text[text.len() - 1], b'"');
 

	
 
    buffer.reserve(text.len() - 2);
 

	
 
    let mut was_escape = false;
 
    for idx in 1..text.len() - 1 {
 
        let cur = text[idx];
 
        let is_escape = cur == b'\\';
 
        if was_escape {
 
            let to_push = parse_escaped_character(source, span, cur)?;
 
            buffer.push(to_push);
 
        } else {
 
            buffer.push(cur as char);
 
        }
 

	
 
        if was_escape && is_escape {
 
            was_escape = false;
 
        } else {
 
            was_escape = is_escape;
 
        }
 
    }
 

	
 
    debug_assert!(!was_escape); // because otherwise we couldn't have ended the string literal
 

	
 
    Ok(span)
 
}
 

	
 
fn parse_escaped_character(source: &InputSource, literal_span: InputSpan, v: u8) -> Result<char, ParseError> {
 
    let result = match v {
 
        b'r' => '\r',
 
        b'n' => '\n',
 
        b't' => '\t',
 
        b'0' => '\0',
 
        b'\\' => '\\',
 
        b'\'' => '\'',
 
        b'"' => '"',
 
        v => {
 
            let msg = if v.is_ascii_graphic() {
 
                format!("unsupported escape character '{}'", v as char)
 
            } else {
 
                format!("unsupported escape character with (unsigned) byte value {}", v)
 
            };
 
            return Err(ParseError::new_error_at_span(source, literal_span, msg))
 
        },
 
    };
 
    Ok(result)
 
}
 

	
 
pub(crate) fn consume_pragma<'a>(source: &'a InputSource, iter: &mut TokenIter) -> Result<(&'a [u8], InputPosition, InputPosition), ParseError> {
 
    if Some(TokenKind::Pragma) != iter.next() {
 
        return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected a pragma"));
 
    }
 
    let (pragma_start, pragma_end) = iter.next_positions();
 
    iter.consume();
 
    Ok((source.section_at_pos(pragma_start, pragma_end), pragma_start, pragma_end))
 
}
 

	
 
pub(crate) fn has_ident(source: &InputSource, iter: &mut TokenIter, expected: &[u8]) -> bool {
 
    peek_ident(source, iter).map_or(false, |section| section == expected)
 
}
 

	
 
pub(crate) fn peek_ident<'a>(source: &'a InputSource, iter: &mut TokenIter) -> Option<&'a [u8]> {
 
    if Some(TokenKind::Ident) == iter.next() {
 
        let (start, end) = iter.next_positions();
 
        return Some(source.section_at_pos(start, end))
 
    }
 

	
 
    None
 
}
 

	
 
/// Consumes any identifier and returns it together with its span. Does not
 
/// check if the identifier is a reserved keyword.
 
pub(crate) fn consume_any_ident<'a>(
 
    source: &'a InputSource, iter: &mut TokenIter
 
) -> Result<(&'a [u8], InputSpan), ParseError> {
 
    if Some(TokenKind::Ident) != iter.next() {
 
        return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected an identifier"));
 
    }
 
    let (ident_start, ident_end) = iter.next_positions();
 
    iter.consume();
 
    Ok((source.section_at_pos(ident_start, ident_end), InputSpan::from_positions(ident_start, ident_end)))
 
}
 

	
 
/// Consumes a specific identifier. May or may not be a reserved keyword.
 
pub(crate) fn consume_exact_ident(source: &InputSource, iter: &mut TokenIter, expected: &[u8]) -> Result<InputSpan, ParseError> {
 
    let (ident, pos) = consume_any_ident(source, iter)?;
 
    if ident != expected {
 
        debug_assert!(expected.is_ascii());
 
        return Err(ParseError::new_error_at_pos(
 
            source, iter.last_valid_pos(),
 
            format!("expected the text '{}'", &String::from_utf8_lossy(expected))
 
        ));
 
    }
 
    Ok(pos)
 
}
 

	
 
/// Consumes an identifier that is not a reserved keyword and returns it
 
/// together with its span.
 
pub(crate) fn consume_ident<'a>(
 
    source: &'a InputSource, iter: &mut TokenIter
 
) -> Result<(&'a [u8], InputSpan), ParseError> {
 
    let (ident, span) = consume_any_ident(source, iter)?;
 
    if is_reserved_keyword(ident) {
 
        return Err(ParseError::new_error_str_at_span(source, span, "encountered reserved keyword"));
 
    }
 

	
 
    Ok((ident, span))
 
}
 

	
 
/// Consumes an identifier and immediately intern it into the `StringPool`
 
pub(crate) fn consume_ident_interned(
 
    source: &InputSource, iter: &mut TokenIter, ctx: &mut PassCtx
 
) -> Result<Identifier, ParseError> {
 
    let (value, span) = consume_ident(source, iter)?;
 
    let value = ctx.pool.intern(value);
 
    Ok(Identifier{ span, value })
 
}
 

	
 
fn is_reserved_definition_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_STRUCT | KW_ENUM | KW_UNION | KW_FUNCTION | KW_PRIMITIVE | KW_COMPOSITE => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_statement_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_IMPORT | KW_AS |
 
        KW_STMT_CHANNEL | KW_STMT_IF | KW_STMT_WHILE |
 
        KW_STMT_BREAK | KW_STMT_CONTINUE | KW_STMT_GOTO | KW_STMT_RETURN |
 
        KW_STMT_SYNC | KW_STMT_NEW => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_expression_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_LET | KW_CAST |
 
        KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL |
 
        KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_type_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_TYPE_IN_PORT | KW_TYPE_OUT_PORT | KW_TYPE_MESSAGE | KW_TYPE_BOOL |
 
        KW_TYPE_UINT8 | KW_TYPE_UINT16 | KW_TYPE_UINT32 | KW_TYPE_UINT64 |
 
        KW_TYPE_SINT8 | KW_TYPE_SINT16 | KW_TYPE_SINT32 | KW_TYPE_SINT64 |
 
        KW_TYPE_CHAR | KW_TYPE_STRING |
 
        KW_TYPE_INFERRED => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_keyword(text: &[u8]) -> bool {
 
    return
 
        is_reserved_definition_keyword(text) ||
 
        is_reserved_statement_keyword(text) ||
 
        is_reserved_expression_keyword(text) ||
 
        is_reserved_type_keyword(text);
 
}
 

	
 
pub(crate) fn seek_module(modules: &[Module], root_id: RootId) -> Option<&Module> {
 
    for module in modules {
 
        if module.root_id == root_id {
 
            return Some(module)
 
        }
 
    }
 

	
 
    return None
 
}
 

	
 
/// Constructs a human-readable message indicating why there is a conflict of
 
/// symbols.
 
// Note: passing the `module_idx` is not strictly necessary, but will prevent
 
// programmer mistakes during development: we get a conflict because we're
 
// currently parsing a particular module.
 
pub(crate) fn construct_symbol_conflict_error(
 
    modules: &[Module], module_idx: usize, ctx: &PassCtx, new_symbol: &Symbol, old_symbol: &Symbol
 
) -> ParseError {
 
    let module = &modules[module_idx];
 
    let get_symbol_span_and_msg = |symbol: &Symbol| -> (String, Option<InputSpan>) {
 
        match &symbol.variant {
 
            SymbolVariant::Module(module) => {
 
                let import = &ctx.heap[module.introduced_at];
 
                return (
 
                    format!("the module aliased as '{}' imported here", symbol.name.as_str()),
 
                    Some(import.as_module().span)
 
                );
 
            },
 
            SymbolVariant::Definition(definition) => {
 
                if definition.defined_in_module.is_invalid() {
 
                    // Must be a builtin thing
 
                    return (format!("the builtin '{}'", symbol.name.as_str()), None)
 
                } else {
 
                    if let Some(import_id) = definition.imported_at {
 
                        let import = &ctx.heap[import_id];
 
                        return (
 
                            format!("the type '{}' imported here", symbol.name.as_str()),
 
                            Some(import.as_symbols().span)
 
                        );
 
                    } else {
 
                        // This is a defined symbol. So this must mean that the
 
                        // error was caused by it being defined.
 
                        debug_assert_eq!(definition.defined_in_module, module.root_id);
 

	
 
                        return (
 
                            format!("the type '{}' defined here", symbol.name.as_str()),
 
                            Some(definition.identifier_span)
 
                        )
 
                    }
 
                }
 
            }
 
        }
 
    };
 

	
 
    let (new_symbol_msg, new_symbol_span) = get_symbol_span_and_msg(new_symbol);
 
    let (old_symbol_msg, old_symbol_span) = get_symbol_span_and_msg(old_symbol);
 
    let new_symbol_span = new_symbol_span.unwrap(); // because new symbols cannot be builtin
 

	
 
    match old_symbol_span {
 
        Some(old_symbol_span) => ParseError::new_error_at_span(
 
            &module.source, new_symbol_span, format!("symbol is defined twice: {}", new_symbol_msg)
 
        ).with_info_at_span(
 
            &module.source, old_symbol_span, format!("it conflicts with {}", old_symbol_msg)
 
        ),
 
        None => ParseError::new_error_at_span(
 
            &module.source, new_symbol_span,
 
            format!("symbol is defined twice: {} conflicts with {}", new_symbol_msg, old_symbol_msg)
 
        )
 
    }
 
}
 
\ No newline at end of file
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{
 
    error::*,
 
    EndpointPolarity::{Active, Passive},
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
const MS100: Option<Duration> = Some(Duration::from_millis(100));
 
const MS300: Option<Duration> = Some(Duration::from_millis(300));
 
const SEC1: Option<Duration> = Some(Duration::from_secs(1));
 
const SEC5: Option<Duration> = Some(Duration::from_secs(5));
 
const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone())
 
}
 
fn file_logged_configured_connector(
 
    connector_id: ConnectorId,
 
    dir_path: &Path,
 
    pd: Arc<ProtocolDescription>,
 
) -> Connector {
 
    let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir");
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).expect("Failed to create log output file!");
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, pd, connector_id)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive sync(in<msg> a, out<msg> b) {
 
primitive sync_component(in<msg> a, out<msg> b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	msg x = get(a);
 
            	put(b, x);
 
            } else {
 
                assert(!fires(a) && !fires(b));
 
            }
 
        }
 
    }
 
}
 

	
 
primitive together(in<msg> ia, in<msg> ib, out<msg> oa, out<msg> ob){
 
  while(true) synchronous {
 
  while(true) sync {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 
fn new_u8_buffer(cap: usize) -> Vec<u8> {
 
    let mut v = Vec::with_capacity(cap);
 
    // Safe! len will cover owned bytes in valid state
 
    unsafe { v.set_len(cap) }
 
    v
 
}
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[i, o]).unwrap();
 
    c.add_component(b"", b"sync_component", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let test_log_path = Path::new("./logs/wrong_polarity_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let test_log_path = Path::new("./logs/dup_put_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let test_log_path = Path::new("./logs/trivial_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let test_log_path = Path::new("./logs/unconnected_gotten_err");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_no_round() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_no_round");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_ungotten() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_ungotten");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let test_log_path = Path::new("./logs/native_polarity_checks");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let test_log_path = Path::new("./logs/native_multiple_gets");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let test_log_path = Path::new("./logs/next_batch");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.next_batch().unwrap_err();
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_self_msg() {
 
    let test_log_path = Path::new("./logs/native_self_msg");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn two_natives_msg() {
 
    let test_log_path = Path::new("./logs/two_natives_msg");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn trivial_nondet() {
 
    let test_log_path = Path::new("./logs/trivial_nondet");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    // getting 0 batch
 
    c.next_batch().unwrap();
 
    // silent 1 batch
 
    assert_eq!(1, c.sync(SEC1).unwrap());
 
    c.gotten(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn connector_pair_nondet() {
 
    let test_log_path = Path::new("./logs/connector_pair_nondet");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(SEC1).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_immediately_inconsistent() {
 
    let test_log_path = Path::new("./logs/native_immediately_inconsistent");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap_err();
 
}
 

	
 
#[test]
 
fn native_recovers() {
 
    let test_log_path = Path::new("./logs/native_recovers");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[g, p]).unwrap();
 
    c.add_component(b"", b"sync_component", &[g, p]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
    /*
 
    native p0|-->|g0 sync
 
           g1|<--|p1
 
    */
 
    let test_log_path = Path::new("./logs/sync_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[g0, p1]).unwrap();
 
    c.add_component(b"", b"sync_component", &[g0, p1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(SEC1).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
#[test]
 
fn double_net_connect() {
 
    let test_log_path = Path::new("./logs/double_net_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_p, _g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [_g, _p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn distributed_msg_bounce() {
 
    /*
 
    native[0] | sync 0.p|-->|1.p native[1]
 
                     0.g|<--|1.g
 
    */
 
    let test_log_path = Path::new("./logs/distributed_msg_bounce");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            /*
 
            native | sync p|-->
 
                   |      g|<--
 
            */
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p, g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"", b"sync", &[g, p]).unwrap();
 
            c.add_component(b"", b"sync_component", &[g, p]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [g, p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn local_timeout() {
 
    let test_log_path = Path::new("./logs/local_timeout");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(MS300) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
}
 

	
 
#[test]
 
fn parent_timeout() {
 
    let test_log_path = Path::new("./logs/parent_timeout");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn child_timeout() {
 
    let test_log_path = Path::new("./logs/child_timeout");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // child; times out
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(MS300).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn chain_connect() {
 
    let test_log_path = Path::new("./logs/chain_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(SEC5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn net_self_loop() {
 
    let test_log_path = Path::new("./logs/net_self_loop");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn nobody_connects_active() {
 
    let test_log_path = Path::new("./logs/nobody_connects_active");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 
#[test]
 
fn nobody_connects_passive() {
 
    let test_log_path = Path::new("./logs/nobody_connects_passive");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 

	
 
#[test]
 
fn together() {
 
    let test_log_path = Path::new("./logs/together");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_batch_distinguish() {
 
    let test_log_path = Path::new("./logs/native_batch_distinguish");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn multirounds() {
 
    let test_log_path = Path::new("./logs/multirounds");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn multi_recover() {
 
    let test_log_path = Path::new("./logs/multi_recover");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let success_iter = [true, false].iter().copied().cycle().take(10);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                if succeeds {
 
                    c.get(p1).unwrap();
 
                }
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
}
 

	
 
#[ignore]
 
#[test]
 
fn reowolf_to_udp() {
 
    let test_log_path = Path::new("./logs/reowolf_to_udp");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[ignore]
 
#[test]
 
fn udp_to_reowolf() {
 
    let test_log_path = Path::new("./logs/udp_to_reowolf");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            for _ in 0..15 {
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
                std::thread::sleep(MS100.unwrap());
 
            }
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_reowolf_swap() {
 
    let test_log_path = Path::new("./logs/udp_reowolf_swap");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            for _ in 0..5 {
 
                std::thread::sleep(Duration::from_millis(60));
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
            }
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn example_pres_3() {
 
    let test_log_path = Path::new("./logs/example_pres_3");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            // put {A} and FAIL
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {B} and FAIL
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {A, B} and SUCCEED
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..2 {
 
                // get {A, B} and FAIL
 
                c.get(p0).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap_err();
 
            }
 
            // get {A, B} and SUCCEED
 
            c.get(p0).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn ac_not_b() {
 
    let test_log_path = Path::new("./logs/ac_not_b");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC5).unwrap();
 

	
 
            // put both A and B
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let pdl = b"
 
            primitive ac_not_b(in<msg> a, in<msg> b, out<msg> c){
 
                // forward A to C but keep B silent
 
                synchronous{ put(c, get(a)); }
 
                sync { put(c, get(a)); }
 
            }";
 
            let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap());
 
            let mut c = file_logged_configured_connector(1, test_log_path, pd);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let [a, b] = c.new_port_pair();
 

	
 
            c.add_component(b"", b"ac_not_b", &[p0, p1, a]).unwrap();
 

	
 
            c.connect(SEC1).unwrap();
 

	
 
            c.get(b).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn many_rounds_net() {
 
    let test_log_path = Path::new("./logs/many_rounds_net");
 
    let sock_addrs = [next_test_addr()];
 
    const NUM_ROUNDS: usize = 1_000;
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..NUM_ROUNDS {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..NUM_ROUNDS {
 
                c.get(p0).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 
#[test]
 
fn many_rounds_mem() {
 
    let test_log_path = Path::new("./logs/many_rounds_mem");
 
    const NUM_ROUNDS: usize = 1_000;
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, p1] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    for _ in 0..NUM_ROUNDS {
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.get(p1).unwrap();
 
        c.sync(SEC1).unwrap();
 
    }
 
}
 

	
 
#[test]
 
fn pdl_reo_lossy() {
 
    let pdl = b"
 
    primitive lossy(in<msg> a, out<msg> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            msg m = null;
 
            if(fires(a)) {
 
                m = get(a);
 
                if(fires(b)) {
 
                    put(b, m);
 
                }
 
            }
 
        }
 
    }
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1() {
 
    let pdl = b"
 
    primitive fifo1(in<msg> a, out<msg> b) {
 
        msg m = null;
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(m == null) {
 
                if(fires(a)) m=get(a);
 
            } else {
 
                if(fires(b)) put(b, m);
 
                m = null;
 
            }
 
        }
 
    }
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1full() {
 
    let test_log_path = Path::new("./logs/pdl_reo_fifo1full");
 
    let pdl = b"
 
    primitive fifo1full(in<msg> a, out<msg> b) {
 
        bool is_set = true;
 
        msg m = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(!is_set) {
 
                if(fires(a)) m=get(a);
 
                is_set = false;
 
            } else {
 
                if(fires(b)) put(b, m);
 
                is_set = true;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [_p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"fifo1full", &[g0, p1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(0, c.gotten(g1).unwrap().len());
 
}
 

	
 
#[test]
 
fn pdl_msg_consensus() {
 
    let test_log_path = Path::new("./logs/pdl_msg_consensus");
 
    let pdl = b"
 
    primitive msgconsensus(in<msg> a, in<msg> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            msg x = get(a);
 
            msg y = get(b);
 
            assert(x == y);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"msgconsensus", &[g0, g1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap();
 

	
 
    c.put(p0, Payload::from(b"HEY" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap_err();
 
}
 

	
 
#[test]
 
fn sequencer3_prim() {
 
    let test_log_path = Path::new("./logs/sequencer3_prim");
 
    let pdl = b"
 
    primitive sequencer3(out<msg> a, out<msg> b, out<msg> c) {
 
        u32 i = 0;
 
        while(true) synchronous {
 
        while(true) sync {
 
            out to = a;
 
            if     (i==1) to = b;
 
            else if(i==2) to = c;
 
            if(fires(to)) {
 
                put(to, create(0));
 
                i = (i + 1)%3;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let which_of_three = move |c: &mut Connector| {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(None).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        // silent round
 
        assert_eq!(0, c.sync(None).unwrap());
 
        // non silent round
 
        assert_eq!(expected_batch_idx, which_of_three(&mut c));
 
    }
 
}
 

	
 
#[test]
 
fn sequencer3_comp() {
 
    let test_log_path = Path::new("./logs/sequencer3_comp");
 
    let pdl = b"
 
    primitive replicator<T>(in<T> a, out<T> b, out<T> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && fires(b) && fires(c)) {
 
                    msg x = get(a);
 
                    put(b, x);
 
                    put(c, x);
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 
    primitive fifo1_init<T>(bool has_value, T m, in<T> a, out<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(has_value && fires(b)) {
 
                put(b, m);
 
                has_value = false;
 
            } else if (!has_value && fires(a)) {
 
                m = get(a);
 
                has_value = true;
 
            }
 
        }
 
    }
 
    composite fifo1_full<T>(in<T> a, out<T> b) {
 
        new fifo1_init(true, create(0), a, b);
 
    }
 
    composite fifo1<T>(in<T> a, out<T> b) {
 
        new fifo1_init(false, create(0), a, b);
 
    }
 
    composite sequencer3(out<msg> a, out<msg> b, out<msg> c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 

	
 
        new fifo1_full(o, d);
 
        new replicator(e, f, a);
 
        new fifo1(g, h);
 
        new replicator(i, j, b);
 
        new fifo1(k, l);
 
        new replicator(m, n, c);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let which_of_three = move |c: &mut Connector| {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(SEC1).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        // silent round
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
        // non silent round
 
        assert_eq!(expected_batch_idx, which_of_three(&mut c));
 
    }
 
}
 

	
 
enum XRouterItem {
 
    Silent,
 
    GetA,
 
    GetB,
 
}
 
// Hardcoded pseudo-random sequence of round behaviors for the native component
 
const XROUTER_ITEMS: &[XRouterItem] = {
 
    use XRouterItem::{GetA as A, GetB as B, Silent as S};
 
    &[
 
        B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B,
 
        S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A,
 
        A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S,
 
    ]
 
};
 

	
 
#[test]
 
fn xrouter_prim() {
 
    let test_log_path = Path::new("./logs/xrouter_prim");
 
    let pdl = b"
 
    primitive xrouter(in<msg> a, out<msg> b, out<msg> c) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                if(fires(b)) put(b, get(a));
 
                else         put(c, get(a));
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) xrouter2, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let now = std::time::Instant::now();
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("PRIM {:?}", now.elapsed());
 
}
 
#[test]
 
fn xrouter_comp() {
 
    let test_log_path = Path::new("./logs/xrouter_comp");
 
    let pdl = b"
 
    primitive replicator<T>(in<T> a, out<T> b, out<T> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && fires(b) && fires(c)) {
 
                    msg x = get(a);
 
                    put(b, x);
 
                    put(c, x);
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive merger(in<msg> a, in<msg> b, out<msg> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && !fires(b) && fires(c)) {
 
                    put(c, get(a));
 
                } else if (!fires(a) && fires(b) && fires(c)) {
 
                    put(c, get(b));
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive lossy<T>(in<T> a, out<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                auto m = get(a);
 
                if(fires(b)) put(b, m);
 
            }
 
        }
 
    }
 
    primitive sync_drain<T>(in<T> a, in<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                msg drop_it = get(a);
 
                msg on_the_floor = get(b);
 
            }
 
        }
 
    }
 
    composite xrouter(in<msg> a, out<msg> b, out<msg> c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 
        channel p -> q;
 
        channel r -> s;
 
        channel t -> u;
 

	
 
        new replicator(a, d, f);
 
        new replicator(g, t, h);
 
        new lossy(e, l);
 
        new lossy(i, j);
 
        new replicator(m, b, p);
 
        new replicator(k, n, c);
 
        new merger(q, o, r);
 
        new sync_drain(u, s);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) xrouter2, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let now = std::time::Instant::now();
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("COMP {:?}", now.elapsed());
 
}
 

	
 
#[test]
 
fn count_stream() {
 
    let test_log_path = Path::new("./logs/count_stream");
 
    let pdl = b"
 
    primitive count_stream(out<msg> o) {
 
        msg m = create(1);
 
        m[0] = 0;
 
        while(true) synchronous {
 
        while(true) sync {
 
            put(o, m);
 
            m[0] += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"", b"count_stream", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..16 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
}
 

	
 
#[test]
 
fn for_msg_byte() {
 
    let test_log_path = Path::new("./logs/for_msg_byte");
 
    let pdl = b"
 
    primitive for_msg_byte(out<msg> o) {
 
        u8 i = 0;
 
        u32 idx = 0;
 
        while(i<8) {
 
            msg m = create(1);
 
            m[idx] = i;
 
            synchronous put(o, m);
 
            sync put(o, m);
 
            i += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"", b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn eq_causality() {
 
    let test_log_path = Path::new("./logs/eq_causality");
 
    let pdl = b"
 
    primitive eq(in<msg> a, in<msg> b, out<msg> c) {
 
        msg ma = create(0);
 
        msg mb = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                // left first!
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 
                assert(ma == mb);
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..4 {
 
        // everything is fine with LEFT FIRST
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap();
 

	
 
        // no solution when left is NOT FIRST
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap_err();
 
    }
 
}
 

	
 
#[test]
 
fn eq_no_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    composite eq(in<msg> a, in<msg> b, out<msg> c) {
 
        channel leftfirsto -> leftfirsti;
 
        new eqinner(a, b, c, leftfirsto, leftfirsti);
 
    }
 
    primitive eqinner(in<msg> a, in<msg> b, out<msg> c, out<msg> leftfirsto, in<msg> leftfirsti) {
 
        msg ma = create(0);
 
        msg mb = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                if(fires(leftfirsti)) {
 
                    // left first! DO USE DUMMY
 
                    ma = get(a);
 
                    put(c, ma);
 
                    mb = get(b);
 

	
 
                    // using dummy!
 
                    put(leftfirsto, ma);
 
                    auto drop_it = get(leftfirsti);
 
                } else {
 
                    // right first! DON'T USE DUMMY
 
                    mb = get(b);
 
                    put(c, mb);
 
                    ma = get(a);
 
                }
 
                assert(ma == mb);
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..32 {
 
        // ok when they send
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(SEC1).unwrap();
 
        // ok when they don't
 
        c.sync(SEC1).unwrap();
 
    }
 
}
src/runtime2/tests/api_component.rs
Show inline comments
 
// Testing the api component.
 
//
 
// These tests explicitly do not use the "NUM_INSTANCES" constant because we're
 
// doing some communication with the native component. Hence only expect one
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive handler(in<u32> request, out<u32> response, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
            sync {
 
                auto value = get(request);
 
                put(response, value * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let req_chan = api.create_channel().unwrap();
 
    let resp_chan = api.create_channel().unwrap();
 

	
 
    api.create_connector("", "handler", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(req_chan.getter_id.index)),
 
        Value::Output(PortId::new(resp_chan.putter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])),
 
            ApplicationSyncAction::Get(resp_chan.getter_id)
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 
        assert!(result.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, loop_idx * 2);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_getting_from_component() {
 
    const CODE: &'static str ="
 
    primitive loop_sender(out<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            synchronous {
 
            sync {
 
                put(numbers, cur);
 
                cur += 1;
 
            }
 
        }
 
    }";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_sender", ValueGroup::new_stack(vec![
 
        Value::Output(PortId::new(channel.putter_id.index)),
 
        Value::UInt32(1337),
 
        Value::UInt32(1337 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Get(channel.getter_id),
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 

	
 
        assert!(result.len() == 1 && result[0].values.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, 1337 + loop_idx);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_putting_to_component() {
 
    const CODE: &'static str = "
 
    primitive loop_receiver(in<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            synchronous {
 
            sync {
 
                auto number = get(numbers);
 
                assert(number == cur);
 
                cur += 1;
 
            }
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_receiver", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(channel.getter_id.index)),
 
        Value::UInt32(42),
 
        Value::UInt32(42 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::UInt32(42 + loop_idx)])),
 
        ]).expect("start sync round");
 

	
 
        // Note: if we finish a round, then it must have succeeded :)
 
        api.wait().expect("finish sync round");
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/network_shapes.rs
Show inline comments
 
// Testing particular graph shapes
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
            sync {
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
            sync {
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 

	
 
#[test]
 
fn test_star_shaped_request() {
 
    const CODE: &'static str = "
 
    primitive edge(in<u32> input, out<u32> output, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
            sync {
 
                auto req = get(input);
 
                put(output, req * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            // print(\"starting loop\");
 
            synchronous {
 
            sync {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
        auto responses = {};
 

	
 
        u32 edge_index = 0;
 
        while (edge_index < num_edges) {
 
            channel req_put -> req_get;
 
            channel resp_put -> resp_get;
 
            new edge(req_get, resp_put, num_loops);
 
            requests @= { req_put };
 
            responses @= { resp_get };
 

	
 
            edge_index += 1;
 
        }
 

	
 
        new center(requests, responses, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("star_shaped_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create connector");
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
            sync {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
            sync {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
            sync {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create connector");
 
    });
 
}
 
\ No newline at end of file
testdata/parser/negative/1.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested twice in primitive
 
primitive main(in a, out b) {
 
	while (true) {
 
		synchronous {
 
			synchronous {}
 
		sync {
 
			sync {}
 
		}
 
	}
 
}
testdata/parser/negative/1.txt
Show inline comments
 
Parse error at 1.pdl:7:4: Illegal nested synchronous statement
 
			synchronous {}
 
			sync {}
 
			^
testdata/parser/negative/10.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested in sync block
 
primitive main(in a, out b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (false || true) {
 
				synchronous {
 
				sync {
 
					skip;
 
				}
 
			}
 
		}
 
	} 
 
}
testdata/parser/negative/10.txt
Show inline comments
 
Parse error at 10.pdl:8:5: Illegal nested synchronous statement
 
				synchronous {
 
				sync {
 
				^
testdata/parser/negative/12.pdl
Show inline comments
 
#version 100
 

	
 
// illegal node declaration
 
primitive main(in a, out b) {
 
	while (true) {
 
		channel x -> y;
 
		synchronous {}
 
		sync {}
 
	}
 
}
testdata/parser/negative/19.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(a)) {
 
				return 5;
 
			} else {
 
				block(a);
 
			}
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/negative/24.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	int x = 0;
 
	int y = 0;
 
	x += y + 5;
 
	y %= x -= 3;
 
	x *= x * x *= 5;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			assert fires(a) == fires(b);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/negative/3.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested deeply in composite
 
composite main(in a, out b) {
 
	channel x -> y;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			skip;
 
		}
 
	}
 
}
testdata/parser/negative/3.txt
Show inline comments
 
Parse error at 3.pdl:7:3: Illegal nested synchronous statement
 
		synchronous {
 
		sync {
 
		^
testdata/parser/negative/31.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(int a) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            break; // not allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/negative/32.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(int a) {
 
    loop: {
 
        synchronous {
 
        sync {
 
            goto loop; // not allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/negative/4.pdl
Show inline comments
 
#version 100
 

	
 
// built-in outside sync block
 
primitive main(in a, out b) {
 
	int x = 0;
 
	msg y = create(0); // legal
 
	while (x < 10) {
 
		y = get(a); // illegal
 
		synchronous {
 
		sync {
 
			y = get(a); // legal
 
		}
 
	}
 
}
testdata/parser/negative/6.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
// duplicate formal parameters
 
composite main(in a, out a) {
 
	new sync(a, a);
 
	new sync_component(a, a);
 
}
testdata/parser/negative/7.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
// shadowing formal parameter
 
composite main(in a, out b) {
 
	channel c -> a;
 
	new sync(a, b);
 
	new sync_component(a, b);
 
}
testdata/parser/negative/8.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main(in a, out b) {
 
	channel c -> d;
 
	syncdrain(a, b);
 
}
 

	
 
// shadowing import
 
primitive syncdrain(in a, in b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (!fires(a) || !fires(b)) {
 
				block(a);
 
				block(b);
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/1.pdl
Show inline comments
 
#version 100
 

	
 
composite main(in asend, out arecv, in bsend, out brecv) {
 
    channel xo -> xi;
 
    channel yo -> yi;
 
    new replicator(asend, xo, brecv);
 
    new replicator(bsend, yo, arecv);
 
    // x fires first, then y, then x, et cetera
 
    new sequencer(xi, yi);
 
}
 

	
 
primitive replicator(in a, out b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b) && fires(c)) {
 
                msg x = get(a);
 
                put(b, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 

	
 
composite sequencer(in x, in y) {
 
    channel ao -> ai;
 
    channel bo -> bi;
 
    channel co -> ci;
 
    channel do -> di;
 
    channel eo -> ei;
 
    channel fo -> fi;
 
    new syncdrain(x, ai);
 
    new syncdrain(y, bi);
 
    new replicator(ei, ao, co);
 
    new replicator(fi, bo, do);
 
    new fifo(ci, fo, null);
 
    new fifo(di, eo, create(0));
 
}
 

	
 
primitive syncdrain(in a, in b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
                get(a);
 
                get(b);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
 

	
 
primitive fifo(in a, out b, msg init) {
 
    msg c = init;
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (c != null) {
 
                assert !fires(a);
 
                if (fires(b)) {
 
                    put(b, c);
 
                    c = null;
 
                }
 
            } else {
 
                assert !fires(b);
 
                if (fires(a)) {
 
                    c = get(a);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
primitive sequencer2(in x, in y) {
 
	while (true) {
 
	    boolean b = false;
 
		while (!b) {
 
			synchronous {
 
			sync {
 
				assert !fires(y);
 
				if (fires(x))
 
					b = true;
 
			}
 
		}
 
		b = false;
 
		while (!b) {
 
			synchronous {
 
			sync {
 
				assert !fires(x);
 
				if (fires(y))
 
					b = true;
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/10.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {}
 

	
 
primitive example(in a, out[] b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(a)) {
 
				int i = 0;
 
				while (i < b.length) {
 
					if (fires(b[i])) {
 
						int j = i + 1;
 
						while (j < b.length)
 
							assert !fires(b[j++]);
 
						break;
 
					}
 
					i++;
 
				}
 
				assert i < b.length;
 
			} else {
 
				int i = 0;
 
				while (i < b.length)
 
					assert !fires(b[i++]);
 
			}
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/11.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	msg x = null;
 
	while (x == null) {
 
		synchronous {
 
		sync {
 
			if (fires(a))
 
				x = get(a);
 
		}
 
	}
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(b))
 
				put(b, x);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/12.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	int x = 0;
 
	int y = 0;
 
	x += y + 5;
 
	y %= x -= 3;
 
	x *= x * (x *= 5);
 
	while (true) {
 
		synchronous {
 
		sync {
 
			assert fires(a) == fires(b);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/13.pdl
Show inline comments
 
#version 100
 

	
 
/*
 
Adaptation of 7.pdl
 
*/
 

	
 
composite main() {}
 

	
 
composite example(in[] a, in[] b, out x) {
 
	new async(a);
 
	new async(b);
 
	new resolve(a, b, x);
 
}
 

	
 
primitive resolve(in[] a, in[] b, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length && i < b.length) {
 
				if (fires(a[i]) && fires(b[i])) {
 
					put(x, create(0)); // send token to x
 
					break;
 
				}
 
				i++;
 
			}
 
			if (i >= a.length || i >= b.length)
 
				assert !fires(x);
 
		}
 
	}
 
}
 

	
 
primitive async(in[] a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length)
 
				if (fires(a[i++])) break;
 
			while (i < a.length)
 
				assert !fires(a[i++]);
 
		}
 
	}
 
}
testdata/parser/positive/14.pdl
Show inline comments
 
#version 100
 

	
 
composite main(out c) {
 
	channel ao -> ai;
 
    channel bo -> bi;
 
	new sync(ai, bo);
 
	new sync_component(ai, bo);
 
	new binary_replicator(bi, ao, c);
 
}
 

	
 
primitive sync(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	msg x = get(a);
 
            	put(b, x);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
 

	
 
primitive binary_replicator(in b, out a, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(b) && fires(a) && fires(c)) {
 
                msg x = get(b);
 
                put(a, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/15.pdl
Show inline comments
 
#version
 

	
 
import std.reo;
 

	
 
composite main(out c) {
 
	channel ao -> ai;
 
	channel bo -> bi;
 
	channel axo -> axi;
 
	channel zo -> zi;
 
	new sync(ai, bo);
 
	new sync_component(ai, bo);
 
	new replicator(bi, {axo, c});
 
	new consensus({axi, zi}, ao);
 
	new generator(zo);
 
}
 

	
 
primitive generator(out z) {
 
	while (true) {
 
		synchronous (msg x) {
 
			if (x == null) {
 
				put(z, x);
 
				assert !fires(x);
 
			} else {
 
				put(z, x);
 
				assert fires(x);
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/16.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {
 
	channel xo -> xi;
 
	new a(xi);
 
	new c(xo);
 
}
 

	
 
primitive a(in x) {
 
	synchronous {
 
	sync {
 
		msg m = get(x);
 
		assert m.length == 5;
 
		assert m[0] == 'h';
 
		assert m[1] == 'e';
 
		assert m[2] == 'l';
 
		assert m[3] == 'l';
 
		assert m[4] == 'o';
 
	}
 
}
 

	
 
primitive b(out x) {
 
	synchronous (msg m) {
 
		put(x, m);
 
	}
 
}
 
// or
 
primitive c(out x) {
 
	synchronous {
 
	sync {
 
		msg m = create(5);
 
		m[0] = 'h';
 
		m[1] = 'e';
 
		m[2] = 'l';
 
		m[3] = 'l';
 
		m[4] = 'o';
 
		put(x, m);
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/17.pdl
Show inline comments
 
#version 100
 

	
 
composite main(in x, out y) {
 
	new prophet(x, y);
 
}
 

	
 
primitive prophet(in b, out a) {
 
	msg c = null;
 
	while (true) {
 
		if (c != null) {
 
			synchronous {
 
			sync {
 
				assert !fires(a);
 
				if (fires(b)) {
 
					assert get(b) == c;
 
					c = null;
 
				}
 
			}
 
		} else {
 
			synchronous (msg x) {
 
				assert !fires(b);
 
				if (fires(a)) {
 
					put(a, x);
 
					c = x;
 
				}
 
			}
 
		}
 
	}
 
}
 

	
 
primitive fifo(in a, out b, msg init) {
 
    msg c = init;
 
    while (true) {
 
        if (c != null) {
 
        	synchronous {
 
        	sync {
 
                assert !fires(a);
 
                if (fires(b)) {
 
                    put(b, c);
 
                    c = null;
 
                }
 
            }
 
        } else {
 
        	synchronous {
 
        	sync {
 
                assert !fires(b);
 
                if (fires(a)) {
 
                    c = get(a);
 
                }
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/positive/18.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main() {}
 

	
 
primitive main1(in a, out c) {
 
	int x = 0;
 
	int y = 0;
 
	msg z = null;
 
	msg w = null;
 
	x = 1;
 
	y = 1;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (x > 0 && fires(a)) {
 
				z = get(a);
 
				x--;
 
			}
 
			if (w != null && fires(c)) {
 
				put(c, w);
 
				w = null;
 
				y++;
 
			}
 
		}
 
		synchronous {
 
		sync {
 
			assert !fires(a) && !fires(c);
 
			if (z != null && y > 0) {
 
				w = z;
 
				z = null;
 
				y--;
 
				x++;
 
			}
 
		}
 
	}
 
}
 

	
 
composite main2(in a, out c) {
 
	channel xo -> xi;
 
	new fifo(a, xo, null);
 
	new fifo(xi, c, null);
 
}
testdata/parser/positive/19.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {}
 

	
 
primitive example(int a) {
 
    synchronous {
 
    sync {
 
        loop: {
 
            goto loop; // allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/positive/2.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main(in asend, out arecv, in bsend, out brecv, in csend, out crecv) {
 
    channel xo -> xi;
 
    channel yo -> yi;
 
    channel zo -> zi;
 
    // Every synchronous round, at most one message is sent (to determine a global order)
 
    new mymerger(asend, bsend, xo);
 
    new mymerger(csend, xi, yo);
 
    // If a message is sent, it is broadcast to every recipient
 
    new replicator(yi, {arecv, zo});
 
    new replicator(zi, {brecv, crecv});
 
}
 

	
 
primitive mymerger(in a, in b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && !fires(b) && fires(c)) {
 
                put(c, get(a));
 
            } else if (!fires(a) && fires(b) && fires(c)) {
 
                put(c, get(b));
 
            } else {
 
            	assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/3.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main(in ai, out ao, in bi, out bo, in ci, out co, in di, out do) {
 
    // Three parts:
 
    channel xo -> xi;
 
    {
 
        channel afo -> aii;
 
        channel bfo -> bii;
 
        channel cfo -> cii;
 
        channel dfo -> dii;
 
        // Part 1. Collect all in msgs.
 
        new fifo(ai, afo, null);
 
        new fifo(bi, bfo, null);
 
        new fifo(ci, cfo, null);
 
        new fifo(di, dfo, null);
 
        // Part 2. Compute maximum.
 
        new computeMax(aii, bii, cii, dii, xo);
 
    }
 
    // Part 3. Send maximum to all out msgs, and repeat.
 
    {
 
        channel xxo -> xxi;
 
        channel xxxo -> xxxi;
 
        new replicator(xi, xxo, ao);
 
        new replicator(xxi, xxxo, bo);
 
        new replicator(xxxi, co, do);
 
    }
 
}
 

	
 
primitive computeMax(in a, in b, in c, in d, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
            if (fires(a) && fires(b) && fires(c) && fires(d) && fires(x)) {
 
            	msg aa = get(a);
 
            	msg bb = get(b);
 
            	msg cc = get(c);
 
            	msg dd = get(d);
 
            	uint16_t aaa = aa[0] & aa[1] << 8;
 
                uint16_t bbb = bb[0] & bb[1] << 8;
 
                uint16_t ccc = cc[0] & cc[1] << 8;
 
                uint16_t ddd = dd[0] & dd[1] << 8;
 
                // broadcast message with highest header
 
                uint16_t max = aaa;
 
                if (bbb > max) max = bbb;
 
                if (ccc > max) max = ccc;
 
                if (ddd > max) max = ddd;
 
                if (max == aaa) put(x, aa);
 
                else if (max == bbb) put(x, bb);
 
                else if (max == ccc) put(x, cc);
 
                else if (max == ddd) put(x, dd);
 
            } else {
 
	            assert !fires(a) && !fires(b) && !fires(c) && !fires(d) && !fires(x);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/5.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 
import std.buf;
 

	
 
primitive main(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
                msg x = get(a);
 
                short y = readShort(x, 0);
 
                y++;
 
                writeShort(x, 0, y);
 
                put(b, x);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/6.pdl
Show inline comments
 
#version 100
 

	
 
composite main(in a1, in a2, in a3, out b1, out b2) {
 
	new reonode({a1, a2, a3}, {b1, b2});
 
}
 

	
 
composite reonode(in[] a, out[] b) {
 
	channel co -> ci;
 
	new merger(a, co);
 
	new replicator(ci, b);
 
}
 

	
 
composite replicator(in a, out[] b) {
 
	if (b.length == 0) {
 
		new blocking(a);
 
	} else if (b.length == 1) {
 
		new sync(a, b[0]);
 
		new sync_component(a, b[0]);
 
	} else {
 
		channel xo -> xi;
 
		new binary_replicator(a, b[0], xo);
 
		new replicator(xi, b[1 : b.length - 1]);
 
	}
 
}
 
primitive binary_replicator(in a, out b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b) && fires(c)) {
 
                msg x = get(a);
 
                put(b, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 
primitive blocking(in a) {
 
	while (true) synchronous {
 
	while (true) sync {
 
		assert !fires(a);
 
	}
 
}
 

	
 
composite merger(in[] a, out b) {
 
	if (a.length == 0) {
 
		new silent(b);
 
	} else {
 
		in prev = a[0];
 
		int i = 1;
 
		while (i < a.length) {
 
			channel yi -> yo;
 
			new binary_merger(prev, a[i], yo);
 
			prev = yi;
 
			i++;
 
		}
 
		new sync(prev, b);
 
		new sync_component(prev, b);
 
	}
 
}
 
primitive binary_merger(in a, in b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(c)) {
 
                assert !fires(b);
 
                put(c, get(a));
 
            } else if (fires(b) && fires(c)) {
 
                assert !fires(a);
 
                put(c, get(b));
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 
primitive silent(out a) {
 
	while (true) synchronous {
 
	while (true) sync {
 
		assert !fires(a);
 
	}
 
}
 

	
 
primitive sync(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	put(b, get(a));
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/7.pdl
Show inline comments
 
#version 100
 

	
 
/*
 
Suggested by Benjamin Lion.
 
Source: http://www.wisdom.weizmann.ac.il/~naor/PUZZLES/compare.html
 

	
 
Bob comes to Ron, a manager at his company, with a complaint about a
 
sensitive matter; he asks Ron to keep his identity confidential. A few
 
months later, Moshe (another manager) tells Ron that someone has
 
complained to him, also with a confidentiality request, about the same
 
matter.
 

	
 
Ron and Moshe would like to determine whether the same person has
 
complained to each of them, but, if there are two complainers, Ron and
 
Moshe want to give no information to each other about their identities.
 

	
 
The protocol typically used in a situation like this one is akin to the
 
game ``twenty questions,'' but goes by the name of ``delicate
 
conversational probing.'' Ron might ask Moshe if Moshe's complainer is
 
male, and if the answer is ``yes'' Moshe might then ask Ron if Ron's
 
complainer's surname begins with a letter preceding ``M'' in the
 
alphabet. This goes on until Ron and Moshe have ascertained whether they
 
have the same person in mind. When they do not, however (particularly
 
when the first ``no'' occurs late in the game) a great deal of
 
information may have been exchanged.
 

	
 
What can Ron and Moshe do in order not leak more information than necessary?
 

	
 
Here is one of our favorite solutions suggested by Miki Ajtai of the IBM
 
Almaden Research Center. His proposal is "physical" in the sense that
 
Ron and Moshe must be together. We need to assume that there is a fairly
 
small pool of candidates, say twenty. Ron and Moshe obtain twenty
 
identical containers (perhaps by purchasing disposable cups (paper or
 
plastic)), arrange them in a line, and write labels in front of each
 
cup, one for each candidate. Ron then puts a folded slip of paper saying
 
``Yes'' in the cup of the person who complained to him, and a slip
 
saying ``No'' in the other nineteen cups. Moshe does the same. Ron and
 
Moshe then remove the labels, and shuffle the cups at random. They then
 
look inside the cups to see whether one of them contains two slips
 
saying ``Yes'' and decide accordingly.
 
*/
 

	
 
composite main() {}
 

	
 
composite puzzle(in[] a, in[] b, out x) {
 
	new async(a);
 
	new async(b);
 
	new resolve(a, b, x);
 
}
 

	
 
primitive resolve(in[] a, in[] b, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length && i < b.length) {
 
				if (fires(a[i]) && fires(b[i])) {
 
					put(x, create(0)); // send token to x
 
					goto end;
 
				}
 
				i++;
 
			}
 
			assert !fires(x);
 
			end: skip;
 
		}
 
	}
 
}
 

	
 
primitive async(in[] a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			int j = 0;
 
			while (i < a.length) {
 
				if (fires(a[i])) break;
 
				i++;
 
			}
 
			while (j < a.length) {
 
				assert i == j || !fires(a[j]);
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/8.pdl
Show inline comments
 
#version 100
 

	
 
/*
 
Suggested by Luc Edixhoven.
 
Source: https://en.wikipedia.org/wiki/Thue%E2%80%93Morse_sequence
 

	
 
In mathematics, the Thue–Morse sequence, or Prouhet–Thue–Morse sequence,
 
is the binary sequence (an infinite sequence of 0s and 1s) obtained by
 
starting with 0 and successively appending the Boolean complement of the
 
sequence obtained thus far.
 

	
 
To compute the nth element t_n, write the number n in binary. If the
 
number of ones in this binary expansion is odd then t_n = 1, if even
 
then t_n = 0. For this reason John H. Conway et al. call numbers n
 
satisfying t_n = 1 odious (for odd) numbers and numbers for which
 
t_n = 0 evil (for even) numbers. In other words, t_n = 0 if n is
 
an evil number and t_n = 1 if n is an odious number.
 

	
 
*/
 

	
 
import std.reo;
 

	
 
composite main(out x) {
 
	channel ao -> ai;
 
	channel bo -> bi;
 
	channel co -> ci;
 
	new evil_or_odious(ai, bo);
 
	new replicator(bi, {co, x});
 
	new recorder(ao, ci);
 
}
 

	
 
primitive evil_or_odious(in x, out y) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(x) && fires(y)) {
 
				msg a = get(x);
 
				msg result = create(1);
 
				boolean even = true;
 
				int i = 0;
 
				while (i < a.length) {
 
					if (a[i++] == '1')
 
						even = !even;
 
				}
 
				result[0] = even ? '1' : '0';
 
				put(y, result);
 
			} else {
 
				assert !fires(x);
 
				assert !fires(y);
 
			}
 
		}
 
	}
 
}
 
primitive recorder(out h, in a) {
 
	msg c = create(0);
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(h) && fires(a)) {
 
				put(h, c);
 
				{
 
					msg x = get(a);
 
					msg n = create(c.length + 1);
 
					int i = 0;
 
					while (i < c.length) {
 
						n[i] = c[i];
 
						i++;
 
					}
 
					n[c.length] = x[0];
 
					c = n;
 
				}
 
			}
 
		}
 
	}
 
} 
testdata/parser/positive/tarry.pdl
Show inline comments
 
#version 100
 

	
 
/*
 
Example distributed algorithm: Tarry's algorithm.
 

	
 
A token passes around the network, starting at some initiator. The initiator
 
starts the algorithm, and when the algorithm ends the token is back again at
 
the initiator. The non-initiators are signaled when they receive the token
 
for the first time; when the token is handled traversal continues.
 

	
 
The network topology is defined by applications: they create an initiator or
 
non-initiator component, and establish bidirectional channels in between.
 
In this example, the whole network is created statically for simulation
 
purposes: there are 4 processes and some channels in between.
 

	
 
Ports: initiator start, initiator end, three pairs of signals.
 
*/
 

	
 
import std.reo;
 

	
 
composite main(in start, out end, out s1o, in s1i, out s2o, in s2i, out s3o, in s3i) {
 
	// Processes: p, q, r, s
 
	// Channels: pq, pr, qr, rs
 
	channel p_pq -> pq_q;
 
	channel q_pq -> pq_p;
 
	channel p_pr -> pr_r;
 
	channel r_pr -> pr_p;
 
	channel q_qr -> qr_r;
 
	channel r_qr -> qr_q;
 
	channel r_rs -> rs_s;
 
	channel s_rs -> rs_r;
 
	
 
	new initiator(start, end, {pq_p, pr_p}, {p_pq, p_pr});
 
	new noninitiator(s1o, s1i, {pq_q, qr_q}, {q_pq, q_qr});
 
	new noninitiator(s2o, s2i, {pr_r, rs_r}, {r_pr, r_rs});
 
	new noninitiator(s3o, s3i, {rs_s}, {s_rs});
 
}
 
primitive initiator(in start, out end, in[] peeri, out[] peero) {
 
	msg token = null;
 
	in[] neighbori = {};
 
	out[] neighboro = {};
 
	assert peeri.length == peero.length;
 
	while (true) {
 
		// Step 1. Initiator waits for token
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				if (fires(start)) {
 
					token = get(start);
 
				}
 
			}
 
		}
 
		// Reset neighbors
 
		neighbori = peeri;
 
		peeri = {};
 
		neighboro = peero;
 
		peero = {};
 
		// Step 2. Keep sending token to processes
 
		while (neighbori.length > 0) {
 
			int idx = 0;
 
			// Select first channel that accepts our token
 
			while (token != null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < neighboro.length) {
 
						if (fires(neighboro[i])) {
 
							put(neighboro[i], token);
 
							idx = i;
 
							token = null;
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
			// Eliminate from neighbor set
 
			peeri = {neighbori[idx]} @ peeri;
 
			peero = {neighboro[idx]} @ peero;
 
			neighbori = neighbori[0:idx] @ neighbori[idx:neighbori.length];
 
			neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length];
 
			// Step 3. Await return of token
 
			while (token == null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < peeri.length + neighbori.length) {
 
						if (fires(peeri@neighbori[i])) {
 
							token = get(peeri@neighbori[i]);
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
		}
 
		// Step 4. Token is back and all neighbors visited
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(end)) {
 
					put(end, token);
 
					token = null;
 
				}
 
			}
 
		}
 
	}
 
}
 
primitive noninitiator(out start, in end, in[] peeri, out[] peero) {
 
	msg token = null;
 
	in[] neighbori = {};
 
	out[] neighboro = {};
 
	in[] parenti = {};
 
	out[] parento = {};
 
	assert peeri.length == peero.length;
 
	while (true) {
 
		int idx = 0;
 
		// Step 1. Await token for first time
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				int i = 0;
 
				while (i < peeri.length) {
 
					if (fires(peeri[i])) {
 
						token = get(peeri[i]);
 
						idx = i;
 
						break;
 
					} else i++;
 
				}
 
			}
 
		}
 
		// Reset neighbors
 
		neighbori = peeri[0:idx] @ peeri[idx:peeri.length];
 
		neighboro = peero[0:idx] @ peero[idx:peero.length];
 
		parenti = {peeri[idx]};
 
		parento = {peero[idx]};
 
		peeri = {};
 
		peero = {};
 
		// Step 2. Non-initiator signals
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(end)) {
 
					put(end, token);
 
					token = null;
 
				}
 
			}
 
		}
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				if (fires(start)) {
 
					token = get(start);
 
				}
 
			}
 
		}
 
		// Step 3. Keep sending token to processes
 
		while (neighbori.length > 0) {
 
			idx = 0;
 
			// Select first channel that accepts our token
 
			while (token != null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < neighboro.length) {
 
						if (fires(neighboro[i])) {
 
							put(neighboro[i], token);
 
							idx = i;
 
							token = null;
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
			// Eliminate from neighbor set
 
			peeri = {neighbori[idx]} @ peeri;
 
			peero = {neighboro[idx]} @ peero;
 
			neighbori = neighbori[0:idx] @ neighbori[idx:neighbori.length];
 
			neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length];
 
			// Step 4. Await return of token
 
			while (token == null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < peeri.length + neighbori.length) {
 
						if (fires(peeri@neighbori[i])) {
 
							token = get(peeri@neighbori[i]);
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
		}
 
		// Step 5. Token is back, pass to parent
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(parento[0])) {
 
					put(parento[0], token);
 
					token = null;
 
				}
 
			}
 
		}
 
		peeri = {parenti[0]} @ peeri;
 
		peero = {parento[0]} @ peero;
 
		parenti = {};
 
		parento = {};
 
	}
 
}
0 comments (0 inline, 0 general)