Changeset - 36cc1fe490f7
[Not reviewed]
Merge
0 58 5
MH - 4 years ago 2021-11-15 12:23:58
contact@maxhenger.nl
Merge branch 'feat-api-cmds-and-branching'

Implements the programmer-facing API to allow programmatic
specification of a synchronous round. The way in which these put/get
interactions are performed is in an initial shape. Perhaps this will
change in the future.

The second main set of changes is the addion of a 'fork' statement,
which allows explicit forking, and allowing multiple puts/gets over the
same transport link within a single sync round.
62 files changed with 1682 insertions and 622 deletions:
0 comments (0 inline, 0 general)
examples/bench_04/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"    while(true) sync {}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_05/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, port_pairs, proto_components;
 
	port_pairs = atoi(argv[1]);
 
	proto_components = atoi(argv[2]);
 
	printf("port_pairs %d, proto_components: %d\n", port_pairs, proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"    while(true) sync {}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_5.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<port_pairs; i++) {
 
		connector_add_port_pair(c, NULL, NULL);
 
	}
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
examples/bench_09/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive presync_work() {   "
 
	"    int i = 0;               "
 
	"    while(true) {            "
 
	"        i = 0;               "
 
	"        while(i < 2)  i++;   "
 
	"        synchronous {}       "
 
	"        sync {}       "
 
	"    }                        "
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "presync_work";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_11/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j, forwards, num_options, correct_index;
 
	forwards = atoi(argv[1]);
 
	num_options = atoi(argv[2]);
 
	printf("forwards %d, num_options %d\n",
 
		forwards, num_options);
 
	unsigned char pdl[] = 
 
	"primitive recv_zero(in a) {  "
 
	"    while(true) synchronous {"
 
	"    while(true) sync {"
 
	"        msg m = get(a);      "
 
	"        assert(m[0] == 0);   "
 
	"    }                        "
 
	"}                            "
 
	; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_11.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		// create a forward to tail of chain
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		// thread a forward component onto native_tail
 
		char ident[] = "forward";
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
examples/bench_23/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i;
 

	
 
	// unsigned char pdl[] = "\
 
	// primitive xrouter(in a, out b, out c) {\
 
 //        while(true) synchronous {\
 
 //        while(true) sync {\
 
 //            if(fires(a)) {\
 
 //                if(fires(b)) put(b, get(a));\
 
 //                else         put(c, get(a));\
 
 //            }\
 
 //        }\
 
 //    }"
 
 //    ;
 
	unsigned char pdl[] = "\
 
	primitive lossy(in a, out b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                msg m = get(a);\
 
                if(fires(b)) put(b, m);\
 
            }\
 
        }\
 
    }\
 
    primitive sync_drain(in a, in b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                get(a);\
 
                get(b);\
 
            }\
 
        }\
 
    }\
 
    composite xrouter(in a, out b, out c) {\
 
        channel d -> e;\
 
        channel f -> g;\
 
        channel h -> i;\
 
        channel j -> k;\
 
        channel l -> m;\
 
        channel n -> o;\
 
        channel p -> q;\
 
        channel r -> s;\
 
        channel t -> u;\
 
        new replicator(a, d, f);\
 
        new replicator(g, t, h);\
 
        new lossy(e, l);\
 
        new lossy(i, j);\
 
        new replicator(m, b, p);\
 
        new replicator(k, n, c);\
 
        new merger(q, o, r);\
 
        new sync_drain(u, s);\
examples/bench_24/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j;
 

	
 
	unsigned char pdl[] = "\
 
	primitive fifo1_init(msg m, in a, out b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(m != null && fires(b)) {\
 
                put(b, m);\
 
                m = null;\
 
            } else if (m == null && fires(a)) {\
 
                m = get(a);\
 
            }\
 
        }\
 
    }\
 
    composite fifo1_full(in a, out b) {\
 
        new fifo1_init(create(0), a, b);\
 
    }\
 
    composite fifo1(in a, out b) {\
 
        new fifo1_init(null, a, b);\
 
    }\
 
    composite sequencer3(out a, out b, out c) {\
 
        channel d -> e;\
 
        channel f -> g;\
 
        channel h -> i;\
 
        channel j -> k;\
 
        channel l -> m;\
 
        channel n -> o;\
 
        new fifo1_full(o, d);\
 
        new replicator(e, f, a);\
 
        new fifo1(g, h);\
 
        new replicator(i, j, b);\
 
        new fifo1(k, l);\
 
        new replicator(m, n, c);\
 
    }"
 
    ;
 
	// unsigned char pdl[] = "\
 
	// primitive sequencer3(out a, out b, out c) {\
 
 //        int i = 0;\
 
 //        while(true) synchronous {\
 
 //        while(true) sync {\
 
 //            out to = a;\
 
 //            if     (i==1) to = b;\
 
 //            else if(i==2) to = c;\
 
 //            if(fires(to)) {\
 
 //                put(to, create(0));\
 
 //                i = (i + 1)%3;\
 
 //            }\
 
 //        }\
 
 //    }"
 
    ;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, 0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	PortId putters[3], getters[3];
 
	for(i=0; i<3; i++) {
 
		connector_add_port_pair(c, &putters[i], &getters[i]);
 
	}
 
	char ident[] = "sequencer3";
 
	connector_add_component(c, ident, sizeof(ident)-1, putters, 3);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 

	
examples/bench_27/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "\
 
    primitive xrouter(in a, out b, out c) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                if(fires(b)) put(b, get(a));\
 
                else         put(c, get(a));\
 
            }\
 
        }\
 
    }\
 
    ";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, 0);
 
    PortId ports[8];
 
    if(optimized=='y') {
 
        connector_add_port_pair(c, &ports[0], &ports[1]);
 
        connector_add_port_pair(c, &ports[2], &ports[7]); // 3,4,5,6 uninitialized
 
        connector_add_component(c, "sync", 4, ports+1, 2);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    } else {
 
        for(i=0; i<4; i++) {
 
            connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]);
 
        }
 
        connector_add_component(c, "xrouter", 7, (PortId[]) {ports[1],ports[2],ports[4]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        connector_add_component(c, "merger" , 6, (PortId[]) {ports[3],ports[5],ports[6]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
examples/eg_protocols.pdl
Show inline comments
 
primitive pres_2(in i, out o) {
 
  synchronous {
 
  sync {
 
    put(o, get(i));
 
  }
 
}
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous {
 
  while(true) sync {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 

	
 
primitive alt_round_merger(in a, in b, out c){
 
  while(true) {
 
    synchronous{ put(c, get(a)); }
 
    synchronous{ put(c, get(b)); }
 
    sync { put(c, get(a)); }
 
    sync { put(c, get(b)); }
 
  }	
 
}
src/protocol/ast.rs
Show inline comments
 
@@ -117,48 +117,50 @@ define_aliased_ast_id!(ImportId, Id<Import>, index(Import, imports), alloc(alloc
 
define_aliased_ast_id!(VariableId, Id<Variable>, index(Variable, variables), alloc(alloc_variable));
 

	
 
define_aliased_ast_id!(DefinitionId, Id<Definition>, index(Definition, definitions));
 
define_new_ast_id!(StructDefinitionId, DefinitionId, index(StructDefinition, Definition::Struct, definitions), alloc(alloc_struct_definition));
 
define_new_ast_id!(EnumDefinitionId, DefinitionId, index(EnumDefinition, Definition::Enum, definitions), alloc(alloc_enum_definition));
 
define_new_ast_id!(UnionDefinitionId, DefinitionId, index(UnionDefinition, Definition::Union, definitions), alloc(alloc_union_definition));
 
define_new_ast_id!(ComponentDefinitionId, DefinitionId, index(ComponentDefinition, Definition::Component, definitions), alloc(alloc_component_definition));
 
define_new_ast_id!(FunctionDefinitionId, DefinitionId, index(FunctionDefinition, Definition::Function, definitions), alloc(alloc_function_definition));
 

	
 
define_aliased_ast_id!(StatementId, Id<Statement>, index(Statement, statements));
 
define_new_ast_id!(BlockStatementId, StatementId, index(BlockStatement, Statement::Block, statements), alloc(alloc_block_statement));
 
define_new_ast_id!(EndBlockStatementId, StatementId, index(EndBlockStatement, Statement::EndBlock, statements), alloc(alloc_end_block_statement));
 
define_new_ast_id!(LocalStatementId, StatementId, index(LocalStatement, Statement::Local, statements), alloc(alloc_local_statement));
 
define_new_ast_id!(MemoryStatementId, LocalStatementId);
 
define_new_ast_id!(ChannelStatementId, LocalStatementId);
 
define_new_ast_id!(LabeledStatementId, StatementId, index(LabeledStatement, Statement::Labeled, statements), alloc(alloc_labeled_statement));
 
define_new_ast_id!(IfStatementId, StatementId, index(IfStatement, Statement::If, statements), alloc(alloc_if_statement));
 
define_new_ast_id!(EndIfStatementId, StatementId, index(EndIfStatement, Statement::EndIf, statements), alloc(alloc_end_if_statement));
 
define_new_ast_id!(WhileStatementId, StatementId, index(WhileStatement, Statement::While, statements), alloc(alloc_while_statement));
 
define_new_ast_id!(EndWhileStatementId, StatementId, index(EndWhileStatement, Statement::EndWhile, statements), alloc(alloc_end_while_statement));
 
define_new_ast_id!(BreakStatementId, StatementId, index(BreakStatement, Statement::Break, statements), alloc(alloc_break_statement));
 
define_new_ast_id!(ContinueStatementId, StatementId, index(ContinueStatement, Statement::Continue, statements), alloc(alloc_continue_statement));
 
define_new_ast_id!(SynchronousStatementId, StatementId, index(SynchronousStatement, Statement::Synchronous, statements), alloc(alloc_synchronous_statement));
 
define_new_ast_id!(EndSynchronousStatementId, StatementId, index(EndSynchronousStatement, Statement::EndSynchronous, statements), alloc(alloc_end_synchronous_statement));
 
define_new_ast_id!(ForkStatementId, StatementId, index(ForkStatement, Statement::Fork, statements), alloc(alloc_fork_statement));
 
define_new_ast_id!(EndForkStatementId, StatementId, index(EndForkStatement, Statement::EndFork, statements), alloc(alloc_end_fork_statement));
 
define_new_ast_id!(ReturnStatementId, StatementId, index(ReturnStatement, Statement::Return, statements), alloc(alloc_return_statement));
 
define_new_ast_id!(GotoStatementId, StatementId, index(GotoStatement, Statement::Goto, statements), alloc(alloc_goto_statement));
 
define_new_ast_id!(NewStatementId, StatementId, index(NewStatement, Statement::New, statements), alloc(alloc_new_statement));
 
define_new_ast_id!(ExpressionStatementId, StatementId, index(ExpressionStatement, Statement::Expression, statements), alloc(alloc_expression_statement));
 

	
 
define_aliased_ast_id!(ExpressionId, Id<Expression>, index(Expression, expressions));
 
define_new_ast_id!(AssignmentExpressionId, ExpressionId, index(AssignmentExpression, Expression::Assignment, expressions), alloc(alloc_assignment_expression));
 
define_new_ast_id!(BindingExpressionId, ExpressionId, index(BindingExpression, Expression::Binding, expressions), alloc(alloc_binding_expression));
 
define_new_ast_id!(ConditionalExpressionId, ExpressionId, index(ConditionalExpression, Expression::Conditional, expressions), alloc(alloc_conditional_expression));
 
define_new_ast_id!(BinaryExpressionId, ExpressionId, index(BinaryExpression, Expression::Binary, expressions), alloc(alloc_binary_expression));
 
define_new_ast_id!(UnaryExpressionId, ExpressionId, index(UnaryExpression, Expression::Unary, expressions), alloc(alloc_unary_expression));
 
define_new_ast_id!(IndexingExpressionId, ExpressionId, index(IndexingExpression, Expression::Indexing, expressions), alloc(alloc_indexing_expression));
 
define_new_ast_id!(SlicingExpressionId, ExpressionId, index(SlicingExpression, Expression::Slicing, expressions), alloc(alloc_slicing_expression));
 
define_new_ast_id!(SelectExpressionId, ExpressionId, index(SelectExpression, Expression::Select, expressions), alloc(alloc_select_expression));
 
define_new_ast_id!(LiteralExpressionId, ExpressionId, index(LiteralExpression, Expression::Literal, expressions), alloc(alloc_literal_expression));
 
define_new_ast_id!(CastExpressionId, ExpressionId, index(CastExpression, Expression::Cast, expressions), alloc(alloc_cast_expression));
 
define_new_ast_id!(CallExpressionId, ExpressionId, index(CallExpression, Expression::Call, expressions), alloc(alloc_call_expression));
 
define_new_ast_id!(VariableExpressionId, ExpressionId, index(VariableExpression, Expression::Variable, expressions), alloc(alloc_variable_expression));
 

	
 
#[derive(Debug)]
 
pub struct Heap {
 
    // Root arena, contains the entry point for different modules. Each root
 
    // contains lists of IDs that correspond to the other arenas.
 
    pub(crate) protocol_descriptions: Arena<Root>,
 
@@ -1014,115 +1016,120 @@ impl FunctionDefinition {
 
            builtin: false,
 
            span, identifier, poly_vars,
 
            return_types: Vec::new(),
 
            parameters: Vec::new(),
 
            body: BlockStatementId::new_invalid(),
 
            num_expressions_in_body: -1,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum Statement {
 
    Block(BlockStatement),
 
    EndBlock(EndBlockStatement),
 
    Local(LocalStatement),
 
    Labeled(LabeledStatement),
 
    If(IfStatement),
 
    EndIf(EndIfStatement),
 
    While(WhileStatement),
 
    EndWhile(EndWhileStatement),
 
    Break(BreakStatement),
 
    Continue(ContinueStatement),
 
    Synchronous(SynchronousStatement),
 
    EndSynchronous(EndSynchronousStatement),
 
    Fork(ForkStatement),
 
    EndFork(EndForkStatement),
 
    Return(ReturnStatement),
 
    Goto(GotoStatement),
 
    New(NewStatement),
 
    Expression(ExpressionStatement),
 
}
 

	
 
impl Statement {
 
    pub fn as_block(&self) -> &BlockStatement {
 
        match self {
 
            Statement::Block(result) => result,
 
            _ => panic!("Unable to cast `Statement` to `BlockStatement`"),
 
        }
 
    }
 
    pub fn as_local(&self) -> &LocalStatement {
 
        match self {
 
            Statement::Local(result) => result,
 
            _ => panic!("Unable to cast `Statement` to `LocalStatement`"),
 
        }
 
    }
 
    pub fn as_memory(&self) -> &MemoryStatement {
 
        self.as_local().as_memory()
 
    }
 
    pub fn as_channel(&self) -> &ChannelStatement {
 
        self.as_local().as_channel()
 
    }
 

	
 
    pub fn as_new(&self) -> &NewStatement {
 
        match self {
 
            Statement::New(result) => result,
 
            _ => panic!("Unable to cast `Statement` to `NewStatement`"),
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
        match self {
 
            Statement::Block(v) => v.span,
 
            Statement::Local(v) => v.span(),
 
            Statement::Labeled(v) => v.label.span,
 
            Statement::If(v) => v.span,
 
            Statement::While(v) => v.span,
 
            Statement::Break(v) => v.span,
 
            Statement::Continue(v) => v.span,
 
            Statement::Synchronous(v) => v.span,
 
            Statement::Fork(v) => v.span,
 
            Statement::Return(v) => v.span,
 
            Statement::Goto(v) => v.span,
 
            Statement::New(v) => v.span,
 
            Statement::Expression(v) => v.span,
 
            Statement::EndBlock(_) | Statement::EndIf(_) | Statement::EndWhile(_) | Statement::EndSynchronous(_) => unreachable!(),
 
            Statement::EndBlock(_) | Statement::EndIf(_) | Statement::EndWhile(_) | Statement::EndSynchronous(_) | Statement::EndFork(_) => unreachable!(),
 
        }
 
    }
 
    pub fn link_next(&mut self, next: StatementId) {
 
        match self {
 
            Statement::Block(stmt) => stmt.next = next,
 
            Statement::EndBlock(stmt) => stmt.next = next,
 
            Statement::Local(stmt) => match stmt {
 
                LocalStatement::Channel(stmt) => stmt.next = next,
 
                LocalStatement::Memory(stmt) => stmt.next = next,
 
            },
 
            Statement::EndIf(stmt) => stmt.next = next,
 
            Statement::EndWhile(stmt) => stmt.next = next,
 
            Statement::EndSynchronous(stmt) => stmt.next = next,
 
            Statement::EndFork(stmt) => stmt.next = next,
 
            Statement::New(stmt) => stmt.next = next,
 
            Statement::Expression(stmt) => stmt.next = next,
 
            Statement::Return(_)
 
            | Statement::Break(_)
 
            | Statement::Continue(_)
 
            | Statement::Synchronous(_)
 
            | Statement::Fork(_)
 
            | Statement::Goto(_)
 
            | Statement::While(_)
 
            | Statement::Labeled(_)
 
            | Statement::If(_) => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct BlockStatement {
 
    pub this: BlockStatementId,
 
    // Phase 1: parser
 
    pub is_implicit: bool,
 
    pub span: InputSpan, // of the complete block
 
    pub statements: Vec<StatementId>,
 
    pub end_block: EndBlockStatementId,
 
    // Phase 2: linker
 
    pub scope_node: ScopeNode,
 
    pub first_unique_id_in_scope: i32, // Temporary fix until proper bytecode/asm is generated
 
    pub next_unique_id_in_scope: i32, // Temporary fix until proper bytecode/asm is generated
 
    pub relative_pos_in_parent: u32,
 
    pub locals: Vec<VariableId>,
 
    pub labels: Vec<LabeledStatementId>,
 
    pub next: StatementId,
 
@@ -1250,60 +1257,76 @@ pub struct BreakStatement {
 
    pub this: BreakStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "break" keyword
 
    pub label: Option<Identifier>,
 
    // Phase 2: linker
 
    pub target: Option<EndWhileStatementId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ContinueStatement {
 
    pub this: ContinueStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "continue" keyword
 
    pub label: Option<Identifier>,
 
    // Phase 2: linker
 
    pub target: Option<WhileStatementId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SynchronousStatement {
 
    pub this: SynchronousStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "sync" keyword
 
    pub body: BlockStatementId,
 
    // Phase 2: linker
 
    pub end_sync: EndSynchronousStatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndSynchronousStatement {
 
    pub this: EndSynchronousStatementId,
 
    pub start_sync: SynchronousStatementId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ForkStatement {
 
    pub this: ForkStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "fork" keyword
 
    pub left_body: BlockStatementId,
 
    pub right_body: Option<BlockStatementId>,
 
    pub end_fork: EndForkStatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndForkStatement {
 
    pub this: EndForkStatementId,
 
    pub start_fork: ForkStatementId,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ReturnStatement {
 
    pub this: ReturnStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "return" keyword
 
    pub expressions: Vec<ExpressionId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct GotoStatement {
 
    pub this: GotoStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "goto" keyword
 
    pub label: Identifier,
 
    // Phase 2: linker
 
    pub target: Option<LabeledStatementId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct NewStatement {
 
    pub this: NewStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "new" keyword
 
    pub expression: CallExpressionId,
src/protocol/ast_printer.rs
Show inline comments
 
@@ -15,48 +15,50 @@ const PREFIX_IMPORT_ID: &'static str = "Imp ";
 
const PREFIX_TYPE_ANNOT_ID: &'static str = "TyAn";
 
const PREFIX_VARIABLE_ID: &'static str = "Var ";
 
const PREFIX_DEFINITION_ID: &'static str = "Def ";
 
const PREFIX_STRUCT_ID: &'static str = "DefS";
 
const PREFIX_ENUM_ID: &'static str = "DefE";
 
const PREFIX_UNION_ID: &'static str = "DefU";
 
const PREFIX_COMPONENT_ID: &'static str = "DefC";
 
const PREFIX_FUNCTION_ID: &'static str = "DefF";
 
const PREFIX_STMT_ID: &'static str = "Stmt";
 
const PREFIX_BLOCK_STMT_ID: &'static str = "SBl ";
 
const PREFIX_ENDBLOCK_STMT_ID: &'static str = "SEBl";
 
const PREFIX_LOCAL_STMT_ID: &'static str = "SLoc";
 
const PREFIX_MEM_STMT_ID: &'static str = "SMem";
 
const PREFIX_CHANNEL_STMT_ID: &'static str = "SCha";
 
const PREFIX_SKIP_STMT_ID: &'static str = "SSki";
 
const PREFIX_LABELED_STMT_ID: &'static str = "SLab";
 
const PREFIX_IF_STMT_ID: &'static str = "SIf ";
 
const PREFIX_ENDIF_STMT_ID: &'static str = "SEIf";
 
const PREFIX_WHILE_STMT_ID: &'static str = "SWhi";
 
const PREFIX_ENDWHILE_STMT_ID: &'static str = "SEWh";
 
const PREFIX_BREAK_STMT_ID: &'static str = "SBre";
 
const PREFIX_CONTINUE_STMT_ID: &'static str = "SCon";
 
const PREFIX_SYNC_STMT_ID: &'static str = "SSyn";
 
const PREFIX_ENDSYNC_STMT_ID: &'static str = "SESy";
 
const PREFIX_FORK_STMT_ID: &'static str = "SFrk";
 
const PREFIX_END_FORK_STMT_ID: &'static str = "SEFk";
 
const PREFIX_RETURN_STMT_ID: &'static str = "SRet";
 
const PREFIX_ASSERT_STMT_ID: &'static str = "SAsr";
 
const PREFIX_GOTO_STMT_ID: &'static str = "SGot";
 
const PREFIX_NEW_STMT_ID: &'static str = "SNew";
 
const PREFIX_PUT_STMT_ID: &'static str = "SPut";
 
const PREFIX_EXPR_STMT_ID: &'static str = "SExp";
 
const PREFIX_ASSIGNMENT_EXPR_ID: &'static str = "EAsi";
 
const PREFIX_BINDING_EXPR_ID: &'static str = "EBnd";
 
const PREFIX_CONDITIONAL_EXPR_ID: &'static str = "ECnd";
 
const PREFIX_BINARY_EXPR_ID: &'static str = "EBin";
 
const PREFIX_UNARY_EXPR_ID: &'static str = "EUna";
 
const PREFIX_INDEXING_EXPR_ID: &'static str = "EIdx";
 
const PREFIX_SLICING_EXPR_ID: &'static str = "ESli";
 
const PREFIX_SELECT_EXPR_ID: &'static str = "ESel";
 
const PREFIX_LITERAL_EXPR_ID: &'static str = "ELit";
 
const PREFIX_CAST_EXPR_ID: &'static str = "ECas";
 
const PREFIX_CALL_EXPR_ID: &'static str = "ECll";
 
const PREFIX_VARIABLE_EXPR_ID: &'static str = "EVar";
 

	
 
struct KV<'a> {
 
    buffer: &'a mut String,
 
    prefix: Option<(&'static str, i32)>,
 
    indent: usize,
 
    temp_key: &'a mut String,
 
@@ -490,48 +492,66 @@ impl ASTWriter {
 
                self.kv(indent2).with_s_key("Target")
 
                    .with_opt_disp_val(stmt.target.as_ref().map(|v| &v.0.index));
 
            },
 
            Statement::Continue(stmt) => {
 
                self.kv(indent).with_id(PREFIX_CONTINUE_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Continue");
 
                self.kv(indent2).with_s_key("Label")
 
                    .with_opt_identifier_val(stmt.label.as_ref());
 
                self.kv(indent2).with_s_key("Target")
 
                    .with_opt_disp_val(stmt.target.as_ref().map(|v| &v.0.index));
 
            },
 
            Statement::Synchronous(stmt) => {
 
                self.kv(indent).with_id(PREFIX_SYNC_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Synchronous");
 
                self.kv(indent2).with_s_key("EndSync").with_disp_val(&stmt.end_sync.0.index);
 
                self.kv(indent2).with_s_key("Body");
 
                self.write_stmt(heap, stmt.body.upcast(), indent3);
 
            },
 
            Statement::EndSynchronous(stmt) => {
 
                self.kv(indent).with_id(PREFIX_ENDSYNC_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("EndSynchronous");
 
                self.kv(indent2).with_s_key("StartSync").with_disp_val(&stmt.start_sync.0.index);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            },
 
            Statement::Fork(stmt) => {
 
                self.kv(indent).with_id(PREFIX_FORK_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Fork");
 
                self.kv(indent2).with_s_key("EndFork").with_disp_val(&stmt.end_fork.0.index);
 
                self.kv(indent2).with_s_key("LeftBody");
 
                self.write_stmt(heap, stmt.left_body.upcast(), indent3);
 

	
 
                if let Some(right_body_id) = stmt.right_body {
 
                    self.kv(indent2).with_s_key("RightBody");
 
                    self.write_stmt(heap, right_body_id.upcast(), indent3);
 
                }
 
            },
 
            Statement::EndFork(stmt) => {
 
                self.kv(indent).with_id(PREFIX_END_FORK_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("EndFork");
 
                self.kv(indent2).with_s_key("StartFork").with_disp_val(&stmt.start_fork.0.index);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            }
 
            Statement::Return(stmt) => {
 
                self.kv(indent).with_id(PREFIX_RETURN_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Return");
 
                self.kv(indent2).with_s_key("Expressions");
 
                for expr_id in &stmt.expressions {
 
                    self.write_expr(heap, *expr_id, indent3);
 
                }
 
            },
 
            Statement::Goto(stmt) => {
 
                self.kv(indent).with_id(PREFIX_GOTO_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Goto");
 
                self.kv(indent2).with_s_key("Label").with_identifier_val(&stmt.label);
 
                self.kv(indent2).with_s_key("Target")
 
                    .with_opt_disp_val(stmt.target.as_ref().map(|v| &v.0.index));
 
            },
 
            Statement::New(stmt) => {
 
                self.kv(indent).with_id(PREFIX_NEW_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("New");
 
                self.kv(indent2).with_s_key("Expression");
 
                self.write_expr(heap, stmt.expression.upcast(), indent3);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            },
 
            Statement::Expression(stmt) => {
 
                self.kv(indent).with_id(PREFIX_EXPR_STMT_ID, stmt.this.0.index)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -179,48 +179,49 @@ impl Frame {
 
            }
 
            Expression::Call(expr) => {
 
                for arg_expr_id in &expr.arguments {
 
                    self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                    self.serialize_expression(heap, *arg_expr_id);
 
                }
 
            },
 
            Expression::Variable(_expr) => {
 
                // No subexpressions
 
            }
 
        }
 
    }
 
}
 

	
 
type EvalResult = Result<EvalContinuation, EvalError>;
 

	
 
pub enum EvalContinuation {
 
    Stepping,
 
    Inconsistent,
 
    Terminal,
 
    SyncBlockStart,
 
    SyncBlockEnd,
 
    NewComponent(DefinitionId, i32, ValueGroup),
 
    NewChannel,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, Value),
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
// we end up with valid "pointers" to heap regions.
 
#[derive(Debug, Clone)]
 
pub struct Prompt {
 
    pub(crate) frames: Vec<Frame>,
 
    pub(crate) store: Store,
 
}
 

	
 
impl Prompt {
 
    pub fn new(_types: &TypeTable, heap: &Heap, def: DefinitionId, monomorph_idx: i32, args: ValueGroup) -> Self {
 
        let mut prompt = Self{
 
            frames: Vec::new(),
 
            store: Store::new(),
 
        };
 

	
 
        // Maybe do typechecking in the future?
 
        debug_assert!((monomorph_idx as usize) < _types.get_base_definition(&def).unwrap().definition.procedure_monomorphs().len());
 
        let new_frame = Frame::new(heap, def, monomorph_idx);
 
        let max_stack_size = new_frame.max_stack_size;
 
@@ -563,82 +564,83 @@ impl Prompt {
 
                            let subject = cur_frame.expr_values.pop_back().unwrap();
 
                            match apply_casting(&mut self.store, output_type, &subject) {
 
                                Ok(value) => cur_frame.expr_values.push_back(value),
 
                                Err(msg) => {
 
                                    return Err(EvalError::new_error_at_expr(self, modules, heap, expr.this.upcast(), msg));
 
                                }
 
                            }
 

	
 
                            self.store.drop_value(subject.get_heap_pos());
 
                        }
 
                        Expression::Call(expr) => {
 
                            // If we're dealing with a builtin we don't do any
 
                            // fancy shenanigans at all, just push the result.
 
                            match expr.method {
 
                                Method::Get => {
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value = self.store.maybe_read_ref(&value).clone();
 

	
 
                                    let port_id = if let Value::Input(port_id) = value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'get' on value {:?}", value)
 
                                    };
 

	
 
                                    match ctx.get(port_id) {
 
                                    match ctx.performed_get(port_id) {
 
                                        Some(result) => {
 
                                            // We have the result. Merge the `ValueGroup` with the
 
                                            // stack/heap storage.
 
                                            debug_assert_eq!(result.values.len(), 1);
 
                                            result.into_stack(&mut cur_frame.expr_values, &mut self.store);
 
                                        },
 
                                        None => {
 
                                            // Don't have the result yet, prepare the expression to
 
                                            // get run again after we've received a message.
 
                                            cur_frame.expr_values.push_front(value.clone());
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockGet(port_id));
 
                                        }
 
                                    }
 
                                },
 
                                Method::Put => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_port_value = self.store.maybe_read_ref(&port_value).clone();
 

	
 
                                    let port_id = if let Value::Output(port_id) = deref_port_value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'put' on value {:?}", deref_port_value)
 
                                    };
 

	
 
                                    let msg_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone();
 

	
 
                                    if ctx.did_put(port_id) {
 
                                    if ctx.performed_put(port_id) {
 
                                        // We're fine, deallocate in case the expression value stack
 
                                        // held an owned value
 
                                        self.store.drop_value(msg_value.get_heap_pos());
 
                                    } else {
 
                                        // Prepare to execute again
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        return Ok(EvalContinuation::Put(port_id, deref_msg_value));
 
                                    }
 
                                },
 
                                Method::Fires => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let port_value_deref = self.store.maybe_read_ref(&port_value).clone();
 

	
 
                                    let port_id = match port_value_deref {
 
                                        Value::Input(port_id) => port_id,
 
                                        Value::Output(port_id) => port_id,
 
                                        _ => unreachable!("executor calling 'fires' on value {:?}", port_value_deref),
 
                                    };
 

	
 
                                    match ctx.fires(port_id) {
 
                                        None => {
 
                                            cur_frame.expr_values.push_front(port_value);
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockFires(port_id));
 
                                        },
 
                                        Some(value) => {
 
                                            cur_frame.expr_values.push_back(value);
 
@@ -789,49 +791,49 @@ impl Prompt {
 
            Statement::Block(stmt) => {
 
                debug_assert!(stmt.statements.is_empty() || stmt.next == stmt.statements[0]);
 
                cur_frame.position = stmt.next;
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::EndBlock(stmt) => {
 
                let block = &heap[stmt.start_block];
 
                self.store.clear_stack(block.first_unique_id_in_scope as usize);
 
                cur_frame.position = stmt.next;
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::Local(stmt) => {
 
                match stmt {
 
                    LocalStatement::Memory(stmt) => {
 
                        let variable = &heap[stmt.variable];
 
                        self.store.write(ValueId::Stack(variable.unique_id_in_scope as u32), Value::Unassigned);
 

	
 
                        cur_frame.position = stmt.next;
 
                        Ok(EvalContinuation::Stepping)
 
                    },
 
                    LocalStatement::Channel(stmt) => {
 
                        // Need to create a new channel by requesting it from
 
                        // the runtime.
 
                        match ctx.get_channel() {
 
                        match ctx.created_channel() {
 
                            None => {
 
                                // No channel is pending. So request one
 
                                Ok(EvalContinuation::NewChannel)
 
                            },
 
                            Some((put_port, get_port)) => {
 
                                self.store.write(ValueId::Stack(heap[stmt.from].unique_id_in_scope as u32), put_port);
 
                                self.store.write(ValueId::Stack(heap[stmt.to].unique_id_in_scope as u32), get_port);
 
                                cur_frame.position = stmt.next;
 
                                Ok(EvalContinuation::Stepping)
 
                            }
 
                        }
 
                    }
 
                }
 
            },
 
            Statement::Labeled(stmt) => {
 
                cur_frame.position = stmt.body;
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::If(stmt) => {
 
                debug_assert_eq!(cur_frame.expr_values.len(), 1, "expected one expr value for if statement");
 
                let test_value = cur_frame.expr_values.pop_back().unwrap();
 
                let test_value = self.store.maybe_read_ref(&test_value).as_bool();
 
                if test_value {
 
@@ -865,48 +867,75 @@ impl Prompt {
 
                cur_frame.position = stmt.next;
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::Break(stmt) => {
 
                cur_frame.position = stmt.target.unwrap().upcast();
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::Continue(stmt) => {
 
                cur_frame.position = stmt.target.unwrap().upcast();
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::Synchronous(stmt) => {
 
                cur_frame.position = stmt.body.upcast();
 

	
 
                Ok(EvalContinuation::SyncBlockStart)
 
            },
 
            Statement::EndSynchronous(stmt) => {
 
                cur_frame.position = stmt.next;
 

	
 
                Ok(EvalContinuation::SyncBlockEnd)
 
            },
 
            Statement::Fork(stmt) => {
 
                if stmt.right_body.is_none() {
 
                    // No reason to fork
 
                    cur_frame.position = stmt.left_body.upcast();
 
                } else {
 
                    // Need to fork
 
                    if let Some(go_left) = ctx.performed_fork() {
 
                        // Runtime has created a fork
 
                        if go_left {
 
                            cur_frame.position = stmt.left_body.upcast();
 
                        } else {
 
                            cur_frame.position = stmt.right_body.unwrap().upcast();
 
                        }
 
                    } else {
 
                        // Request the runtime to create a fork of the current
 
                        // branch
 
                        return Ok(EvalContinuation::NewFork);
 
                    }
 
                }
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::EndFork(stmt) => {
 
                cur_frame.position = stmt.next;
 

	
 
                Ok(EvalContinuation::Stepping)
 
            }
 
            Statement::Return(_stmt) => {
 
                debug_assert!(heap[cur_frame.definition].is_function());
 
                debug_assert_eq!(cur_frame.expr_values.len(), 1, "expected one expr value for return statement");
 

	
 
                // The preceding frame has executed a call, so is expecting the
 
                // return expression on its expression value stack. Note that
 
                // we may be returning a reference to something on our stack,
 
                // so we need to read that value and clone it.
 
                let return_value = cur_frame.expr_values.pop_back().unwrap();
 
                let return_value = match return_value {
 
                    Value::Ref(value_id) => self.store.read_copy(value_id),
 
                    _ => return_value,
 
                };
 

	
 
                // Pre-emptively pop our stack frame
 
                self.frames.pop();
 

	
 
                // Clean up our section of the stack
 
                self.store.clear_stack(0);
 
                self.store.stack.truncate(self.store.cur_stack_boundary + 1);
 
                let prev_stack_idx = self.store.stack.pop().unwrap().as_stack_boundary();
 

	
 
                // TODO: Temporary hack for testing, remove at some point
 
                if self.frames.is_empty() {
src/protocol/mod.rs
Show inline comments
 
@@ -31,48 +31,49 @@ pub struct ProtocolDescription {
 
    pub(crate) types: TypeTable,
 
    pub(crate) pool: Mutex<StringPool>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentState {
 
    pub(crate) prompt: Prompt,
 
}
 

	
 
#[allow(dead_code)]
 
pub(crate) enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
}
 
//////////////////////////////////////////////
 

	
 
#[derive(Debug)]
 
pub enum ComponentCreationError {
 
    ModuleDoesntExist,
 
    DefinitionDoesntExist,
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
    UnownedPort,
 
    InSync,
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(An opaque protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
    // TODO: Allow for multi-file compilation
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        // TODO: @fixme, keep code compilable, but needs support for multiple
 
        //  input files.
 
        let source = InputSource::new(String::new(), Vec::from(buffer));
 
        let mut parser = Parser::new();
 
        parser.feed(source).expect("failed to feed source");
 
        
 
        if let Err(err) = parser.parse() {
 
            println!("ERROR:\n{}", err);
 
            return Err(format!("{}", err))
 
        }
 

	
 
        debug_assert_eq!(parser.modules.len(), 1, "only supporting one module here for now");
 
        let modules: Vec<Module> = parser.modules.into_iter()
 
            .map(|module| Module{
 
@@ -262,95 +263,99 @@ impl ProtocolDescription {
 
                if let Value::Array(heap_pos) = argument {
 
                    let heap_pos = *heap_pos;
 
                    for element in &arguments.regions[heap_pos as usize] {
 
                        if !self.verify_same_type(expected, expected_idx + 1, arguments, element) {
 
                            return false;
 
                        }
 
                    }
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            CTP::Input => if let Value::Input(_) = argument { true } else { false },
 
            CTP::Output => if let Value::Output(_) = argument { true } else { false },
 
            CTP::Instance(_definition_id, _num_embedded) => {
 
                todo!("implement full type checking on user-supplied arguments");
 
                return false;
 
            },
 
        }
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn performed_put(&mut self, port: PortId) -> bool;
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
    fn performed_fork(&mut self) -> Option<bool>; // None if not yet forked
 
    fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
pub enum RunResult {
 
    // Can only occur outside sync blocks
 
    ComponentTerminated, // component has exited its procedure
 
    ComponentAtSyncStart,
 
    NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync
 
    NewChannel, // should also be possible inside sync
 
    // Can only occur inside sync blocks
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchMissingPortValue(PortId), // branch hasn't received message on input port yet
 
    BranchGet(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchFork,
 
    BranchPut(PortId, ValueGroup),
 
}
 

	
 
impl ComponentState {
 
    pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult {
 
        use EvalContinuation as EC;
 
        use RunResult as RR;
 

	
 
        loop {
 
            let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            match step_result {
 
                Err(reason) => {
 
                    // TODO: @temp
 
                    println!("Evaluation error:\n{}", reason);
 
                    todo!("proper error handling/bubbling up");
 
                },
 
                Ok(continuation) => match continuation {
 
                    // TODO: Probably want to remove this translation
 
                    EC::Stepping => continue,
 
                    EC::Inconsistent => return RR::BranchInconsistent,
 
                    EC::Terminal => return RR::ComponentTerminated,
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, args),
 
                    EC::NewChannel =>
 
                        return RR::NewChannel,
 
                    EC::NewFork =>
 
                        return RR::BranchFork,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchGet(port_id),
 
                    EC::Put(port_id, value) => {
 
                        let value_group = ValueGroup::from_store(&self.prompt.store, &[value]);
 
                        return RR::BranchPut(port_id, value_group);
 
                    },
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
// TODO: @remove the old stuff
 
impl ComponentState {
 
    pub(crate) fn nonsync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut NonsyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> NonsyncBlocker {
 
        let mut context = EvalContext::Nonsync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context);
 
            match result {
 
                Err(err) => {
 
                    println!("Evaluation error:\n{}", err);
 
                    panic!("proper error handling when component fails");
 
@@ -376,174 +381,181 @@ impl ComponentState {
 
                                _ => {}
 
                            }
 
                        }
 
                        for region in args.regions.iter() {
 
                            for arg in region {
 
                                match arg {
 
                                    Value::Output(port) => { moved_ports.insert(*port); },
 
                                    Value::Input(port) => { moved_ports.insert(*port); },
 
                                    _ => {},
 
                                }
 
                            }
 
                        }
 
                        let init_state = ComponentState { prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, args) };
 
                        context.new_component(moved_ports, init_state);
 
                        // Continue stepping
 
                        continue;
 
                    },
 
                    EvalContinuation::NewChannel => {
 
                        // Because of the way we emulate the old context for now, we can safely
 
                        // assume that this will never happen. The old context thingamajig always
 
                        // creates a channel, it never bubbles a "need to create a channel" message
 
                        // to the runtime
 
                        unreachable!();
 
                    },
 
                    EvalContinuation::NewFork => unreachable!(),
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(_) => unreachable!(),
 
                    EvalContinuation::BlockGet(_) => unreachable!(),
 
                    EvalContinuation::Put(_, _) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn sync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut SyncProtoContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> SyncBlocker {
 
        let mut context = EvalContext::Sync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context);
 
            match result {
 
                Err(err) => {
 
                    println!("Evaluation error:\n{}", err);
 
                    panic!("proper error handling when component fails");
 
                },
 
                Ok(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return SyncBlocker::Inconsistent,
 
                    // First need to exit synchronous block before definition may end
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _, _) => unreachable!(),
 
                    EvalContinuation::NewChannel => unreachable!(),
 
                    EvalContinuation::NewFork => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        return SyncBlocker::CouldntCheckFiring(port);
 
                    },
 
                    EvalContinuation::BlockGet(port) => {
 
                        return SyncBlocker::CouldntReadMsg(port);
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let payload;
 
                        match message {
 
                            Value::Null => {
 
                                return SyncBlocker::Inconsistent;
 
                            },
 
                            Value::Message(heap_pos) => {
 
                                // Create a copy of the payload
 
                                let values = &self.prompt.store.heap_regions[heap_pos as usize].values;
 
                                let mut bytes = Vec::with_capacity(values.len());
 
                                for value in values {
 
                                    bytes.push(value.as_uint8());
 
                                }
 
                                payload = Payload(Arc::new(bytes));
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return SyncBlocker::PutMsg(port, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
impl RunContext for EvalContext<'_> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
    fn performed_put(&mut self, port: PortId) -> bool {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                ctx.did_put_or_get(port)
 
            }
 
        }
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                let payload = ctx.read_msg(port);
 
                if payload.is_none() {
 
                    return None;
 
                }
 

	
 
                let payload = payload.unwrap();
 
                let mut transformed = Vec::with_capacity(payload.len());
 
                for byte in payload.0.iter() {
 
                    transformed.push(Value::UInt8(*byte));
 
                }
 

	
 
                let value_group = ValueGroup{
 
                    values: vec![Value::Message(0)],
 
                    regions: vec![transformed],
 
                };
 

	
 
                return Some(value_group);
 
            }
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => {
 
                match context.is_firing(port) {
 
                    Some(did_fire) => Some(Value::Bool(did_fire)),
 
                    None => None,
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(from);
 
                let to = Value::Input(to);
 
                return Some((from, to));
 
            },
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        // Never actually used in the old runtime
 
        return None;
 
    }
 
}
 

	
 
// TODO: @remove once old runtime has disappeared
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
 
    //         // EvalContext::None => unreachable!(),
 
    //         EvalContext::Nonsync(_context) => todo!(),
 
    //         EvalContext::Sync(_) => unreachable!(),
 
    //     }
 
    // }
 
    fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                context.new_component(moved_ports, init_state)
 
            }
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Value; 2] {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
src/protocol/parser/pass_definitions.rs
Show inline comments
 
@@ -402,55 +402,70 @@ impl PassDefinitions {
 

	
 
                let if_stmt = &mut ctx.heap[id];
 
                if_stmt.end_if = end_if;
 
            } else if ident == KW_STMT_WHILE {
 
                let id = self.consume_while_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 

	
 
                let end_while = ctx.heap.alloc_end_while_statement(|this| EndWhileStatement{
 
                    this, start_while: id, next: StatementId::new_invalid()
 
                });
 
                section.push(end_while.upcast());
 

	
 
                let while_stmt = &mut ctx.heap[id];
 
                while_stmt.end_while = end_while;
 
            } else if ident == KW_STMT_BREAK {
 
                let id = self.consume_break_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_CONTINUE {
 
                let id = self.consume_continue_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_SYNC {
 
                let id = self.consume_synchronous_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 

	
 
                let end_sync = ctx.heap.alloc_end_synchronous_statement(|this| EndSynchronousStatement{
 
                    this, start_sync: id, next: StatementId::new_invalid()
 
                let end_sync = ctx.heap.alloc_end_synchronous_statement(|this| EndSynchronousStatement {
 
                    this,
 
                    start_sync: id,
 
                    next: StatementId::new_invalid()
 
                });
 
                section.push(end_sync.upcast());
 

	
 
                let sync_stmt = &mut ctx.heap[id];
 
                sync_stmt.end_sync = end_sync;
 
            } else if ident == KW_STMT_FORK {
 
                let id = self.consume_fork_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 

	
 
                let end_fork = ctx.heap.alloc_end_fork_statement(|this| EndForkStatement{
 
                    this,
 
                    start_fork: id,
 
                    next: StatementId::new_invalid(),
 
                });
 
                section.push(end_fork.upcast());
 

	
 
                let fork_stmt = &mut ctx.heap[id];
 
                fork_stmt.end_fork = end_fork;
 
            } else if ident == KW_STMT_RETURN {
 
                let id = self.consume_return_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_GOTO {
 
                let id = self.consume_goto_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_NEW {
 
                let id = self.consume_new_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_CHANNEL {
 
                let id = self.consume_channel_statement(module, iter, ctx)?;
 
                section.push(id.upcast().upcast());
 
            } else if iter.peek() == Some(TokenKind::Colon) {
 
                self.consume_labeled_statement(module, iter, ctx, section)?;
 
            } else {
 
                // Two fallback possibilities: the first one is a memory
 
                // declaration, the other one is to parse it as a regular
 
                // expression. This is a bit ugly
 
                if let Some((memory_stmt_id, assignment_stmt_id)) = self.maybe_consume_memory_statement(module, iter, ctx)? {
 
                    section.push(memory_stmt_id.upcast().upcast());
 
                    section.push(assignment_stmt_id.upcast());
 
                } else {
 
                    let id = self.consume_expression_statement(module, iter, ctx)?;
 
                    section.push(id.upcast());
 
@@ -592,48 +607,71 @@ impl PassDefinitions {
 
        };
 
        consume_token(&module.source, iter, TokenKind::SemiColon)?;
 
        Ok(ctx.heap.alloc_continue_statement(|this| ContinueStatement{
 
            this,
 
            span: continue_span,
 
            label,
 
            target: None
 
        }))
 
    }
 

	
 
    fn consume_synchronous_statement(
 
        &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx
 
    ) -> Result<SynchronousStatementId, ParseError> {
 
        let synchronous_span = consume_exact_ident(&module.source, iter, KW_STMT_SYNC)?;
 
        let body = self.consume_block_or_wrapped_statement(module, iter, ctx)?;
 

	
 
        Ok(ctx.heap.alloc_synchronous_statement(|this| SynchronousStatement{
 
            this,
 
            span: synchronous_span,
 
            body,
 
            end_sync: EndSynchronousStatementId::new_invalid(),
 
        }))
 
    }
 

	
 
    fn consume_fork_statement(
 
        &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx
 
    ) -> Result<ForkStatementId, ParseError> {
 
        let fork_span = consume_exact_ident(&module.source, iter, KW_STMT_FORK)?;
 
        let left_body = self.consume_block_or_wrapped_statement(module, iter, ctx)?;
 

	
 
        let right_body = if has_ident(&module.source, iter, KW_STMT_OR) {
 
            iter.consume();
 
            let right_body = self.consume_block_or_wrapped_statement(module, iter, ctx)?;
 
            Some(right_body)
 
        } else {
 
            None
 
        };
 

	
 
        Ok(ctx.heap.alloc_fork_statement(|this| ForkStatement{
 
            this,
 
            span: fork_span,
 
            left_body,
 
            right_body,
 
            end_fork: EndForkStatementId::new_invalid(),
 
        }))
 
    }
 

	
 
    fn consume_return_statement(
 
        &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx
 
    ) -> Result<ReturnStatementId, ParseError> {
 
        let return_span = consume_exact_ident(&module.source, iter, KW_STMT_RETURN)?;
 
        let mut scoped_section = self.expressions.start_section();
 

	
 
        consume_comma_separated_until(
 
            TokenKind::SemiColon, &module.source, iter, ctx,
 
            |_source, iter, ctx| self.consume_expression(module, iter, ctx),
 
            &mut scoped_section, "an expression", None
 
        )?;
 
        let expressions = scoped_section.into_vec();
 

	
 
        if expressions.is_empty() {
 
            return Err(ParseError::new_error_str_at_span(&module.source, return_span, "expected at least one return value"));
 
        } else if expressions.len() > 1 {
 
            return Err(ParseError::new_error_str_at_span(&module.source, return_span, "multiple return values are not (yet) supported"))
 
        }
 

	
 
        Ok(ctx.heap.alloc_return_statement(|this| ReturnStatement{
 
            this,
 
            span: return_span,
 
            expressions
 
        }))
src/protocol/parser/pass_typing.rs
Show inline comments
 
@@ -1121,48 +1121,61 @@ impl Visitor for PassTyping {
 
        }
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_while_stmt(&mut self, ctx: &mut Ctx, id: WhileStatementId) -> VisitorResult {
 
        let while_stmt = &ctx.heap[id];
 

	
 
        let body_id = while_stmt.body;
 
        let test_expr_id = while_stmt.test;
 

	
 
        self.visit_expr(ctx, test_expr_id)?;
 
        self.visit_block_stmt(ctx, body_id)?;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_synchronous_stmt(&mut self, ctx: &mut Ctx, id: SynchronousStatementId) -> VisitorResult {
 
        let sync_stmt = &ctx.heap[id];
 
        let body_id = sync_stmt.body;
 

	
 
        self.visit_block_stmt(ctx, body_id)
 
    }
 

	
 
    fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult {
 
        let fork_stmt = &ctx.heap[id];
 
        let left_body_id = fork_stmt.left_body;
 
        let right_body_id = fork_stmt.right_body;
 

	
 
        self.visit_block_stmt(ctx, left_body_id)?;
 
        if let Some(right_body_id) = right_body_id {
 
            self.visit_block_stmt(ctx, right_body_id)?;
 
        }
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult {
 
        let return_stmt = &ctx.heap[id];
 
        debug_assert_eq!(return_stmt.expressions.len(), 1);
 
        let expr_id = return_stmt.expressions[0];
 

	
 
        self.visit_expr(ctx, expr_id)
 
    }
 

	
 
    fn visit_new_stmt(&mut self, ctx: &mut Ctx, id: NewStatementId) -> VisitorResult {
 
        let new_stmt = &ctx.heap[id];
 
        let call_expr_id = new_stmt.expression;
 

	
 
        self.visit_call_expr(ctx, call_expr_id)
 
    }
 

	
 
    fn visit_expr_stmt(&mut self, ctx: &mut Ctx, id: ExpressionStatementId) -> VisitorResult {
 
        let expr_stmt = &ctx.heap[id];
 
        let subexpr_id = expr_stmt.expression;
 

	
 
        self.visit_expr(ctx, subexpr_id)
 
    }
 

	
 
    // Expressions
 

	
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
/*
 
 * pass_validation_linking.rs
 
 *
 
 * The pass that will validate properties of the AST statements (one is not
 
 * allowed to nest synchronous statements, instantiating components occurs in
 
 * the right places, etc.) and expressions (assignments may not occur in
 
 * arbitrary expressions).
 
 *
 
 * Furthermore, this pass will also perform "linking", in the sense of: some AST
 
 * nodes have something to do with one another, so we link them up in this pass
 
 * (e.g. setting the parents of expressions, linking the control flow statements
 
 * like `continue` and `break` up to the respective loop statement, etc.).
 
 *
 
 * There are several "confusing" parts about this pass:
 
 *
 
 * Setting expression parents: this is the simplest one. The pass struct acts
 
 * like a little state machine. When visiting an expression it will set the
 
 * "parent expression" field of the pass to itself, then visit its child. The
 
 * child will look at this "parent expression" field to determine its parent.
 
 *
 
 * Setting the `next` statement: the AST is a tree, but during execution we walk
 
 * a linear path through all statements. So where appropriate a statement may
 
 * set the "previous statement" field of the pass to itself. When visiting the
 
 * subsequent statement it will check this "previous statement", and if set, it
 
 * will link this previous statement up to itself. Not every statement has a
 
 * previous statement. Hence there are two patterns that occur: assigning the
 
 * `next` value, then clearing the "previous statement" field. And assigning the
 
 * `next` value, and then putting the current statement's ID in the "previous
 
 * statement" field. Because it is so common, this file contain two macros that
 
 * perform that operation.
 
 *
 
 * To make storing types for polymorphic procedures simpler and more efficient,
 
 * we assign to each expression in the procedure a unique ID. This is what the
 
 * "next expression index" field achieves. Each expression simply takes the
 
 * current value, and then increments this counter.
 
 */
 

	
 
use crate::collections::{ScopedBuffer};
 
use crate::protocol::ast::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::symbol_table::*;
 
use crate::protocol::parser::type_table::*;
 

	
 
use super::visitor::{
 
    STMT_BUFFER_INIT_CAPACITY,
 
    EXPR_BUFFER_INIT_CAPACITY,
 
    Ctx,
 
    Visitor,
 
    VisitorResult
 
};
 
use crate::protocol::parser::ModuleCompilationPhase;
 

	
 
#[derive(PartialEq, Eq)]
 
enum DefinitionType {
 
    Primitive(ComponentDefinitionId),
 
    Composite(ComponentDefinitionId),
 
    Function(FunctionDefinitionId)
 
}
 

	
 
impl DefinitionType {
 
    fn is_primitive(&self) -> bool { if let Self::Primitive(_) = self { true } else { false } }
 
@@ -234,49 +271,50 @@ impl Visitor for PassValidationLinking {
 
        self.visit_stmt(ctx, body_id)?;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_if_stmt(&mut self, ctx: &mut Ctx, id: IfStatementId) -> VisitorResult {
 
        let if_stmt = &ctx.heap[id];
 
        let end_if_id = if_stmt.end_if;
 
        let test_expr_id = if_stmt.test;
 
        let true_stmt_id = if_stmt.true_body;
 
        let false_stmt_id = if_stmt.false_body;
 

	
 
        // Visit test expression
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        debug_assert!(self.in_test_expr.is_invalid());
 

	
 
        self.in_test_expr = id.upcast();
 
        self.expr_parent = ExpressionParent::If(id);
 
        self.visit_expr(ctx, test_expr_id)?;
 
        self.in_test_expr = StatementId::new_invalid();
 

	
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        // Visit true and false branch. Executor chooses next statement based on
 
        // test expression, not on if-statement itself.
 
        // test expression, not on if-statement itself. Hence the if statement
 
        // does not have a static subsequent statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        self.visit_block_stmt(ctx, true_stmt_id)?;
 
        assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 

	
 
        if let Some(false_id) = false_stmt_id {
 
            self.visit_block_stmt(ctx, false_id)?;
 
            assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 
        }
 

	
 
        self.prev_stmt = end_if_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_while_stmt(&mut self, ctx: &mut Ctx, id: WhileStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let end_while_id = stmt.end_while;
 
        let test_expr_id = stmt.test;
 
        let body_stmt_id = stmt.body;
 

	
 
        let old_while = self.in_while;
 
        self.in_while = id;
 

	
 
        // Visit test expression
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
@@ -349,48 +387,78 @@ impl Visitor for PassValidationLinking {
 
            ));
 
        }
 

	
 
        if !self.def_type.is_primitive() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, cur_sync_span,
 
                "synchronous statements may only be used in primitive components"
 
            ));
 
        }
 

	
 
        // Synchronous statement implicitly moves to its block
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        let sync_body = ctx.heap[id].body;
 
        debug_assert!(self.in_sync.is_invalid());
 
        self.in_sync = id;
 
        self.visit_block_stmt_with_hint(ctx, sync_body, Some(id))?;
 
        assign_and_replace_next_stmt!(self, ctx, end_sync_id.upcast());
 

	
 
        self.in_sync = SynchronousStatementId::new_invalid();
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult {
 
        let fork_stmt = &ctx.heap[id];
 
        let end_fork_id = fork_stmt.end_fork;
 
        let left_body_id = fork_stmt.left_body;
 
        let right_body_id = fork_stmt.right_body;
 

	
 
        // Fork statements may only occur inside sync blocks
 
        if self.in_sync.is_invalid() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, fork_stmt.span,
 
                "Forking may only occur inside sync blocks"
 
            ));
 
        }
 

	
 
        // Visit the respective bodies. Like the if statement, a fork statement
 
        // does not have a single static subsequent statement. It forks and then
 
        // each fork has a different next statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        self.visit_block_stmt(ctx, left_body_id)?;
 
        assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast());
 

	
 
        if let Some(right_body_id) = right_body_id {
 
            self.visit_block_stmt(ctx, right_body_id)?;
 
            assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast());
 
        }
 

	
 
        self.prev_stmt = end_fork_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult {
 
        // Check if "return" occurs within a function
 
        let stmt = &ctx.heap[id];
 
        if !self.def_type.is_function() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, stmt.span,
 
                "return statements may only appear in function bodies"
 
            ));
 
        }
 

	
 
        // If here then we are within a function
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        debug_assert_eq!(ctx.heap[id].expressions.len(), 1);
 
        self.expr_parent = ExpressionParent::Return(id);
 
        self.visit_expr(ctx, ctx.heap[id].expressions[0])?;
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_goto_stmt(&mut self, ctx: &mut Ctx, id: GotoStatementId) -> VisitorResult {
 
        let target_id = self.find_label(ctx, &ctx.heap[id].label)?;
 
        ctx.heap[id].target = Some(target_id);
src/protocol/parser/token_parsing.rs
Show inline comments
 
@@ -24,49 +24,51 @@ pub(crate) const KW_IMPORT:    &'static [u8] = b"import";
 
// Keywords - literals
 
pub(crate) const KW_LIT_TRUE:  &'static [u8] = b"true";
 
pub(crate) const KW_LIT_FALSE: &'static [u8] = b"false";
 
pub(crate) const KW_LIT_NULL:  &'static [u8] = b"null";
 

	
 
// Keywords - function(like)s
 
pub(crate) const KW_CAST:        &'static [u8] = b"cast";
 
pub(crate) const KW_FUNC_GET:    &'static [u8] = b"get";
 
pub(crate) const KW_FUNC_PUT:    &'static [u8] = b"put";
 
pub(crate) const KW_FUNC_FIRES:  &'static [u8] = b"fires";
 
pub(crate) const KW_FUNC_CREATE: &'static [u8] = b"create";
 
pub(crate) const KW_FUNC_LENGTH: &'static [u8] = b"length";
 
pub(crate) const KW_FUNC_ASSERT: &'static [u8] = b"assert";
 
pub(crate) const KW_FUNC_PRINT:  &'static [u8] = b"print";
 

	
 
// Keywords - statements
 
pub(crate) const KW_STMT_CHANNEL:  &'static [u8] = b"channel";
 
pub(crate) const KW_STMT_IF:       &'static [u8] = b"if";
 
pub(crate) const KW_STMT_ELSE:     &'static [u8] = b"else";
 
pub(crate) const KW_STMT_WHILE:    &'static [u8] = b"while";
 
pub(crate) const KW_STMT_BREAK:    &'static [u8] = b"break";
 
pub(crate) const KW_STMT_CONTINUE: &'static [u8] = b"continue";
 
pub(crate) const KW_STMT_GOTO:     &'static [u8] = b"goto";
 
pub(crate) const KW_STMT_RETURN:   &'static [u8] = b"return";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"synchronous";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"sync";
 
pub(crate) const KW_STMT_FORK:     &'static [u8] = b"fork";
 
pub(crate) const KW_STMT_OR:       &'static [u8] = b"or";
 
pub(crate) const KW_STMT_NEW:      &'static [u8] = b"new";
 

	
 
// Keywords - types
 
// Since types are needed for returning diagnostic information to the user, the
 
// string variants are put here as well.
 
pub(crate) const KW_TYPE_IN_PORT_STR:  &'static str = "in";
 
pub(crate) const KW_TYPE_OUT_PORT_STR: &'static str = "out";
 
pub(crate) const KW_TYPE_MESSAGE_STR:  &'static str = "msg";
 
pub(crate) const KW_TYPE_BOOL_STR:     &'static str = "bool";
 
pub(crate) const KW_TYPE_UINT8_STR:    &'static str = "u8";
 
pub(crate) const KW_TYPE_UINT16_STR:   &'static str = "u16";
 
pub(crate) const KW_TYPE_UINT32_STR:   &'static str = "u32";
 
pub(crate) const KW_TYPE_UINT64_STR:   &'static str = "u64";
 
pub(crate) const KW_TYPE_SINT8_STR:    &'static str = "s8";
 
pub(crate) const KW_TYPE_SINT16_STR:   &'static str = "s16";
 
pub(crate) const KW_TYPE_SINT32_STR:   &'static str = "s32";
 
pub(crate) const KW_TYPE_SINT64_STR:   &'static str = "s64";
 
pub(crate) const KW_TYPE_CHAR_STR:     &'static str = "char";
 
pub(crate) const KW_TYPE_STRING_STR:   &'static str = "string";
 
pub(crate) const KW_TYPE_INFERRED_STR: &'static str = "auto";
 

	
 
pub(crate) const KW_TYPE_IN_PORT:  &'static [u8] = KW_TYPE_IN_PORT_STR.as_bytes();
 
pub(crate) const KW_TYPE_OUT_PORT: &'static [u8] = KW_TYPE_OUT_PORT_STR.as_bytes();
 
pub(crate) const KW_TYPE_MESSAGE:  &'static [u8] = KW_TYPE_MESSAGE_STR.as_bytes();
 
@@ -506,49 +508,49 @@ pub(crate) fn consume_ident<'a>(
 
    Ok((ident, span))
 
}
 

	
 
/// Consumes an identifier and immediately intern it into the `StringPool`
 
pub(crate) fn consume_ident_interned(
 
    source: &InputSource, iter: &mut TokenIter, ctx: &mut PassCtx
 
) -> Result<Identifier, ParseError> {
 
    let (value, span) = consume_ident(source, iter)?;
 
    let value = ctx.pool.intern(value);
 
    Ok(Identifier{ span, value })
 
}
 

	
 
fn is_reserved_definition_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_STRUCT | KW_ENUM | KW_UNION | KW_FUNCTION | KW_PRIMITIVE | KW_COMPOSITE => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_statement_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_IMPORT | KW_AS |
 
        KW_STMT_CHANNEL | KW_STMT_IF | KW_STMT_WHILE |
 
        KW_STMT_BREAK | KW_STMT_CONTINUE | KW_STMT_GOTO | KW_STMT_RETURN |
 
        KW_STMT_SYNC | KW_STMT_NEW => true,
 
        KW_STMT_SYNC | KW_STMT_FORK | KW_STMT_NEW => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_expression_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_LET | KW_CAST |
 
        KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL |
 
        KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_type_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_TYPE_IN_PORT | KW_TYPE_OUT_PORT | KW_TYPE_MESSAGE | KW_TYPE_BOOL |
 
        KW_TYPE_UINT8 | KW_TYPE_UINT16 | KW_TYPE_UINT32 | KW_TYPE_UINT64 |
 
        KW_TYPE_SINT8 | KW_TYPE_SINT16 | KW_TYPE_SINT32 | KW_TYPE_SINT64 |
 
        KW_TYPE_CHAR | KW_TYPE_STRING |
 
        KW_TYPE_INFERRED => true,
 
        _ => false,
 
    }
 
}
 

	
src/protocol/parser/visitor.rs
Show inline comments
 
@@ -112,90 +112,96 @@ pub(crate) trait Visitor {
 
            },
 
            Statement::If(stmt) => {
 
                let this = stmt.this;
 
                self.visit_if_stmt(ctx, this)
 
            },
 
            Statement::EndIf(_stmt) => Ok(()),
 
            Statement::While(stmt) => {
 
                let this = stmt.this;
 
                self.visit_while_stmt(ctx, this)
 
            },
 
            Statement::EndWhile(_stmt) => Ok(()),
 
            Statement::Break(stmt) => {
 
                let this = stmt.this;
 
                self.visit_break_stmt(ctx, this)
 
            },
 
            Statement::Continue(stmt) => {
 
                let this = stmt.this;
 
                self.visit_continue_stmt(ctx, this)
 
            },
 
            Statement::Synchronous(stmt) => {
 
                let this = stmt.this;
 
                self.visit_synchronous_stmt(ctx, this)
 
            },
 
            Statement::EndSynchronous(_stmt) => Ok(()),
 
            Statement::Fork(stmt) => {
 
                let this = stmt.this;
 
                self.visit_fork_stmt(ctx, this)
 
            },
 
            Statement::EndFork(_stmt) => Ok(()),
 
            Statement::Return(stmt) => {
 
                let this = stmt.this;
 
                self.visit_return_stmt(ctx, this)
 
            },
 
            Statement::Goto(stmt) => {
 
                let this = stmt.this;
 
                self.visit_goto_stmt(ctx, this)
 
            },
 
            Statement::New(stmt) => {
 
                let this = stmt.this;
 
                self.visit_new_stmt(ctx, this)
 
            },
 
            Statement::Expression(stmt) => {
 
                let this = stmt.this;
 
                self.visit_expr_stmt(ctx, this)
 
            }
 
        }
 
    }
 

	
 
    fn visit_local_stmt(&mut self, ctx: &mut Ctx, id: LocalStatementId) -> VisitorResult {
 
        match &ctx.heap[id] {
 
            LocalStatement::Channel(stmt) => {
 
                let this = stmt.this;
 
                self.visit_local_channel_stmt(ctx, this)
 
            },
 
            LocalStatement::Memory(stmt) => {
 
                let this = stmt.this;
 
                self.visit_local_memory_stmt(ctx, this)
 
            },
 
        }
 
    }
 

	
 
    // --- enum variant handling
 
    fn visit_block_stmt(&mut self, _ctx: &mut Ctx, _id: BlockStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_local_memory_stmt(&mut self, _ctx: &mut Ctx, _id: MemoryStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_local_channel_stmt(&mut self, _ctx: &mut Ctx, _id: ChannelStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_labeled_stmt(&mut self, _ctx: &mut Ctx, _id: LabeledStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_if_stmt(&mut self, _ctx: &mut Ctx, _id: IfStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_while_stmt(&mut self, _ctx: &mut Ctx, _id: WhileStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_break_stmt(&mut self, _ctx: &mut Ctx, _id: BreakStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_continue_stmt(&mut self, _ctx: &mut Ctx, _id: ContinueStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_synchronous_stmt(&mut self, _ctx: &mut Ctx, _id: SynchronousStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_fork_stmt(&mut self, _ctx: &mut Ctx, _id: ForkStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_return_stmt(&mut self, _ctx: &mut Ctx, _id: ReturnStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_goto_stmt(&mut self, _ctx: &mut Ctx, _id: GotoStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_new_stmt(&mut self, _ctx: &mut Ctx, _id: NewStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_expr_stmt(&mut self, _ctx: &mut Ctx, _id: ExpressionStatementId) -> VisitorResult { Ok(()) }
 

	
 
    // Expressions
 
    // --- enum matching
 
    fn visit_expr(&mut self, ctx: &mut Ctx, id: ExpressionId) -> VisitorResult {
 
        match &ctx.heap[id] {
 
            Expression::Assignment(expr) => {
 
                let this = expr.this;
 
                self.visit_assignment_expr(ctx, this)
 
            },
 
            Expression::Binding(expr) => {
 
                let this = expr.this;
 
                self.visit_binding_expr(ctx, this)
 
            }
 
            Expression::Conditional(expr) => {
 
                let this = expr.this;
 
                self.visit_conditional_expr(ctx, this)
 
            }
 
            Expression::Binary(expr) => {
 
                let this = expr.this;
 
                self.visit_binary_expr(ctx, this)
src/runtime/tests.rs
Show inline comments
 
@@ -16,63 +16,63 @@ const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone())
 
}
 
fn file_logged_configured_connector(
 
    connector_id: ConnectorId,
 
    dir_path: &Path,
 
    pd: Arc<ProtocolDescription>,
 
) -> Connector {
 
    let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir");
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).expect("Failed to create log output file!");
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, pd, connector_id)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive sync(in<msg> a, out<msg> b) {
 
primitive sync_component(in<msg> a, out<msg> b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	msg x = get(a);
 
            	put(b, x);
 
            } else {
 
                assert(!fires(a) && !fires(b));
 
            }
 
        }
 
    }
 
}
 

	
 
primitive together(in<msg> ia, in<msg> ib, out<msg> oa, out<msg> ob){
 
  while(true) synchronous {
 
  while(true) sync {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 
fn new_u8_buffer(cap: usize) -> Vec<u8> {
 
    let mut v = Vec::with_capacity(cap);
 
    // Safe! len will cover owned bytes in valid state
 
    unsafe { v.set_len(cap) }
 
    v
 
}
 
@@ -81,49 +81,49 @@ fn new_u8_buffer(cap: usize) -> Vec<u8> {
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[i, o]).unwrap();
 
    c.add_component(b"", b"sync_component", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
@@ -332,65 +332,65 @@ fn native_immediately_inconsistent() {
 
    c.sync(SEC15).unwrap_err();
 
}
 

	
 
#[test]
 
fn native_recovers() {
 
    let test_log_path = Path::new("./logs/native_recovers");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap_err();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(SEC15).unwrap();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[g, p]).unwrap();
 
    c.add_component(b"", b"sync_component", &[g, p]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
    /*
 
    native p0|-->|g0 sync
 
           g1|<--|p1
 
    */
 
    let test_log_path = Path::new("./logs/sync_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[g0, p1]).unwrap();
 
    c.add_component(b"", b"sync_component", &[g0, p1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(SEC1).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
#[test]
 
fn double_net_connect() {
 
    let test_log_path = Path::new("./logs/double_net_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_p, _g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [_g, _p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
@@ -400,49 +400,49 @@ fn double_net_connect() {
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn distributed_msg_bounce() {
 
    /*
 
    native[0] | sync 0.p|-->|1.p native[1]
 
                     0.g|<--|1.g
 
    */
 
    let test_log_path = Path::new("./logs/distributed_msg_bounce");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            /*
 
            native | sync p|-->
 
                   |      g|<--
 
            */
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p, g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"", b"sync", &[g, p]).unwrap();
 
            c.add_component(b"", b"sync_component", &[g, p]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [g, p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(SEC1).unwrap();
 
            c.put(p, TEST_MSG.clone()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(SEC1).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
@@ -861,49 +861,49 @@ fn example_pres_3() {
 
}
 

	
 
#[test]
 
fn ac_not_b() {
 
    let test_log_path = Path::new("./logs/ac_not_b");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC5).unwrap();
 

	
 
            // put both A and B
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let pdl = b"
 
            primitive ac_not_b(in<msg> a, in<msg> b, out<msg> c){
 
                // forward A to C but keep B silent
 
                synchronous{ put(c, get(a)); }
 
                sync { put(c, get(a)); }
 
            }";
 
            let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap());
 
            let mut c = file_logged_configured_connector(1, test_log_path, pd);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let [a, b] = c.new_port_pair();
 

	
 
            c.add_component(b"", b"ac_not_b", &[p0, p1, a]).unwrap();
 

	
 
            c.connect(SEC1).unwrap();
 

	
 
            c.get(b).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn many_rounds_net() {
 
    let test_log_path = Path::new("./logs/many_rounds_net");
 
    let sock_addrs = [next_test_addr()];
 
    const NUM_ROUNDS: usize = 1_000;
 
    scope(|s| {
 
@@ -925,202 +925,202 @@ fn many_rounds_net() {
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 
#[test]
 
fn many_rounds_mem() {
 
    let test_log_path = Path::new("./logs/many_rounds_mem");
 
    const NUM_ROUNDS: usize = 1_000;
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, p1] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    for _ in 0..NUM_ROUNDS {
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.get(p1).unwrap();
 
        c.sync(SEC1).unwrap();
 
    }
 
}
 

	
 
#[test]
 
fn pdl_reo_lossy() {
 
    let pdl = b"
 
    primitive lossy(in<msg> a, out<msg> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            msg m = null;
 
            if(fires(a)) {
 
                m = get(a);
 
                if(fires(b)) {
 
                    put(b, m);
 
                }
 
            }
 
        }
 
    }
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1() {
 
    let pdl = b"
 
    primitive fifo1(in<msg> a, out<msg> b) {
 
        msg m = null;
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(m == null) {
 
                if(fires(a)) m=get(a);
 
            } else {
 
                if(fires(b)) put(b, m);
 
                m = null;
 
            }
 
        }
 
    }
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1full() {
 
    let test_log_path = Path::new("./logs/pdl_reo_fifo1full");
 
    let pdl = b"
 
    primitive fifo1full(in<msg> a, out<msg> b) {
 
        bool is_set = true;
 
        msg m = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(!is_set) {
 
                if(fires(a)) m=get(a);
 
                is_set = false;
 
            } else {
 
                if(fires(b)) put(b, m);
 
                is_set = true;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [_p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"fifo1full", &[g0, p1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(0, c.gotten(g1).unwrap().len());
 
}
 

	
 
#[test]
 
fn pdl_msg_consensus() {
 
    let test_log_path = Path::new("./logs/pdl_msg_consensus");
 
    let pdl = b"
 
    primitive msgconsensus(in<msg> a, in<msg> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            msg x = get(a);
 
            msg y = get(b);
 
            assert(x == y);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"msgconsensus", &[g0, g1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap();
 

	
 
    c.put(p0, Payload::from(b"HEY" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap_err();
 
}
 

	
 
#[test]
 
fn sequencer3_prim() {
 
    let test_log_path = Path::new("./logs/sequencer3_prim");
 
    let pdl = b"
 
    primitive sequencer3(out<msg> a, out<msg> b, out<msg> c) {
 
        u32 i = 0;
 
        while(true) synchronous {
 
        while(true) sync {
 
            out to = a;
 
            if     (i==1) to = b;
 
            else if(i==2) to = c;
 
            if(fires(to)) {
 
                put(to, create(0));
 
                i = (i + 1)%3;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let which_of_three = move |c: &mut Connector| {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(None).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        // silent round
 
        assert_eq!(0, c.sync(None).unwrap());
 
        // non silent round
 
        assert_eq!(expected_batch_idx, which_of_three(&mut c));
 
    }
 
}
 

	
 
#[test]
 
fn sequencer3_comp() {
 
    let test_log_path = Path::new("./logs/sequencer3_comp");
 
    let pdl = b"
 
    primitive replicator<T>(in<T> a, out<T> b, out<T> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && fires(b) && fires(c)) {
 
                    msg x = get(a);
 
                    put(b, x);
 
                    put(c, x);
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 
    primitive fifo1_init<T>(bool has_value, T m, in<T> a, out<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(has_value && fires(b)) {
 
                put(b, m);
 
                has_value = false;
 
            } else if (!has_value && fires(a)) {
 
                m = get(a);
 
                has_value = true;
 
            }
 
        }
 
    }
 
    composite fifo1_full<T>(in<T> a, out<T> b) {
 
        new fifo1_init(true, create(0), a, b);
 
    }
 
    composite fifo1<T>(in<T> a, out<T> b) {
 
        new fifo1_init(false, create(0), a, b);
 
    }
 
    composite sequencer3(out<msg> a, out<msg> b, out<msg> c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 

	
 
        new fifo1_full(o, d);
 
@@ -1160,125 +1160,125 @@ fn sequencer3_comp() {
 
        assert_eq!(expected_batch_idx, which_of_three(&mut c));
 
    }
 
}
 

	
 
enum XRouterItem {
 
    Silent,
 
    GetA,
 
    GetB,
 
}
 
// Hardcoded pseudo-random sequence of round behaviors for the native component
 
const XROUTER_ITEMS: &[XRouterItem] = {
 
    use XRouterItem::{GetA as A, GetB as B, Silent as S};
 
    &[
 
        B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B,
 
        S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A,
 
        A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S,
 
    ]
 
};
 

	
 
#[test]
 
fn xrouter_prim() {
 
    let test_log_path = Path::new("./logs/xrouter_prim");
 
    let pdl = b"
 
    primitive xrouter(in<msg> a, out<msg> b, out<msg> c) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                if(fires(b)) put(b, get(a));
 
                else         put(c, get(a));
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) xrouter2, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let now = std::time::Instant::now();
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("PRIM {:?}", now.elapsed());
 
}
 
#[test]
 
fn xrouter_comp() {
 
    let test_log_path = Path::new("./logs/xrouter_comp");
 
    let pdl = b"
 
    primitive replicator<T>(in<T> a, out<T> b, out<T> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && fires(b) && fires(c)) {
 
                    msg x = get(a);
 
                    put(b, x);
 
                    put(c, x);
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive merger(in<msg> a, in<msg> b, out<msg> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && !fires(b) && fires(c)) {
 
                    put(c, get(a));
 
                } else if (!fires(a) && fires(b) && fires(c)) {
 
                    put(c, get(b));
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive lossy<T>(in<T> a, out<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                auto m = get(a);
 
                if(fires(b)) put(b, m);
 
            }
 
        }
 
    }
 
    primitive sync_drain<T>(in<T> a, in<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                msg drop_it = get(a);
 
                msg on_the_floor = get(b);
 
            }
 
        }
 
    }
 
    composite xrouter(in<msg> a, out<msg> b, out<msg> c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 
        channel p -> q;
 
        channel r -> s;
 
        channel t -> u;
 

	
 
        new replicator(a, d, f);
 
        new replicator(g, t, h);
 
        new lossy(e, l);
 
        new lossy(i, j);
 
        new replicator(m, b, p);
 
        new replicator(k, n, c);
 
        new merger(q, o, r);
 
@@ -1299,108 +1299,108 @@ fn xrouter_comp() {
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("COMP {:?}", now.elapsed());
 
}
 

	
 
#[test]
 
fn count_stream() {
 
    let test_log_path = Path::new("./logs/count_stream");
 
    let pdl = b"
 
    primitive count_stream(out<msg> o) {
 
        msg m = create(1);
 
        m[0] = 0;
 
        while(true) synchronous {
 
        while(true) sync {
 
            put(o, m);
 
            m[0] += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"", b"count_stream", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..16 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
}
 

	
 
#[test]
 
fn for_msg_byte() {
 
    let test_log_path = Path::new("./logs/for_msg_byte");
 
    let pdl = b"
 
    primitive for_msg_byte(out<msg> o) {
 
        u8 i = 0;
 
        u32 idx = 0;
 
        while(i<8) {
 
            msg m = create(1);
 
            m[idx] = i;
 
            synchronous put(o, m);
 
            sync put(o, m);
 
            i += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"", b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn eq_causality() {
 
    let test_log_path = Path::new("./logs/eq_causality");
 
    let pdl = b"
 
    primitive eq(in<msg> a, in<msg> b, out<msg> c) {
 
        msg ma = create(0);
 
        msg mb = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                // left first!
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 
                assert(ma == mb);
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
@@ -1414,49 +1414,49 @@ fn eq_causality() {
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..4 {
 
        // everything is fine with LEFT FIRST
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap();
 

	
 
        // no solution when left is NOT FIRST
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap_err();
 
    }
 
}
 

	
 
#[test]
 
fn eq_no_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    composite eq(in<msg> a, in<msg> b, out<msg> c) {
 
        channel leftfirsto -> leftfirsti;
 
        new eqinner(a, b, c, leftfirsto, leftfirsti);
 
    }
 
    primitive eqinner(in<msg> a, in<msg> b, out<msg> c, out<msg> leftfirsto, in<msg> leftfirsti) {
 
        msg ma = create(0);
 
        msg mb = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                if(fires(leftfirsti)) {
 
                    // left first! DO USE DUMMY
 
                    ma = get(a);
 
                    put(c, ma);
 
                    mb = get(b);
 

	
 
                    // using dummy!
 
                    put(leftfirsto, ma);
 
                    auto drop_it = get(leftfirsti);
 
                } else {
 
                    // right first! DON'T USE DUMMY
 
                    mb = get(b);
 
                    put(c, mb);
 
                    ma = get(a);
 
                }
 
                assert(ma == mb);
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
src/runtime2/branch.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 
use crate::protocol::eval::{Value, ValueGroup};
 

	
 
use super::port::PortIdLocal;
 

	
 
// To share some logic between the FakeTree and ExecTree implementation
 
trait BranchListItem {
 
    fn get_id(&self) -> BranchId;
 
    fn set_next_id(&mut self, id: BranchId);
 
    fn get_next_id(&self) -> BranchId;
 
}
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32
 
}
 

	
 
impl BranchId {
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ index: 0 };
 
    }
 

	
 
    #[inline]
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        return Self{ index };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum PreparedStatement {
 
    CreatedChannel((Value, Value)),
 
    ForkedExecution(bool),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl PreparedStatement {
 
    pub(crate) fn is_none(&self) -> bool {
 
        if let PreparedStatement::None = self {
 
            return true;
 
        } else {
 
            return false;
 
        }
 
    }
 

	
 
    pub(crate) fn take(&mut self) -> PreparedStatement {
 
        if let PreparedStatement::None = self {
 
            return PreparedStatement::None;
 
        } else {
 
            let mut replacement = PreparedStatement::None;
 
            std::mem::swap(self, &mut replacement);
 
            return replacement;
 
        }
 
    }
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
pub(crate) struct Branch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: ComponentState,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>, // TODO: Remove, currently only valid in single-get/put mode
 
    pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove?
 
    pub prepared: PreparedStatement,
 
}
 

	
 
impl BranchListItem for Branch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
 
        Branch {
 
            id: BranchId::new_invalid(),
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
            prepared_channel: None,
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 
        // debug_assert!(
 
        //     (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
        //     (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        // ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
            prepared_channel: None,
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Inserts a message into the branch for retrieval by a corresponding
 
    /// `get(port)` call.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// Queue of branches. Just a little helper.
 
#[derive(Copy, Clone)]
 
struct BranchQueue {
 
    first: BranchId,
 
    last: BranchId,
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{
 
            first: BranchId::new_invalid(),
 
            last: BranchId::new_invalid()
 
        }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!(self.first.is_valid() == self.last.is_valid());
 
        return !self.first.is_valid();
 
    }
 
}
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
    FinishedSync,
 
}
 

	
 
impl QueueKind {
 
    fn as_index(&self) -> usize {
 
        return match self {
 
            QueueKind::Runnable => 0,
 
            QueueKind::AwaitingMessage => 1,
 
            QueueKind::FinishedSync => 2,
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ExecTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Execution tree of branches. Tries to keep the extra information stored
 
/// herein to a minimum. So the execution tree is aware of the branches, their
 
/// execution state and the way they're dependent on each other, but the
 
/// execution tree should not be aware of e.g. sync algorithms.
 
///
 
/// Note that the tree keeps track of multiple lists of branches. Each list
 
/// contains branches that ended up in a particular execution state. The lists
 
/// are described by the various `BranchQueue` instances and the `next_in_queue`
 
/// field in each branch.
 
pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
    /// Constructs a new execution tree with a single non-sync branch.
 
    pub fn new(component: ComponentState) -> Self {
 
        return Self {
 
            branches: vec![Branch::new_non_sync(component)],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    // --- Generic branch (queue) management
 

	
 
    /// Returns if tree is in speculative mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return self.branches.len() != 1;
 
    }
 

	
 
    /// Returns true if the particular queue is empty
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            return None;
 
        } else {
 
            let first_branch = &mut self.branches[queue.first.index as usize];
 
            queue.first = first_branch.next_in_queue;
 
            first_branch.next_in_queue = BranchId::new_invalid();
 
            if !queue.first.is_valid() {
 
                queue.last = BranchId::new_invalid();
 
            }
 

	
 
            return Some(first_branch.id);
 
        }
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    /// Pushes a branch (ID) into a queue.
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            queue.first = id;
 
            queue.last = id;
 
        } else {
 
            let last_branch = &mut self.branches[queue.last.index as usize];
 
            last_branch.next_in_queue = id;
 
            queue.last = id;
 
        }
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    /// Returns the non-sync branch (TODO: better name?)
 
    pub fn base_branch_mut(&mut self) -> &mut Branch {
 
        debug_assert!(!self.is_in_sync());
 
        return &mut self.branches[0];
 
    }
 

	
 
    /// Returns an iterator over all the elements in the queue of the given
 
    /// kind. One can start the iteration at the branch *after* the provided
 
    /// branch. Just make sure it actually is in the provided queue.
 
    pub fn iter_queue(&self, kind: QueueKind, start_at: Option<BranchId>) -> BranchQueueIter {
 
    /// Returns the branch ID of the first branch in a particular queue.
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
        let index = match start_at {
 
            Some(branch_id) => {
 
                debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id));
 
                let branch = &self.branches[branch_id.index as usize];
 

	
 
                branch.next_in_queue.index as usize
 
            },
 
            None => {
 
                queue.first.index as usize
 
            }
 
        };
 

	
 
        return BranchQueueIter {
 
            branches: self.branches.as_slice(),
 
            index,
 
    /// Returns the next branch ID of a branch (assumed to be in a particular
 
    /// queue.
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    /// Returns an iterator that starts with the provided branch, and then
 
    /// continues to visit all of the branch's parents.
 
    pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter {
 
        return BranchParentIter{
 
            branches: self.branches.as_slice(),
 
            index: branch_id.index as usize,
 
        }
 
    }
 

	
 
    // --- Preparing and finishing a speculative round
 

	
 
    /// Starts a synchronous round by cloning the non-sync branch and marking it
 
    /// as the root of the speculative tree. The id of this root sync branch is
 
    /// returned.
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 
        let sync_branch = Branch::new_sync(1, &self.branches[0]);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    /// Creates a new speculative branch based on the provided one. The index to
 
    /// retrieve this new branch will be returned.
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = Branch::new_sync(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
 

	
 
    /// Collapses the speculative execution tree back into a deterministic one,
 
    /// using the provided branch as the final sync result.
 
    pub fn end_sync(&mut self, branch_id: BranchId) {
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == branch_id));
 

	
 
        // Swap indicated branch into the first position
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1);
 

	
 
        // Reset all values to non-sync defaults
 
        let branch = &mut self.branches[0];
 
        branch.id = BranchId::new_invalid();
 
        branch.parent_id = BranchId::new_invalid();
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        debug_assert!(!branch.awaiting_port.is_valid());
 
        branch.next_in_queue = BranchId::new_invalid();
 
        branch.inbox.clear();
 
        debug_assert!(branch.prepared_channel.is_none());
 
        debug_assert!(branch.prepared.is_none());
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
 
            self.queues[queue_idx] = BranchQueue::new();
 
        }
 
    }
 
}
 

	
 
impl Index<BranchId> for ExecTree {
 
    type Output = Branch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for ExecTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
pub(crate) struct BranchQueueIter<'a> {
 
/// Iterator over the parents of an `ExecTree` branch.
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchQueueIter<'a> {
 
impl<'a> Iterator for BranchParentIter<'a> {
 
    type Item = &'a Branch;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
            // i.e. the invalid branch index
 
            return None;
 
        }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.next_in_queue.index as usize;
 
        self.index = branch.parent_id.index as usize;
 
        return Some(branch);
 
    }
 
}
 

	
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
// -----------------------------------------------------------------------------
 
// FakeTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Generic fake branch. This is supposed to be used in conjunction with the
 
/// fake tree. The purpose is to have a branching-like tree to use in
 
/// combination with a consensus algorithm in places where we don't have PDL
 
/// code.
 
pub(crate) struct FakeBranch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal,
 
    pub next_in_queue: BranchId,
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>,
 
}
 

	
 
impl<'a> Iterator for BranchParentIter<'a> {
 
    type Item = &'a Branch;
 
impl BranchListItem for FakeBranch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
impl FakeBranch {
 
    fn new_root(_index: u32) -> FakeBranch {
 
        debug_assert!(_index == 1);
 
        return FakeBranch{
 
            id: BranchId::new(1),
 
            parent_id: BranchId::new_invalid(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
        }
 
    }
 

	
 
    fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch {
 
        return FakeBranch {
 
            id: BranchId::new(index),
 
            parent_id: parent_branch.id,
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// A little helper for native components that don't have a set of branches that
 
/// are actually executing code, but just have to manage the idea of branches
 
/// due to them performing the equivalent of a branching `get` call.
 
pub(crate) struct FakeTree {
 
    pub branches: Vec<FakeBranch>,
 
    queues: [BranchQueue; NUM_QUEUES],
 
}
 

	
 
impl FakeTree {
 
    pub fn new() -> Self {
 
        // TODO: Don't like this? Cause is that now we don't have a non-sync
 
        //  branch. But we assumed BranchId=0 means the branch is invalid. We
 
        //  can do the rusty Option<BranchId> stuff. But we still need a token
 
        //  value within the protocol to signify no-branch-id. Maybe the high
 
        //  bit? Branches are crazy expensive, no-one is going to have 2^32
 
        //  branches anyway. 2^31 isn't too bad.
 
        return Self {
 
            branches: vec![FakeBranch{
 
                id: BranchId::new_invalid(),
 
                parent_id: BranchId::new_invalid(),
 
                sync_state: SpeculativeState::RunningNonSync,
 
                awaiting_port: PortIdLocal::new_invalid(),
 
                next_in_queue: BranchId::new_invalid(),
 
                inbox: HashMap::new(),
 
            }],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    fn is_in_sync(&self) -> bool {
 
        return self.branches.len() > 1;
 
    }
 

	
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync);
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first)
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.parent_id.index as usize;
 
        return Some(branch);
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 

	
 
        // Create the first branch
 
        let sync_branch = FakeBranch::new_root(1);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch {
 
        debug_assert!(branch_id.is_valid());
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Take out the succeeding branch, then just clear all fake branches.
 
        self.branches.swap(1, branch_id.index as usize);
 
        self.branches.truncate(2);
 
        let result = self.branches.pop().unwrap();
 

	
 
        for queue_index in 0..NUM_QUEUES {
 
            self.queues[queue_index] = BranchQueue::new();
 
        }
 

	
 
        return result;
 
    }
 
}
 

	
 
impl Index<BranchId> for FakeTree {
 
    type Output = FakeBranch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for FakeTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Shared logic
 
// -----------------------------------------------------------------------------
 

	
 
fn pop_from_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B]) -> Option<BranchId> {
 
    if queue.is_empty() {
 
        return None;
 
    } else {
 
        let first_branch = &mut branches[queue.first.index as usize];
 
        queue.first = first_branch.get_next_id();
 
        first_branch.set_next_id(BranchId::new_invalid());
 
        if !queue.first.is_valid() {
 
            queue.last = BranchId::new_invalid();
 
        }
 

	
 
        return Some(first_branch.get_id());
 
    }
 
}
 

	
 
fn push_into_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) {
 
    debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid());
 
    if queue.is_empty() {
 
        queue.first = branch_id;
 
        queue.last = branch_id;
 
    } else {
 
        let last_branch = &mut branches[queue.last.index as usize];
 
        last_branch.set_next_id(branch_id);
 
        queue.last = branch_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/connector.rs
Show inline comments
 
@@ -12,374 +12,413 @@
 
// So currently the code is organized as following:
 
// - The scheduler that is running the component is the authoritative source on
 
//     ports during *non-sync* mode. The consensus algorithm is the
 
//     authoritative source during *sync* mode. They retrieve each other's
 
//     state during the transitions. Hence port data exists duplicated between
 
//     these two datastructures.
 
// - The execution tree is where executed branches reside. But the execution
 
//     tree is only aware of the tree shape itself (and keeps track of some
 
//     queues of branches that are in a particular state), and tends to store
 
//     the PDL program state. The consensus algorithm is also somewhat aware
 
//     of the execution tree, but only in terms of what is needed to complete
 
//     a sync round (for now, that means the port mapping in each branch).
 
//     Hence once more we have properties conceptually associated with branches
 
//     in two places.
 
// - TODO: Write about handling messages, consensus wrapping data
 
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::PortId;
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
#[derive(Debug, Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
    Exit,           // Connector has exited
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
// TODO: Remove remaining fields once 'fires()' is removed from language.
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    received: &'a HashMap<PortIdLocal, ValueGroup>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.registered_id.is_some();
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(data) => Some(data.clone()),
 
            None => None,
 
        }
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::PerformedGet(value) => Some(value),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
 
        };
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken),
 
        };
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
            // Run in sync mode
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
 
                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                return ConnectorScheduling::Immediate;
 
            } else {
 
                return scheduling
 

	
 
            // Handle any new finished branches
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 

	
 
                if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Actually found a solution
 
                    self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 

	
 
                self.last_finished_handled = Some(branch_id);
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: ComponentState) -> Self {
 
        Self{
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
 
        while let Some(message) = ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
 
                Message::Control(_) => unreachable!("control message in component"),
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        let mut branches = Vec::new(); // TODO: @Remove
 
        if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) {
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        for branch_id in branches.drain(..) {
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content);
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that has not been used yet.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                // Create two forks, one that assumes the port will fire, and
 
                // one that assumes the port remains silent
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 

	
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
            RunResult::BranchGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `get()` is valid, so mark the branch as awaiting a message
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    // Note: we only know that a branch is waiting on a message when
 
                    // it reaches the `get` call. But we might have already received
 
                    // a message that targets this branch, so check now.
 
                    let mut any_branch_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_branch_received = true;
 
                        }
 
                    }
 

	
 
                    if any_branch_received {
 
                        return ConnectorScheduling::Immediate;
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
 
                // it reaches the `get` call. But we might have already received
 
                // a message that targets this branch, so check now.
 
                let mut any_message_received = false;
 
                for message in comp_ctx.get_read_data_messages(port_id) {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            RunResult::BranchAtSyncEnd => {
 
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
                if consistency == Consistency::Valid {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else if consistency == Consistency::Inconsistent {
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            RunResult::BranchPut(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `put()` is valid.
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    comp_ctx.submit_message(Message::Data(DataMessage {
 
                        sync_header, data_header,
 
                        content: DataContent::Message(content),
 
                    }));
 

	
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                }));
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 

	
 
        // If here then the run result did not require a particular action. We
 
        // return whether we have more active branches to run or not.
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 

	
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                debug_assert!(self.last_finished_handled.is_none());
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
 
                // we are in non-sync mode the scheduler will handle and check
 
                // port ownership transfer.
 
                debug_assert!(comp_ctx.workspace_ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
 

	
 
                let new_state = ComponentState {
 
                    prompt: Prompt::new(
 
                        &sched_ctx.runtime.protocol_description.types,
 
                        &sched_ctx.runtime.protocol_description.heap,
 
                        definition_id, monomorph_idx, arguments
 
                    ),
 
                };
 
                let new_component = ConnectorPDL::new(new_state);
 
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
 
                comp_ctx.workspace_ports.clear();
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared_channel = Some((
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
 
        let mut fake_vec = Vec::new();
 
        self.tree.end_sync(solution_branch_id);
 
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            // TODO: Handle sent/received ports
 
            debug_assert!(ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        ctx.notify_sync_end(&[]);
 
        self.last_finished_handled = None;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox::BranchMarker;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind};
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal};
 
use super::inbox::{
 
    Message, PortAnnotation,
 
    DataMessage, DataContent, DataHeader,
 
    SyncMessage, SyncContent, SyncHeader,
 
};
 
use super::scheduler::ComponentCtx;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
 
//  bit.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
    pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap();
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.last_finished_handled.is_none());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ctx.get_ports().iter()
 
                .map(|v| PortAnnotation{
 
                    port_id: v.self_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
            cur_marker: BranchMarker::new_invalid(),
 
        });
 
        self.branch_markers.push(BranchId::new_invalid());
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
    /// called for each forked branch in the execution tree.
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            port_mapping: parent_branch_annotations.port_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle. Note that
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for mapping in &branch.port_mapping {
 
            match mapping.expected_firing {
 
                Some(expected) => {
 
                    if expected != mapping.registered_id.is_some() {
 
                        // Inconsistent speculative state and actual state
 
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
 
                        return Consistency::Inconsistent;
 
                    }
 
                },
 
                None => {},
 
            }
 
        }
 

	
 
        return Consistency::Valid;
 
    }
 

	
 
@@ -166,302 +173,276 @@ impl Consensus {
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == port_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
                        mapping.expected_firing = Some(does_fire);
 
                        return Consistency::Valid;
 
                    },
 
                    Some(current) => {
 
                        // Already mapped
 
                        if current == does_fire {
 
                            return Consistency::Valid;
 
                        } else {
 
                            return Consistency::Inconsistent;
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates sync messages for any branches that are at the end of the
 
    /// sync block. To find these branches, they should've been put in the
 
    /// "finished" queue in the execution tree.
 
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let mut last_branch_id = self.last_finished_handled;
 
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
 
            // Turn the port mapping into a local solution
 
            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
 
            let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
            for port in source_mapping {
 
                // Note: if the port is silent, and we've never communicated
 
                // over the port, then we need to do so now, to let the peer
 
                // component know about our sync leader state.
 
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
                let peer_port_id = port_desc.peer_id;
 
                let channel_id = port_desc.channel_id;
 

	
 
                if !self.encountered_ports.contains(&port.port_id) {
 
                    ctx.submit_message(Message::Data(DataMessage {
 
                        sync_header: SyncHeader{
 
                            sending_component_id: ctx.id,
 
                            highest_component_id: self.highest_connector_id,
 
                            sync_round: self.sync_round
 
                        },
 
                        data_header: DataHeader{
 
                            expected_mapping: source_mapping.clone(),
 
                            sending_port: port.port_id,
 
                            target_port: peer_port_id,
 
                            new_mapping: BranchId::new_invalid(),
 
                        },
 
                        content: DataContent::SilentPortNotification,
 
                    }));
 
                    self.encountered_ports.push(port.port_id);
 
                }
 

	
 
                target_mapping.push((
 
                    channel_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 

	
 
            let local_solution = LocalSolution{
 
                component: ctx.id,
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 
            let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
 
            if solution_branch.is_some() {
 
                // No need to continue iterating, we've found the solution
 
                return solution_branch;
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&port.port_id) {
 
                ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    data_header: DataHeader{
 
                        expected_mapping: source_mapping.clone(),
 
                        sending_port: port.port_id,
 
                        target_port: peer_port_id,
 
                        new_mapping: BranchMarker::new_invalid(),
 
                    },
 
                    content: DataContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(port.port_id);
 
            }
 

	
 
            last_branch_id = Some(branch.id);
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
        return None;
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
 
        return solution_branch;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory.
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        final_ports.clear();
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for port in &branch.port_mapping {
 
            final_ports.push(port.port_id);
 
        }
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.last_finished_handled = None;
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 

	
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.port_mapping.iter()
 
                .find(|v| v.port_id == source_port_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch_id);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the data and sync header, and
 
    /// checking which *existing* branches *can* receive the message. So two
 
    /// cautionary notes:
 
    /// 1. A future branch might also be able to receive this message, see the
 
    ///     `branch_can_receive` function.
 
    /// 2. We return the branches that *can* receive the message, you still
 
    ///     have to explicitly call `notify_of_received_message`.
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec<BranchId>) -> bool {
 
        self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids);
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
 
        return self.handle_received_sync_header(&message.sync_header, ctx)
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return None;
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 
        match message.content {
 
            SyncContent::Notification => {
 
                // We were just interested in the header
 
                return None;
 
            },
 
            SyncContent::LocalSolution(solution) => {
 
                // We might be the leader, or earlier messages caused us to not
 
                // be the leader anymore.
 
                return self.send_or_store_local_solution(solution, ctx);
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(connector_id, _)| *connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) {
 
        debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content));
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
            if mapping.port_id == message.data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports);
 
                find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) {
 
            if sync_header.sync_round < peer.expected_sync_round {
 
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
 
            if message.sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        if let DataContent::SilentPortNotification = content {
 
        if let DataContent::SilentPortNotification = message.content {
 
            // No port can receive a "silent" notification.
 
            return false;
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, sync_header, data_header, content) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        if !self.handle_peer(sync_header) {
 
            // We can drop this package
 
            return false;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: peer.id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(Message::Sync(message));
 
            }
 
@@ -557,49 +538,49 @@ impl Consensus {
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        for local_solution in self.solution_combiner.drain() {
 
            let message = SyncMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(local_solution),
 
            };
 
            ctx.submit_message(Message::Sync(message));
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
src/runtime2/inbox.rs
Show inline comments
 
use std::sync::Mutex;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::consensus::{GlobalSolution, LocalSolution};
 
use super::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
    pub port_id: PortIdLocal,
 
    pub registered_id: Option<BranchId>,
 
    pub registered_id: Option<BranchMarker>,
 
    pub expected_firing: Option<bool>,
 
}
 

	
 
/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
 
/// unique identifier for a branch, but differs in that a branch only has one
 
/// branch ID, but might have multiple associated markers (i.e. one branch
 
/// performing a `put` three times will generate three markers.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchMarker{
 
    marker: u32,
 
}
 

	
 
impl BranchMarker {
 
    #[inline]
 
    pub(crate) fn new(marker: u32) -> Self {
 
        debug_assert!(marker != 0);
 
        return Self{ marker };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ marker: 0 }
 
    }
 
}
 

	
 
/// The header added by the synchronization algorithm to all.
 
#[derive(Debug, Clone)]
 
pub(crate) struct SyncHeader {
 
    pub sending_component_id: ConnectorId,
 
    pub highest_component_id: ConnectorId,
 
    pub sync_round: u32,
 
}
 

	
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchId,
 
    pub new_mapping: BranchMarker,
 
}
 

	
 
// TODO: Very much on the fence about this. On one hand I thought making it a
 
//  data message was neat because "silent port notification" should be rerouted
 
//  like any other data message to determine the component ID of the receiver
 
//  and to make it part of the leader election algorithm for the sync leader.
 
//  However: it complicates logic quite a bit. Really it might be easier to
 
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
 
#[derive(Debug, Clone)]
 
pub(crate) enum DataContent {
 
    SilentPortNotification,
 
    Message(ValueGroup),
 
}
 

	
 
impl DataContent {
 
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
 
        match self {
 
            DataContent::SilentPortNotification => None,
 
            DataContent::Message(message) => Some(message),
 
        }
 
    }
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 
use std::collections::HashMap;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
use super::scheduler::{SchedulerCtx, ComponentCtx};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::find_ports_in_value_group;
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{Message, ControlContent, ControlMessage};
 
use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    inbox: Vec<ValueGroup>,
 
}
 

	
 
type SyncDone = Arc<(Mutex<Option<FinishedSync>>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
 
    SyncRound(Vec<ApplicationSyncAction>),
 
    Shutdown,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorApplication
 
// -----------------------------------------------------------------------------
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
// TODO: Strong candidate for logic reduction in handling put/get. A lot of code
 
//  is an approximate copy-pasta from the regular component logic. I'm going to
 
//  wait until I'm implementing more native components to see which logic is
 
//  truly common.
 
pub struct ConnectorApplication {
 
    // Communicating about new jobs and setting up sync rounds
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    is_in_sync: bool,
 
    // Handling current sync round
 
    sync_desc: Vec<ApplicationSyncAction>,
 
    tree: FakeTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
    branch_extra: Vec<usize>, // instruction counter per branch
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if self.is_in_sync {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 
                if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Can finish sync round immediately
 
                    self.collapse_sync_to_solution_branch(solution_branch, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
        }
 
    }
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let sync_done = Arc::new(( Mutex::new(None), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication {
 
            sync_done: sync_done.clone(),
 
            job_queue: job_queue.clone()
 
            job_queue: job_queue.clone(),
 
            is_in_sync: false,
 
            sync_desc: Vec::new(),
 
            tree: FakeTree::new(),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
            branch_extra: vec![0],
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Handle any incoming messages if we're participating in a round
 
    fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                Message::Data(_) => todo!("data message in API connector"),
 
                Message::Sync(_)  => todo!("sync message in API connector"),
 
                Message::Control(_) => todo!("impossible control message"),
 
                Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx),
 
                Message::Control(_) => unreachable!("control message in native API component"),
 
            }
 
        }
 
    }
 

	
 
        // Handle requests coming from the API
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            while let Some(job) = queue.pop_front() {
 
                match job {
 
                    ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                        comp_ctx.push_port(endpoint_a);
 
                        comp_ctx.push_port(endpoint_b);
 
    pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
            self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
    fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.handle_new_messages(comp_ctx);
 

	
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 
        let mut instruction_idx = self.branch_extra[branch_id.index as usize];
 

	
 
        if instruction_idx >= self.sync_desc.len() {
 
            // Performed last instruction, so this branch is officially at the
 
            // end of the synchronous interaction.
 
            let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
            if consistency == Consistency::Valid {
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
            } else {
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            }
 
        } else {
 
            // We still have instructions to perform
 
            let cur_instruction = &self.sync_desc[instruction_idx];
 
            self.branch_extra[branch_id.index as usize] += 1;
 

	
 
            match &cur_instruction {
 
                ApplicationSyncAction::Put(port_id, content) => {
 
                    let port_id = *port_id;
 

	
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    let message = Message::Data(DataMessage {
 
                        sync_header,
 
                        data_header,
 
                        content: DataContent::Message(content.clone()),
 
                    });
 
                    comp_ctx.submit_message(message);
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationSyncAction::Get(port_id) => {
 
                    let port_id = *port_id;
 

	
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    let mut any_message_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 
                            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                            self.branch_extra.push(instruction_idx + 1);
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
                        }
 
                    }
 
                    ApplicationJob::NewConnector(connector, initial_ports) => {
 
                        comp_ctx.push_component(connector, initial_ports);
 
                    },
 
                    ApplicationJob::Shutdown => {
 
                        debug_assert!(queue.is_empty());
 
                        return ConnectorScheduling::Exit;
 

	
 
                    if any_message_received {
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        // In non-sync mode the application component doesn't really do anything
 
        // except performing jobs submitted from the API. This is the only
 
        // case where we expect to be woken up.
 
        // Note that we have to communicate to the scheduler when we've received
 
        // ports or created components (hence: given away ports) *before* we
 
        // enter a sync round.
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    comp_ctx.push_port(endpoint_a);
 
                    comp_ctx.push_port(endpoint_b);
 

	
 
                    return ConnectorScheduling::Immediate;
 
                }
 
                ApplicationJob::NewConnector(connector, initial_ports) => {
 
                    comp_ctx.push_component(connector, initial_ports);
 

	
 
                    return ConnectorScheduling::Later;
 
                },
 
                ApplicationJob::SyncRound(mut description) => {
 
                    // Entering sync mode
 
                    comp_ctx.notify_sync_start();
 
                    self.sync_desc = description;
 
                    self.is_in_sync = true;
 
                    debug_assert!(self.last_finished_handled.is_none());
 
                    debug_assert!(self.branch_extra.len() == 1);
 

	
 
                    let first_branch_id = self.tree.start_sync();
 
                    self.tree.push_into_queue(QueueKind::Runnable, first_branch_id);
 
                    debug_assert!(first_branch_id.index == 1);
 
                    self.consensus.start_sync(comp_ctx);
 
                    self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id);
 
                    self.branch_extra.push(0); // set first branch to first instruction
 

	
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 

	
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        // Queue was empty
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
    fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) {
 
        debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program
 
        // Notifying tree, consensus algorithm and context of ending sync
 
        let mut fake_vec = Vec::new();
 
        let mut solution_branch = self.tree.end_sync(branch_id);
 
        self.consensus.end_sync(branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            debug_assert!(comp_ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        comp_ctx.notify_sync_end(&[]);
 

	
 
        // Turning hashmapped inbox into vector of values
 
        let mut inbox = Vec::with_capacity(solution_branch.inbox.len());
 
        for action in &self.sync_desc {
 
            match action {
 
                ApplicationSyncAction::Put(_, _) => {},
 
                ApplicationSyncAction::Get(port_id) => {
 
                    debug_assert!(solution_branch.inbox.contains_key(port_id));
 
                    inbox.push(solution_branch.inbox.remove(port_id).unwrap());
 
                },
 
            }
 
        }
 

	
 
        // Notifying interface of ending sync
 
        self.is_in_sync = false;
 
        self.sync_desc.clear();
 
        self.branch_extra.truncate(1);
 
        self.last_finished_handled = None;
 

	
 
        let (results, notification) = &*self.sync_done;
 
        let mut results = results.lock().unwrap();
 
        *results = Some(FinishedSync{ inbox });
 
        notification.notify_one();
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ApplicationInterface
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub enum ChannelCreationError {
 
    InSync,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationStartSyncError {
 
    AlreadyInSync,
 
    NoSyncActions,
 
    IncorrectPortKind,
 
    UnownedPort,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationEndSyncError {
 
    NotInSync,
 
}
 

	
 
pub enum ApplicationSyncAction {
 
    Put(PortIdLocal, ValueGroup),
 
    Get(PortIdLocal),
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    is_in_sync: bool,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
    owned_ports: Vec<(PortKind, PortIdLocal)>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            is_in_sync: false,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
    /// Creates a new channel. Can only fail if the application interface is
 
    /// currently in sync mode.
 
    pub fn create_channel(&mut self) -> Result<Channel, ChannelCreationError> {
 
        if self.is_in_sync {
 
            return Err(ChannelCreationError::InSync);
 
        }
 

	
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 
        self.owned_ports.push((PortKind::Putter, putter_id));
 
        self.owned_ports.push((PortKind::Getter, getter_id));
 

	
 
        return Channel{ putter_id, getter_id };
 
        return Ok(Channel{ putter_id, getter_id });
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Yank out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        if self.is_in_sync {
 
            return Err(ComponentCreationError::InSync);
 
        }
 

	
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for initial_port in &initial_ports {
 
            if !self.owned_ports.iter().any(|v| v == initial_port) {
 
            if !self.owned_ports.iter().any(|(_, v)| v == initial_port) {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap();
 
            let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(state);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Check if the next sync-round is finished.
 
    pub fn try_wait(&self) -> bool {
 
        let (is_done, _) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        return *lock;
 
    /// Queues up a description of a synchronous round to run. Will not actually
 
    /// run the synchronous behaviour in blocking fashion. The results *must* be
 
    /// retrieved using `try_wait` or `wait` for the interface to be considered
 
    /// in non-sync mode.
 
    // TODO: Maybe change API in the future. For now it does the job
 
    pub fn perform_sync_round(&mut self, actions: Vec<ApplicationSyncAction>) -> Result<(), ApplicationStartSyncError> {
 
        if self.is_in_sync {
 
            return Err(ApplicationStartSyncError::AlreadyInSync);
 
        }
 

	
 
        // Check the action ports for consistency
 
        for action in &actions {
 
            let (port_id, expected_kind) = match action {
 
                ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter),
 
                ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter),
 
            };
 

	
 
            match self.find_port_by_id(port_id) {
 
                Some(port_kind) => {
 
                    if port_kind != expected_kind {
 
                        return Err(ApplicationStartSyncError::IncorrectPortKind)
 
                    }
 
                },
 
                None => {
 
                    return Err(ApplicationStartSyncError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        // Everything is consistent, go into sync mode and send the actions off
 
        // to the component that will actually perform the sync round
 
        self.is_in_sync = true;
 
        {
 
            let (is_done, _) = &*self.sync_done;
 
            let mut lock = is_done.lock().unwrap();
 
            *lock = None;
 
        }
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::SyncRound(actions));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        return Ok(())
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) {
 
    /// Wait until the next sync-round is finished, returning the received
 
    /// messages in order of `get` calls.
 
    pub fn wait(&mut self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
        if !self.is_in_sync {
 
            return Err(ApplicationEndSyncError::NotInSync);
 
        }
 

	
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done
 
        let mut lock = is_done.lock().unwrap();
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        return Ok(lock.take().unwrap().inbox);
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 

	
 
    fn wake_up_connector_with_ping(&self) {
 
        let connector = self.runtime.get_component_public(self.connector_id);
 
        connector.inbox.insert_message(Message::Control(ControlMessage {
 
            id: 0,
 
            sending_component_id: self.connector_id,
 
            content: ControlContent::Ping,
 
        }));
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.push_work(key);
 
        }
 
    }
 

	
 
    fn find_port_by_id(&self, port_id: PortIdLocal) -> Option<PortKind> {
 
        return self.owned_ports.iter()
 
            .find(|(_, owned_id)| *owned_id == port_id)
 
            .map(|(port_kind, _)| *port_kind);
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::Shutdown);
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        self.runtime.decrement_active_interfaces();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -275,50 +275,51 @@ impl Scheduler {
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox
 
                scheduled.ctx.inbox_messages.clear();
 
                // Just left sync region. So clear inbox up until the last
 
                // message that was read.
 
                scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read);
 
                scheduled.ctx.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
@@ -359,49 +360,49 @@ impl Scheduler {
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<Message>, // never control or ping messages
 
    inbox_messages: Vec<Message>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox_messages: Vec::new(),
 
            inbox_len_read: 0,
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
src/runtime2/tests/api_component.rs
Show inline comments
 
new file 100644
 
// Testing the api component.
 
//
 
// These tests explicitly do not use the "NUM_INSTANCES" constant because we're
 
// doing some communication with the native component. Hence only expect one
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive handler(in<u32> request, out<u32> response, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto value = get(request);
 
                put(response, value * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let req_chan = api.create_channel().unwrap();
 
    let resp_chan = api.create_channel().unwrap();
 

	
 
    api.create_connector("", "handler", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(req_chan.getter_id.index)),
 
        Value::Output(PortId::new(resp_chan.putter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])),
 
            ApplicationSyncAction::Get(resp_chan.getter_id)
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 
        assert!(result.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, loop_idx * 2);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_getting_from_component() {
 
    const CODE: &'static str ="
 
    primitive loop_sender(out<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            sync {
 
                put(numbers, cur);
 
                cur += 1;
 
            }
 
        }
 
    }";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_sender", ValueGroup::new_stack(vec![
 
        Value::Output(PortId::new(channel.putter_id.index)),
 
        Value::UInt32(1337),
 
        Value::UInt32(1337 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Get(channel.getter_id),
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 

	
 
        assert!(result.len() == 1 && result[0].values.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, 1337 + loop_idx);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_putting_to_component() {
 
    const CODE: &'static str = "
 
    primitive loop_receiver(in<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            sync {
 
                auto number = get(numbers);
 
                assert(number == cur);
 
                cur += 1;
 
            }
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_receiver", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(channel.getter_id.index)),
 
        Value::UInt32(42),
 
        Value::UInt32(42 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::UInt32(42 + loop_idx)])),
 
        ]).expect("start sync round");
 

	
 
        // Note: if we finish a round, then it must have succeeded :)
 
        api.wait().expect("finish sync round");
 
    }
 
}
 

	
 
#[test]
 
fn test_doing_nothing() {
 
    const CODE: &'static str = "
 
    primitive getter(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {}
 
            sync { auto res = get(input); assert(res); }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(channel.getter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for _ in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![]).expect("start silent sync round");
 
        api.wait().expect("finish silent sync round");
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::Bool(true)]))
 
        ]).expect("start firing sync round");
 
        let res = api.wait().expect("finish firing sync round");
 
        assert!(res.is_empty());
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/basics.rs
Show inline comments
 
new file 100644
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_single_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("single_put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 

	
 
#[test]
 
fn test_multi_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter_static(out<u8> vals, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                put(vals, 0b00000001);
 
                put(vals, 0b00000100);
 
                put(vals, 0b00010000);
 
                put(vals, 0b01000000);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter_dynamic(in<u8> vals, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                u32 recv_index = 0;
 
                u8 expected = 1;
 
                while (recv_index < 4) {
 
                    auto gotten = get(vals);
 
                    assert(gotten == expected);
 
                    expected <<= 2;
 
                    recv_index += 1;
 
                }
 
            }
 
            loop_index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("multi_put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "putter_static", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "getter_dynamic", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
    })
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation_basic;
 
mod basics;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 5;   // number of test instances constructed
 
const NUM_LOOPS: u32 = 5;       // number of loops within a single test (not used by all tests)
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
struct TestTimer {
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    fn new(name: &'static str) -> Self {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 

	
 
#[test]
 
fn test_star_shaped_request() {
 
    const CODE: &'static str = "
 
    primitive edge(in<u32> input, out<u32> output, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto req = get(input);
 
                put(output, req * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            // print(\"starting loop\");
 
            synchronous {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
        auto responses = {};
 

	
 
        u32 edge_index = 0;
 
        while (edge_index < num_edges) {
 
            channel req_put -> req_get;
 
            channel resp_put -> resp_get;
 
            new edge(req_get, resp_put, num_loops);
 
            requests @= { req_put };
 
            responses @= { resp_get };
 

	
 
            edge_index += 1;
 
        }
 

	
 
        new center(requests, responses, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("star_shaped_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS)
 
        ]));
 
    });
 
}
 
\ No newline at end of file
src/runtime2/tests/network_shapes.rs
Show inline comments
 
new file 100644
 
// Testing particular graph shapes
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_star_shaped_request() {
 
    const CODE: &'static str = "
 
    primitive edge(in<u32> input, out<u32> output, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto req = get(input);
 
                put(output, req * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            // print(\"starting loop\");
 
            sync {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
        auto responses = {};
 

	
 
        u32 edge_index = 0;
 
        while (edge_index < num_edges) {
 
            channel req_put -> req_get;
 
            channel resp_put -> resp_get;
 
            new edge(req_get, resp_put, num_loops);
 
            requests @= { req_put };
 
            responses @= { resp_get };
 

	
 
            edge_index += 1;
 
        }
 

	
 
        new center(requests, responses, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("star_shaped_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create connector");
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            sync {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create connector");
 
    });
 
}
 
\ No newline at end of file
src/runtime2/tests/speculation_basic.rs
Show inline comments
 
new file 100644
 
// Testing speculation - Basic forms
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_maybe_do_nothing() {
 
    // Three variants in which the behaviour in which nothing is performed is
 
    // somehow not allowed. Note that we "check" by seeing if the test finishes.
 
    // Only the branches in which ports fire increment the loop index
 
    const CODE: &'static str = "
 
    primitive only_puts(out<bool> output, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync { put(output, true); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive might_put(out<bool> output, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                fork { put(output, true); index += 1; }
 
                or   {}
 
            }
 
        }
 
    }
 

	
 
    primitive only_gets(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync { auto res = get(input); assert(res); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive might_get(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync fork { auto res = get(input); assert(res); index += 1; } or {}
 
        }
 
    }
 
    ";
 

	
 
    // Construct all variants which should work and wait until the runtime exits
 
    run_test_in_runtime(CODE, |api| {
 
        // only putting -> maybe getting
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "only_puts", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "might_get", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 

	
 
        // maybe putting -> only getting
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "might_put", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "only_gets", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
    })
 
}
 
\ No newline at end of file
testdata/parser/negative/1.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested twice in primitive
 
primitive main(in a, out b) {
 
	while (true) {
 
		synchronous {
 
			synchronous {}
 
		sync {
 
			sync {}
 
		}
 
	}
 
}
testdata/parser/negative/1.txt
Show inline comments
 
Parse error at 1.pdl:7:4: Illegal nested synchronous statement
 
			synchronous {}
 
			sync {}
 
			^
testdata/parser/negative/10.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested in sync block
 
primitive main(in a, out b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (false || true) {
 
				synchronous {
 
				sync {
 
					skip;
 
				}
 
			}
 
		}
 
	} 
 
}
testdata/parser/negative/10.txt
Show inline comments
 
Parse error at 10.pdl:8:5: Illegal nested synchronous statement
 
				synchronous {
 
				sync {
 
				^
testdata/parser/negative/12.pdl
Show inline comments
 
#version 100
 

	
 
// illegal node declaration
 
primitive main(in a, out b) {
 
	while (true) {
 
		channel x -> y;
 
		synchronous {}
 
		sync {}
 
	}
 
}
testdata/parser/negative/19.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(a)) {
 
				return 5;
 
			} else {
 
				block(a);
 
			}
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/negative/24.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	int x = 0;
 
	int y = 0;
 
	x += y + 5;
 
	y %= x -= 3;
 
	x *= x * x *= 5;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			assert fires(a) == fires(b);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/negative/3.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested deeply in composite
 
composite main(in a, out b) {
 
	channel x -> y;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			skip;
 
		}
 
	}
 
}
testdata/parser/negative/3.txt
Show inline comments
 
Parse error at 3.pdl:7:3: Illegal nested synchronous statement
 
		synchronous {
 
		sync {
 
		^
testdata/parser/negative/31.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(int a) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            break; // not allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/negative/32.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(int a) {
 
    loop: {
 
        synchronous {
 
        sync {
 
            goto loop; // not allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/negative/4.pdl
Show inline comments
 
#version 100
 

	
 
// built-in outside sync block
 
primitive main(in a, out b) {
 
	int x = 0;
 
	msg y = create(0); // legal
 
	while (x < 10) {
 
		y = get(a); // illegal
 
		synchronous {
 
		sync {
 
			y = get(a); // legal
 
		}
 
	}
 
}
testdata/parser/negative/6.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
// duplicate formal parameters
 
composite main(in a, out a) {
 
	new sync(a, a);
 
	new sync_component(a, a);
 
}
testdata/parser/negative/7.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
// shadowing formal parameter
 
composite main(in a, out b) {
 
	channel c -> a;
 
	new sync(a, b);
 
	new sync_component(a, b);
 
}
testdata/parser/negative/8.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main(in a, out b) {
 
	channel c -> d;
 
	syncdrain(a, b);
 
}
 

	
 
// shadowing import
 
primitive syncdrain(in a, in b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (!fires(a) || !fires(b)) {
 
				block(a);
 
				block(b);
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/1.pdl
Show inline comments
 
#version 100
 

	
 
composite main(in asend, out arecv, in bsend, out brecv) {
 
    channel xo -> xi;
 
    channel yo -> yi;
 
    new replicator(asend, xo, brecv);
 
    new replicator(bsend, yo, arecv);
 
    // x fires first, then y, then x, et cetera
 
    new sequencer(xi, yi);
 
}
 

	
 
primitive replicator(in a, out b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b) && fires(c)) {
 
                msg x = get(a);
 
                put(b, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 

	
 
composite sequencer(in x, in y) {
 
    channel ao -> ai;
 
    channel bo -> bi;
 
    channel co -> ci;
 
    channel do -> di;
 
    channel eo -> ei;
 
    channel fo -> fi;
 
    new syncdrain(x, ai);
 
    new syncdrain(y, bi);
 
    new replicator(ei, ao, co);
 
    new replicator(fi, bo, do);
 
    new fifo(ci, fo, null);
 
    new fifo(di, eo, create(0));
 
}
 

	
 
primitive syncdrain(in a, in b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
                get(a);
 
                get(b);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
 

	
 
primitive fifo(in a, out b, msg init) {
 
    msg c = init;
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (c != null) {
 
                assert !fires(a);
 
                if (fires(b)) {
 
                    put(b, c);
 
                    c = null;
 
                }
 
            } else {
 
                assert !fires(b);
 
                if (fires(a)) {
 
                    c = get(a);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
primitive sequencer2(in x, in y) {
 
	while (true) {
 
	    boolean b = false;
 
		while (!b) {
 
			synchronous {
 
			sync {
 
				assert !fires(y);
 
				if (fires(x))
 
					b = true;
 
			}
 
		}
 
		b = false;
 
		while (!b) {
 
			synchronous {
 
			sync {
 
				assert !fires(x);
 
				if (fires(y))
 
					b = true;
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/10.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {}
 

	
 
primitive example(in a, out[] b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(a)) {
 
				int i = 0;
 
				while (i < b.length) {
 
					if (fires(b[i])) {
 
						int j = i + 1;
 
						while (j < b.length)
 
							assert !fires(b[j++]);
 
						break;
 
					}
 
					i++;
 
				}
 
				assert i < b.length;
 
			} else {
 
				int i = 0;
 
				while (i < b.length)
 
					assert !fires(b[i++]);
 
			}
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/11.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	msg x = null;
 
	while (x == null) {
 
		synchronous {
 
		sync {
 
			if (fires(a))
 
				x = get(a);
 
		}
 
	}
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(b))
 
				put(b, x);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/12.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	int x = 0;
 
	int y = 0;
 
	x += y + 5;
 
	y %= x -= 3;
 
	x *= x * (x *= 5);
 
	while (true) {
 
		synchronous {
 
		sync {
 
			assert fires(a) == fires(b);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/13.pdl
Show inline comments
 
#version 100
 

	
 
/*
 
Adaptation of 7.pdl
 
*/
 

	
 
composite main() {}
 

	
 
composite example(in[] a, in[] b, out x) {
 
	new async(a);
 
	new async(b);
 
	new resolve(a, b, x);
 
}
 

	
 
primitive resolve(in[] a, in[] b, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length && i < b.length) {
 
				if (fires(a[i]) && fires(b[i])) {
 
					put(x, create(0)); // send token to x
 
					break;
 
				}
 
				i++;
 
			}
 
			if (i >= a.length || i >= b.length)
 
				assert !fires(x);
 
		}
 
	}
 
}
 

	
 
primitive async(in[] a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length)
 
				if (fires(a[i++])) break;
 
			while (i < a.length)
 
				assert !fires(a[i++]);
 
		}
 
	}
 
}
testdata/parser/positive/14.pdl
Show inline comments
 
#version 100
 

	
 
composite main(out c) {
 
	channel ao -> ai;
 
    channel bo -> bi;
 
	new sync(ai, bo);
 
	new sync_component(ai, bo);
 
	new binary_replicator(bi, ao, c);
 
}
 

	
 
primitive sync(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	msg x = get(a);
 
            	put(b, x);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
 

	
 
primitive binary_replicator(in b, out a, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(b) && fires(a) && fires(c)) {
 
                msg x = get(b);
 
                put(a, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/15.pdl
Show inline comments
 
#version
 

	
 
import std.reo;
 

	
 
composite main(out c) {
 
	channel ao -> ai;
 
	channel bo -> bi;
 
	channel axo -> axi;
 
	channel zo -> zi;
 
	new sync(ai, bo);
 
	new sync_component(ai, bo);
 
	new replicator(bi, {axo, c});
 
	new consensus({axi, zi}, ao);
 
	new generator(zo);
 
}
 

	
 
primitive generator(out z) {
 
	while (true) {
 
		synchronous (msg x) {
 
			if (x == null) {
 
				put(z, x);
 
				assert !fires(x);
 
			} else {
 
				put(z, x);
 
				assert fires(x);
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/16.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {
 
	channel xo -> xi;
 
	new a(xi);
 
	new c(xo);
 
}
 

	
 
primitive a(in x) {
 
	synchronous {
 
	sync {
 
		msg m = get(x);
 
		assert m.length == 5;
 
		assert m[0] == 'h';
 
		assert m[1] == 'e';
 
		assert m[2] == 'l';
 
		assert m[3] == 'l';
 
		assert m[4] == 'o';
 
	}
 
}
 

	
 
primitive b(out x) {
 
	synchronous (msg m) {
 
		put(x, m);
 
	}
 
}
 
// or
 
primitive c(out x) {
 
	synchronous {
 
	sync {
 
		msg m = create(5);
 
		m[0] = 'h';
 
		m[1] = 'e';
 
		m[2] = 'l';
 
		m[3] = 'l';
 
		m[4] = 'o';
 
		put(x, m);
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/17.pdl
Show inline comments
 
#version 100
 

	
 
composite main(in x, out y) {
 
	new prophet(x, y);
 
}
 

	
 
primitive prophet(in b, out a) {
 
	msg c = null;
 
	while (true) {
 
		if (c != null) {
 
			synchronous {
 
			sync {
 
				assert !fires(a);
 
				if (fires(b)) {
 
					assert get(b) == c;
 
					c = null;
 
				}
 
			}
 
		} else {
 
			synchronous (msg x) {
 
				assert !fires(b);
 
				if (fires(a)) {
 
					put(a, x);
 
					c = x;
 
				}
 
			}
 
		}
 
	}
 
}
 

	
 
primitive fifo(in a, out b, msg init) {
 
    msg c = init;
 
    while (true) {
 
        if (c != null) {
 
        	synchronous {
 
        	sync {
 
                assert !fires(a);
 
                if (fires(b)) {
 
                    put(b, c);
 
                    c = null;
 
                }
 
            }
 
        } else {
 
        	synchronous {
 
        	sync {
 
                assert !fires(b);
 
                if (fires(a)) {
 
                    c = get(a);
 
                }
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/positive/18.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main() {}
 

	
 
primitive main1(in a, out c) {
 
	int x = 0;
 
	int y = 0;
 
	msg z = null;
 
	msg w = null;
 
	x = 1;
 
	y = 1;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (x > 0 && fires(a)) {
 
				z = get(a);
 
				x--;
 
			}
 
			if (w != null && fires(c)) {
 
				put(c, w);
 
				w = null;
 
				y++;
 
			}
 
		}
 
		synchronous {
 
		sync {
 
			assert !fires(a) && !fires(c);
 
			if (z != null && y > 0) {
 
				w = z;
 
				z = null;
 
				y--;
 
				x++;
 
			}
 
		}
 
	}
 
}
 

	
 
composite main2(in a, out c) {
 
	channel xo -> xi;
 
	new fifo(a, xo, null);
 
	new fifo(xi, c, null);
 
}
testdata/parser/positive/19.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {}
 

	
 
primitive example(int a) {
 
    synchronous {
 
    sync {
 
        loop: {
 
            goto loop; // allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/positive/2.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
composite main(in asend, out arecv, in bsend, out brecv, in csend, out crecv) {
 
    channel xo -> xi;
 
    channel yo -> yi;
 
    channel zo -> zi;
 
    // Every synchronous round, at most one message is sent (to determine a global order)
 
    new mymerger(asend, bsend, xo);
 
    new mymerger(csend, xi, yo);
 
    // If a message is sent, it is broadcast to every recipient
 
    new replicator(yi, {arecv, zo});
 
    new replicator(zi, {brecv, crecv});
 
}
 

	
 
primitive mymerger(in a, in b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && !fires(b) && fires(c)) {
 
                put(c, get(a));
 
            } else if (!fires(a) && fires(b) && fires(c)) {
 
                put(c, get(b));
 
            } else {
 
            	assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/3.pdl
Show inline comments
 
@@ -9,49 +9,49 @@ composite main(in ai, out ao, in bi, out bo, in ci, out co, in di, out do) {
 
        channel afo -> aii;
 
        channel bfo -> bii;
 
        channel cfo -> cii;
 
        channel dfo -> dii;
 
        // Part 1. Collect all in msgs.
 
        new fifo(ai, afo, null);
 
        new fifo(bi, bfo, null);
 
        new fifo(ci, cfo, null);
 
        new fifo(di, dfo, null);
 
        // Part 2. Compute maximum.
 
        new computeMax(aii, bii, cii, dii, xo);
 
    }
 
    // Part 3. Send maximum to all out msgs, and repeat.
 
    {
 
        channel xxo -> xxi;
 
        channel xxxo -> xxxi;
 
        new replicator(xi, xxo, ao);
 
        new replicator(xxi, xxxo, bo);
 
        new replicator(xxxi, co, do);
 
    }
 
}
 

	
 
primitive computeMax(in a, in b, in c, in d, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
            if (fires(a) && fires(b) && fires(c) && fires(d) && fires(x)) {
 
            	msg aa = get(a);
 
            	msg bb = get(b);
 
            	msg cc = get(c);
 
            	msg dd = get(d);
 
            	uint16_t aaa = aa[0] & aa[1] << 8;
 
                uint16_t bbb = bb[0] & bb[1] << 8;
 
                uint16_t ccc = cc[0] & cc[1] << 8;
 
                uint16_t ddd = dd[0] & dd[1] << 8;
 
                // broadcast message with highest header
 
                uint16_t max = aaa;
 
                if (bbb > max) max = bbb;
 
                if (ccc > max) max = ccc;
 
                if (ddd > max) max = ddd;
 
                if (max == aaa) put(x, aa);
 
                else if (max == bbb) put(x, bb);
 
                else if (max == ccc) put(x, cc);
 
                else if (max == ddd) put(x, dd);
 
            } else {
 
	            assert !fires(a) && !fires(b) && !fires(c) && !fires(d) && !fires(x);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/5.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 
import std.buf;
 

	
 
primitive main(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
                msg x = get(a);
 
                short y = readShort(x, 0);
 
                y++;
 
                writeShort(x, 0, y);
 
                put(b, x);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/6.pdl
Show inline comments
 
#version 100
 

	
 
composite main(in a1, in a2, in a3, out b1, out b2) {
 
	new reonode({a1, a2, a3}, {b1, b2});
 
}
 

	
 
composite reonode(in[] a, out[] b) {
 
	channel co -> ci;
 
	new merger(a, co);
 
	new replicator(ci, b);
 
}
 

	
 
composite replicator(in a, out[] b) {
 
	if (b.length == 0) {
 
		new blocking(a);
 
	} else if (b.length == 1) {
 
		new sync(a, b[0]);
 
		new sync_component(a, b[0]);
 
	} else {
 
		channel xo -> xi;
 
		new binary_replicator(a, b[0], xo);
 
		new replicator(xi, b[1 : b.length - 1]);
 
	}
 
}
 
primitive binary_replicator(in a, out b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b) && fires(c)) {
 
                msg x = get(a);
 
                put(b, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 
primitive blocking(in a) {
 
	while (true) synchronous {
 
	while (true) sync {
 
		assert !fires(a);
 
	}
 
}
 

	
 
composite merger(in[] a, out b) {
 
	if (a.length == 0) {
 
		new silent(b);
 
	} else {
 
		in prev = a[0];
 
		int i = 1;
 
		while (i < a.length) {
 
			channel yi -> yo;
 
			new binary_merger(prev, a[i], yo);
 
			prev = yi;
 
			i++;
 
		}
 
		new sync(prev, b);
 
		new sync_component(prev, b);
 
	}
 
}
 
primitive binary_merger(in a, in b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(c)) {
 
                assert !fires(b);
 
                put(c, get(a));
 
            } else if (fires(b) && fires(c)) {
 
                assert !fires(a);
 
                put(c, get(b));
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 
primitive silent(out a) {
 
	while (true) synchronous {
 
	while (true) sync {
 
		assert !fires(a);
 
	}
 
}
 

	
 
primitive sync(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	put(b, get(a));
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
testdata/parser/positive/7.pdl
Show inline comments
 
@@ -29,54 +29,54 @@ What can Ron and Moshe do in order not leak more information than necessary?
 
Here is one of our favorite solutions suggested by Miki Ajtai of the IBM
 
Almaden Research Center. His proposal is "physical" in the sense that
 
Ron and Moshe must be together. We need to assume that there is a fairly
 
small pool of candidates, say twenty. Ron and Moshe obtain twenty
 
identical containers (perhaps by purchasing disposable cups (paper or
 
plastic)), arrange them in a line, and write labels in front of each
 
cup, one for each candidate. Ron then puts a folded slip of paper saying
 
``Yes'' in the cup of the person who complained to him, and a slip
 
saying ``No'' in the other nineteen cups. Moshe does the same. Ron and
 
Moshe then remove the labels, and shuffle the cups at random. They then
 
look inside the cups to see whether one of them contains two slips
 
saying ``Yes'' and decide accordingly.
 
*/
 

	
 
composite main() {}
 

	
 
composite puzzle(in[] a, in[] b, out x) {
 
	new async(a);
 
	new async(b);
 
	new resolve(a, b, x);
 
}
 

	
 
primitive resolve(in[] a, in[] b, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length && i < b.length) {
 
				if (fires(a[i]) && fires(b[i])) {
 
					put(x, create(0)); // send token to x
 
					goto end;
 
				}
 
				i++;
 
			}
 
			assert !fires(x);
 
			end: skip;
 
		}
 
	}
 
}
 

	
 
primitive async(in[] a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			int j = 0;
 
			while (i < a.length) {
 
				if (fires(a[i])) break;
 
				i++;
 
			}
 
			while (j < a.length) {
 
				assert i == j || !fires(a[j]);
 
			}
 
		}
 
	}
 
}
testdata/parser/positive/8.pdl
Show inline comments
 
@@ -10,64 +10,64 @@ starting with 0 and successively appending the Boolean complement of the
 
sequence obtained thus far.
 

	
 
To compute the nth element t_n, write the number n in binary. If the
 
number of ones in this binary expansion is odd then t_n = 1, if even
 
then t_n = 0. For this reason John H. Conway et al. call numbers n
 
satisfying t_n = 1 odious (for odd) numbers and numbers for which
 
t_n = 0 evil (for even) numbers. In other words, t_n = 0 if n is
 
an evil number and t_n = 1 if n is an odious number.
 

	
 
*/
 

	
 
import std.reo;
 

	
 
composite main(out x) {
 
	channel ao -> ai;
 
	channel bo -> bi;
 
	channel co -> ci;
 
	new evil_or_odious(ai, bo);
 
	new replicator(bi, {co, x});
 
	new recorder(ao, ci);
 
}
 

	
 
primitive evil_or_odious(in x, out y) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(x) && fires(y)) {
 
				msg a = get(x);
 
				msg result = create(1);
 
				boolean even = true;
 
				int i = 0;
 
				while (i < a.length) {
 
					if (a[i++] == '1')
 
						even = !even;
 
				}
 
				result[0] = even ? '1' : '0';
 
				put(y, result);
 
			} else {
 
				assert !fires(x);
 
				assert !fires(y);
 
			}
 
		}
 
	}
 
}
 
primitive recorder(out h, in a) {
 
	msg c = create(0);
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(h) && fires(a)) {
 
				put(h, c);
 
				{
 
					msg x = get(a);
 
					msg n = create(c.length + 1);
 
					int i = 0;
 
					while (i < c.length) {
 
						n[i] = c[i];
 
						i++;
 
					}
 
					n[c.length] = x[0];
 
					c = n;
 
				}
 
			}
 
		}
 
	}
 
} 
testdata/parser/positive/tarry.pdl
Show inline comments
 
@@ -22,176 +22,176 @@ composite main(in start, out end, out s1o, in s1i, out s2o, in s2i, out s3o, in
 
	// Processes: p, q, r, s
 
	// Channels: pq, pr, qr, rs
 
	channel p_pq -> pq_q;
 
	channel q_pq -> pq_p;
 
	channel p_pr -> pr_r;
 
	channel r_pr -> pr_p;
 
	channel q_qr -> qr_r;
 
	channel r_qr -> qr_q;
 
	channel r_rs -> rs_s;
 
	channel s_rs -> rs_r;
 
	
 
	new initiator(start, end, {pq_p, pr_p}, {p_pq, p_pr});
 
	new noninitiator(s1o, s1i, {pq_q, qr_q}, {q_pq, q_qr});
 
	new noninitiator(s2o, s2i, {pr_r, rs_r}, {r_pr, r_rs});
 
	new noninitiator(s3o, s3i, {rs_s}, {s_rs});
 
}
 
primitive initiator(in start, out end, in[] peeri, out[] peero) {
 
	msg token = null;
 
	in[] neighbori = {};
 
	out[] neighboro = {};
 
	assert peeri.length == peero.length;
 
	while (true) {
 
		// Step 1. Initiator waits for token
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				if (fires(start)) {
 
					token = get(start);
 
				}
 
			}
 
		}
 
		// Reset neighbors
 
		neighbori = peeri;
 
		peeri = {};
 
		neighboro = peero;
 
		peero = {};
 
		// Step 2. Keep sending token to processes
 
		while (neighbori.length > 0) {
 
			int idx = 0;
 
			// Select first channel that accepts our token
 
			while (token != null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < neighboro.length) {
 
						if (fires(neighboro[i])) {
 
							put(neighboro[i], token);
 
							idx = i;
 
							token = null;
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
			// Eliminate from neighbor set
 
			peeri = {neighbori[idx]} @ peeri;
 
			peero = {neighboro[idx]} @ peero;
 
			neighbori = neighbori[0:idx] @ neighbori[idx:neighbori.length];
 
			neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length];
 
			// Step 3. Await return of token
 
			while (token == null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < peeri.length + neighbori.length) {
 
						if (fires(peeri@neighbori[i])) {
 
							token = get(peeri@neighbori[i]);
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
		}
 
		// Step 4. Token is back and all neighbors visited
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(end)) {
 
					put(end, token);
 
					token = null;
 
				}
 
			}
 
		}
 
	}
 
}
 
primitive noninitiator(out start, in end, in[] peeri, out[] peero) {
 
	msg token = null;
 
	in[] neighbori = {};
 
	out[] neighboro = {};
 
	in[] parenti = {};
 
	out[] parento = {};
 
	assert peeri.length == peero.length;
 
	while (true) {
 
		int idx = 0;
 
		// Step 1. Await token for first time
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				int i = 0;
 
				while (i < peeri.length) {
 
					if (fires(peeri[i])) {
 
						token = get(peeri[i]);
 
						idx = i;
 
						break;
 
					} else i++;
 
				}
 
			}
 
		}
 
		// Reset neighbors
 
		neighbori = peeri[0:idx] @ peeri[idx:peeri.length];
 
		neighboro = peero[0:idx] @ peero[idx:peero.length];
 
		parenti = {peeri[idx]};
 
		parento = {peero[idx]};
 
		peeri = {};
 
		peero = {};
 
		// Step 2. Non-initiator signals
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(end)) {
 
					put(end, token);
 
					token = null;
 
				}
 
			}
 
		}
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				if (fires(start)) {
 
					token = get(start);
 
				}
 
			}
 
		}
 
		// Step 3. Keep sending token to processes
 
		while (neighbori.length > 0) {
 
			idx = 0;
 
			// Select first channel that accepts our token
 
			while (token != null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < neighboro.length) {
 
						if (fires(neighboro[i])) {
 
							put(neighboro[i], token);
 
							idx = i;
 
							token = null;
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
			// Eliminate from neighbor set
 
			peeri = {neighbori[idx]} @ peeri;
 
			peero = {neighboro[idx]} @ peero;
 
			neighbori = neighbori[0:idx] @ neighbori[idx:neighbori.length];
 
			neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length];
 
			// Step 4. Await return of token
 
			while (token == null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < peeri.length + neighbori.length) {
 
						if (fires(peeri@neighbori[i])) {
 
							token = get(peeri@neighbori[i]);
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
		}
 
		// Step 5. Token is back, pass to parent
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(parento[0])) {
 
					put(parento[0], token);
 
					token = null;
 
				}
 
			}
 
		}
 
		peeri = {parenti[0]} @ peeri;
 
		peero = {parento[0]} @ peero;
 
		parenti = {};
 
		parento = {};
 
	}
 
}
0 comments (0 inline, 0 general)