Changeset - 36cc1fe490f7
[Not reviewed]
Merge
! ! !
MH - 4 years ago 2021-11-15 12:23:58
contact@maxhenger.nl
Merge branch 'feat-api-cmds-and-branching'

Implements the programmer-facing API to allow programmatic
specification of a synchronous round. The way in which these put/get
interactions are performed is in an initial shape. Perhaps this will
change in the future.

The second main set of changes is the addion of a 'fork' statement,
which allows explicit forking, and allowing multiple puts/gets over the
same transport link within a single sync round.
62 files changed with 1682 insertions and 622 deletions:
0 comments (0 inline, 0 general)
examples/bench_04/main.c
Show inline comments
 
@@ -5,13 +5,13 @@ int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"    while(true) sync {}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
examples/bench_05/main.c
Show inline comments
 
@@ -6,13 +6,13 @@ int main(int argc, char** argv) {
 
	port_pairs = atoi(argv[1]);
 
	proto_components = atoi(argv[2]);
 
	printf("port_pairs %d, proto_components: %d\n", port_pairs, proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"    while(true) sync {}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_5.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<port_pairs; i++) {
examples/bench_09/main.c
Show inline comments
 
@@ -9,13 +9,13 @@ int main(int argc, char** argv) {
 
	const unsigned char pdl[] = 
 
	"primitive presync_work() {   "
 
	"    int i = 0;               "
 
	"    while(true) {            "
 
	"        i = 0;               "
 
	"        while(i < 2)  i++;   "
 
	"        synchronous {}       "
 
	"        sync {}       "
 
	"    }                        "
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
examples/bench_11/main.c
Show inline comments
 
@@ -6,13 +6,13 @@ int main(int argc, char** argv) {
 
	forwards = atoi(argv[1]);
 
	num_options = atoi(argv[2]);
 
	printf("forwards %d, num_options %d\n",
 
		forwards, num_options);
 
	unsigned char pdl[] = 
 
	"primitive recv_zero(in a) {  "
 
	"    while(true) synchronous {"
 
	"    while(true) sync {"
 
	"        msg m = get(a);      "
 
	"        assert(m[0] == 0);   "
 
	"    }                        "
 
	"}                            "
 
	; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
examples/bench_23/main.c
Show inline comments
 
@@ -3,31 +3,31 @@
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i;
 

	
 
	// unsigned char pdl[] = "\
 
	// primitive xrouter(in a, out b, out c) {\
 
 //        while(true) synchronous {\
 
 //        while(true) sync {\
 
 //            if(fires(a)) {\
 
 //                if(fires(b)) put(b, get(a));\
 
 //                else         put(c, get(a));\
 
 //            }\
 
 //        }\
 
 //    }"
 
 //    ;
 
	unsigned char pdl[] = "\
 
	primitive lossy(in a, out b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                msg m = get(a);\
 
                if(fires(b)) put(b, m);\
 
            }\
 
        }\
 
    }\
 
    primitive sync_drain(in a, in b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                get(a);\
 
                get(b);\
 
            }\
 
        }\
 
    }\
examples/bench_24/main.c
Show inline comments
 
@@ -3,13 +3,13 @@
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j;
 

	
 
	unsigned char pdl[] = "\
 
	primitive fifo1_init(msg m, in a, out b) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(m != null && fires(b)) {\
 
                put(b, m);\
 
                m = null;\
 
            } else if (m == null && fires(a)) {\
 
                m = get(a);\
 
            }\
 
@@ -36,13 +36,13 @@ int main(int argc, char** argv) {
 
        new replicator(m, n, c);\
 
    }"
 
    ;
 
	// unsigned char pdl[] = "\
 
	// primitive sequencer3(out a, out b, out c) {\
 
 //        int i = 0;\
 
 //        while(true) synchronous {\
 
 //        while(true) sync {\
 
 //            out to = a;\
 
 //            if     (i==1) to = b;\
 
 //            else if(i==2) to = c;\
 
 //            if(fires(to)) {\
 
 //                put(to, create(0));\
 
 //                i = (i + 1)%3;\
examples/bench_27/main.c
Show inline comments
 
@@ -6,13 +6,13 @@ int main(int argc, char** argv) {
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "\
 
    primitive xrouter(in a, out b, out c) {\
 
        while(true) synchronous {\
 
        while(true) sync {\
 
            if(fires(a)) {\
 
                if(fires(b)) put(b, get(a));\
 
                else         put(c, get(a));\
 
            }\
 
        }\
 
    }\
examples/eg_protocols.pdl
Show inline comments
 
primitive pres_2(in i, out o) {
 
  synchronous {
 
  sync {
 
    put(o, get(i));
 
  }
 
}
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous {
 
  while(true) sync {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 

	
 
primitive alt_round_merger(in a, in b, out c){
 
  while(true) {
 
    synchronous{ put(c, get(a)); }
 
    synchronous{ put(c, get(b)); }
 
    sync { put(c, get(a)); }
 
    sync { put(c, get(b)); }
 
  }	
 
}
src/protocol/ast.rs
Show inline comments
 
@@ -135,12 +135,14 @@ define_new_ast_id!(EndIfStatementId, StatementId, index(EndIfStatement, Statemen
 
define_new_ast_id!(WhileStatementId, StatementId, index(WhileStatement, Statement::While, statements), alloc(alloc_while_statement));
 
define_new_ast_id!(EndWhileStatementId, StatementId, index(EndWhileStatement, Statement::EndWhile, statements), alloc(alloc_end_while_statement));
 
define_new_ast_id!(BreakStatementId, StatementId, index(BreakStatement, Statement::Break, statements), alloc(alloc_break_statement));
 
define_new_ast_id!(ContinueStatementId, StatementId, index(ContinueStatement, Statement::Continue, statements), alloc(alloc_continue_statement));
 
define_new_ast_id!(SynchronousStatementId, StatementId, index(SynchronousStatement, Statement::Synchronous, statements), alloc(alloc_synchronous_statement));
 
define_new_ast_id!(EndSynchronousStatementId, StatementId, index(EndSynchronousStatement, Statement::EndSynchronous, statements), alloc(alloc_end_synchronous_statement));
 
define_new_ast_id!(ForkStatementId, StatementId, index(ForkStatement, Statement::Fork, statements), alloc(alloc_fork_statement));
 
define_new_ast_id!(EndForkStatementId, StatementId, index(EndForkStatement, Statement::EndFork, statements), alloc(alloc_end_fork_statement));
 
define_new_ast_id!(ReturnStatementId, StatementId, index(ReturnStatement, Statement::Return, statements), alloc(alloc_return_statement));
 
define_new_ast_id!(GotoStatementId, StatementId, index(GotoStatement, Statement::Goto, statements), alloc(alloc_goto_statement));
 
define_new_ast_id!(NewStatementId, StatementId, index(NewStatement, Statement::New, statements), alloc(alloc_new_statement));
 
define_new_ast_id!(ExpressionStatementId, StatementId, index(ExpressionStatement, Statement::Expression, statements), alloc(alloc_expression_statement));
 

	
 
define_aliased_ast_id!(ExpressionId, Id<Expression>, index(Expression, expressions));
 
@@ -1032,12 +1034,14 @@ pub enum Statement {
 
    While(WhileStatement),
 
    EndWhile(EndWhileStatement),
 
    Break(BreakStatement),
 
    Continue(ContinueStatement),
 
    Synchronous(SynchronousStatement),
 
    EndSynchronous(EndSynchronousStatement),
 
    Fork(ForkStatement),
 
    EndFork(EndForkStatement),
 
    Return(ReturnStatement),
 
    Goto(GotoStatement),
 
    New(NewStatement),
 
    Expression(ExpressionStatement),
 
}
 

	
 
@@ -1075,17 +1079,18 @@ impl Statement {
 
            Statement::Labeled(v) => v.label.span,
 
            Statement::If(v) => v.span,
 
            Statement::While(v) => v.span,
 
            Statement::Break(v) => v.span,
 
            Statement::Continue(v) => v.span,
 
            Statement::Synchronous(v) => v.span,
 
            Statement::Fork(v) => v.span,
 
            Statement::Return(v) => v.span,
 
            Statement::Goto(v) => v.span,
 
            Statement::New(v) => v.span,
 
            Statement::Expression(v) => v.span,
 
            Statement::EndBlock(_) | Statement::EndIf(_) | Statement::EndWhile(_) | Statement::EndSynchronous(_) => unreachable!(),
 
            Statement::EndBlock(_) | Statement::EndIf(_) | Statement::EndWhile(_) | Statement::EndSynchronous(_) | Statement::EndFork(_) => unreachable!(),
 
        }
 
    }
 
    pub fn link_next(&mut self, next: StatementId) {
 
        match self {
 
            Statement::Block(stmt) => stmt.next = next,
 
            Statement::EndBlock(stmt) => stmt.next = next,
 
@@ -1093,18 +1098,20 @@ impl Statement {
 
                LocalStatement::Channel(stmt) => stmt.next = next,
 
                LocalStatement::Memory(stmt) => stmt.next = next,
 
            },
 
            Statement::EndIf(stmt) => stmt.next = next,
 
            Statement::EndWhile(stmt) => stmt.next = next,
 
            Statement::EndSynchronous(stmt) => stmt.next = next,
 
            Statement::EndFork(stmt) => stmt.next = next,
 
            Statement::New(stmt) => stmt.next = next,
 
            Statement::Expression(stmt) => stmt.next = next,
 
            Statement::Return(_)
 
            | Statement::Break(_)
 
            | Statement::Continue(_)
 
            | Statement::Synchronous(_)
 
            | Statement::Fork(_)
 
            | Statement::Goto(_)
 
            | Statement::While(_)
 
            | Statement::Labeled(_)
 
            | Statement::If(_) => unreachable!(),
 
        }
 
    }
 
@@ -1268,24 +1275,40 @@ pub struct ContinueStatement {
 
#[derive(Debug, Clone)]
 
pub struct SynchronousStatement {
 
    pub this: SynchronousStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "sync" keyword
 
    pub body: BlockStatementId,
 
    // Phase 2: linker
 
    pub end_sync: EndSynchronousStatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndSynchronousStatement {
 
    pub this: EndSynchronousStatementId,
 
    pub start_sync: SynchronousStatementId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ForkStatement {
 
    pub this: ForkStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "fork" keyword
 
    pub left_body: BlockStatementId,
 
    pub right_body: Option<BlockStatementId>,
 
    pub end_fork: EndForkStatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndForkStatement {
 
    pub this: EndForkStatementId,
 
    pub start_fork: ForkStatementId,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ReturnStatement {
 
    pub this: ReturnStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "return" keyword
 
    pub expressions: Vec<ExpressionId>,
src/protocol/ast_printer.rs
Show inline comments
 
@@ -33,12 +33,14 @@ const PREFIX_ENDIF_STMT_ID: &'static str = "SEIf";
 
const PREFIX_WHILE_STMT_ID: &'static str = "SWhi";
 
const PREFIX_ENDWHILE_STMT_ID: &'static str = "SEWh";
 
const PREFIX_BREAK_STMT_ID: &'static str = "SBre";
 
const PREFIX_CONTINUE_STMT_ID: &'static str = "SCon";
 
const PREFIX_SYNC_STMT_ID: &'static str = "SSyn";
 
const PREFIX_ENDSYNC_STMT_ID: &'static str = "SESy";
 
const PREFIX_FORK_STMT_ID: &'static str = "SFrk";
 
const PREFIX_END_FORK_STMT_ID: &'static str = "SEFk";
 
const PREFIX_RETURN_STMT_ID: &'static str = "SRet";
 
const PREFIX_ASSERT_STMT_ID: &'static str = "SAsr";
 
const PREFIX_GOTO_STMT_ID: &'static str = "SGot";
 
const PREFIX_NEW_STMT_ID: &'static str = "SNew";
 
const PREFIX_PUT_STMT_ID: &'static str = "SPut";
 
const PREFIX_EXPR_STMT_ID: &'static str = "SExp";
 
@@ -508,12 +510,30 @@ impl ASTWriter {
 
            Statement::EndSynchronous(stmt) => {
 
                self.kv(indent).with_id(PREFIX_ENDSYNC_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("EndSynchronous");
 
                self.kv(indent2).with_s_key("StartSync").with_disp_val(&stmt.start_sync.0.index);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            },
 
            Statement::Fork(stmt) => {
 
                self.kv(indent).with_id(PREFIX_FORK_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Fork");
 
                self.kv(indent2).with_s_key("EndFork").with_disp_val(&stmt.end_fork.0.index);
 
                self.kv(indent2).with_s_key("LeftBody");
 
                self.write_stmt(heap, stmt.left_body.upcast(), indent3);
 

	
 
                if let Some(right_body_id) = stmt.right_body {
 
                    self.kv(indent2).with_s_key("RightBody");
 
                    self.write_stmt(heap, right_body_id.upcast(), indent3);
 
                }
 
            },
 
            Statement::EndFork(stmt) => {
 
                self.kv(indent).with_id(PREFIX_END_FORK_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("EndFork");
 
                self.kv(indent2).with_s_key("StartFork").with_disp_val(&stmt.start_fork.0.index);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            }
 
            Statement::Return(stmt) => {
 
                self.kv(indent).with_id(PREFIX_RETURN_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Return");
 
                self.kv(indent2).with_s_key("Expressions");
 
                for expr_id in &stmt.expressions {
 
                    self.write_expr(heap, *expr_id, indent3);
src/protocol/eval/executor.rs
Show inline comments
 
@@ -197,12 +197,13 @@ pub enum EvalContinuation {
 
    Inconsistent,
 
    Terminal,
 
    SyncBlockStart,
 
    SyncBlockEnd,
 
    NewComponent(DefinitionId, i32, ValueGroup),
 
    NewChannel,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, Value),
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
@@ -581,13 +582,13 @@ impl Prompt {
 
                                    let port_id = if let Value::Input(port_id) = value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'get' on value {:?}", value)
 
                                    };
 

	
 
                                    match ctx.get(port_id) {
 
                                    match ctx.performed_get(port_id) {
 
                                        Some(result) => {
 
                                            // We have the result. Merge the `ValueGroup` with the
 
                                            // stack/heap storage.
 
                                            debug_assert_eq!(result.values.len(), 1);
 
                                            result.into_stack(&mut cur_frame.expr_values, &mut self.store);
 
                                        },
 
@@ -610,17 +611,18 @@ impl Prompt {
 
                                        unreachable!("executor calling 'put' on value {:?}", deref_port_value)
 
                                    };
 

	
 
                                    let msg_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone();
 

	
 
                                    if ctx.did_put(port_id) {
 
                                    if ctx.performed_put(port_id) {
 
                                        // We're fine, deallocate in case the expression value stack
 
                                        // held an owned value
 
                                        self.store.drop_value(msg_value.get_heap_pos());
 
                                    } else {
 
                                        // Prepare to execute again
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        return Ok(EvalContinuation::Put(port_id, deref_msg_value));
 
                                    }
 
                                },
 
@@ -807,13 +809,13 @@ impl Prompt {
 
                        cur_frame.position = stmt.next;
 
                        Ok(EvalContinuation::Stepping)
 
                    },
 
                    LocalStatement::Channel(stmt) => {
 
                        // Need to create a new channel by requesting it from
 
                        // the runtime.
 
                        match ctx.get_channel() {
 
                        match ctx.created_channel() {
 
                            None => {
 
                                // No channel is pending. So request one
 
                                Ok(EvalContinuation::NewChannel)
 
                            },
 
                            Some((put_port, get_port)) => {
 
                                self.store.write(ValueId::Stack(heap[stmt.from].unique_id_in_scope as u32), put_port);
 
@@ -883,12 +885,39 @@ impl Prompt {
 
            },
 
            Statement::EndSynchronous(stmt) => {
 
                cur_frame.position = stmt.next;
 

	
 
                Ok(EvalContinuation::SyncBlockEnd)
 
            },
 
            Statement::Fork(stmt) => {
 
                if stmt.right_body.is_none() {
 
                    // No reason to fork
 
                    cur_frame.position = stmt.left_body.upcast();
 
                } else {
 
                    // Need to fork
 
                    if let Some(go_left) = ctx.performed_fork() {
 
                        // Runtime has created a fork
 
                        if go_left {
 
                            cur_frame.position = stmt.left_body.upcast();
 
                        } else {
 
                            cur_frame.position = stmt.right_body.unwrap().upcast();
 
                        }
 
                    } else {
 
                        // Request the runtime to create a fork of the current
 
                        // branch
 
                        return Ok(EvalContinuation::NewFork);
 
                    }
 
                }
 

	
 
                Ok(EvalContinuation::Stepping)
 
            },
 
            Statement::EndFork(stmt) => {
 
                cur_frame.position = stmt.next;
 

	
 
                Ok(EvalContinuation::Stepping)
 
            }
 
            Statement::Return(_stmt) => {
 
                debug_assert!(heap[cur_frame.definition].is_function());
 
                debug_assert_eq!(cur_frame.expr_values.len(), 1, "expected one expr value for return statement");
 

	
 
                // The preceding frame has executed a call, so is expecting the
 
                // return expression on its expression value stack. Note that
src/protocol/mod.rs
Show inline comments
 
@@ -49,12 +49,13 @@ pub enum ComponentCreationError {
 
    ModuleDoesntExist,
 
    DefinitionDoesntExist,
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
    UnownedPort,
 
    InSync,
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(An opaque protocol description)")
 
    }
 
@@ -280,30 +281,32 @@ impl ProtocolDescription {
 
        }
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn performed_put(&mut self, port: PortId) -> bool;
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
    fn performed_fork(&mut self) -> Option<bool>; // None if not yet forked
 
    fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
pub enum RunResult {
 
    // Can only occur outside sync blocks
 
    ComponentTerminated, // component has exited its procedure
 
    ComponentAtSyncStart,
 
    NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync
 
    NewChannel, // should also be possible inside sync
 
    // Can only occur inside sync blocks
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchMissingPortValue(PortId), // branch hasn't received message on input port yet
 
    BranchGet(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchFork,
 
    BranchPut(PortId, ValueGroup),
 
}
 

	
 
impl ComponentState {
 
    pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult {
 
        use EvalContinuation as EC;
 
@@ -325,14 +328,16 @@ impl ComponentState {
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, args),
 
                    EC::NewChannel =>
 
                        return RR::NewChannel,
 
                    EC::NewFork =>
 
                        return RR::BranchFork,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id),
 
                    EC::BlockGet(port_id) => return RR::BranchGet(port_id),
 
                    EC::Put(port_id, value) => {
 
                        let value_group = ValueGroup::from_store(&self.prompt.store, &[value]);
 
                        return RR::BranchPut(port_id, value_group);
 
                    },
 
                }
 
            }
 
@@ -394,12 +399,13 @@ impl ComponentState {
 
                        // Because of the way we emulate the old context for now, we can safely
 
                        // assume that this will never happen. The old context thingamajig always
 
                        // creates a channel, it never bubbles a "need to create a channel" message
 
                        // to the runtime
 
                        unreachable!();
 
                    },
 
                    EvalContinuation::NewFork => unreachable!(),
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(_) => unreachable!(),
 
                    EvalContinuation::BlockGet(_) => unreachable!(),
 
                    EvalContinuation::Put(_, _) => unreachable!(),
 
                },
 
            }
 
@@ -427,12 +433,13 @@ impl ComponentState {
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _, _) => unreachable!(),
 
                    EvalContinuation::NewChannel => unreachable!(),
 
                    EvalContinuation::NewFork => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        return SyncBlocker::CouldntCheckFiring(port);
 
                    },
 
                    EvalContinuation::BlockGet(port) => {
 
                        return SyncBlocker::CouldntReadMsg(port);
 
                    },
 
@@ -459,23 +466,23 @@ impl ComponentState {
 
            }
 
        }
 
    }
 
}
 

	
 
impl RunContext for EvalContext<'_> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
    fn performed_put(&mut self, port: PortId) -> bool {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                ctx.did_put_or_get(port)
 
            }
 
        }
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                let payload = ctx.read_msg(port);
 
                if payload.is_none() {
 
@@ -508,24 +515,29 @@ impl RunContext for EvalContext<'_> {
 
                    None => None,
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(from);
 
                let to = Value::Input(to);
 
                return Some((from, to));
 
            },
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        // Never actually used in the old runtime
 
        return None;
 
    }
 
}
 

	
 
// TODO: @remove once old runtime has disappeared
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
src/protocol/parser/pass_definitions.rs
Show inline comments
 
@@ -420,19 +420,34 @@ impl PassDefinitions {
 
                let id = self.consume_continue_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_SYNC {
 
                let id = self.consume_synchronous_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 

	
 
                let end_sync = ctx.heap.alloc_end_synchronous_statement(|this| EndSynchronousStatement{
 
                    this, start_sync: id, next: StatementId::new_invalid()
 
                let end_sync = ctx.heap.alloc_end_synchronous_statement(|this| EndSynchronousStatement {
 
                    this,
 
                    start_sync: id,
 
                    next: StatementId::new_invalid()
 
                });
 
                section.push(end_sync.upcast());
 

	
 
                let sync_stmt = &mut ctx.heap[id];
 
                sync_stmt.end_sync = end_sync;
 
            } else if ident == KW_STMT_FORK {
 
                let id = self.consume_fork_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 

	
 
                let end_fork = ctx.heap.alloc_end_fork_statement(|this| EndForkStatement{
 
                    this,
 
                    start_fork: id,
 
                    next: StatementId::new_invalid(),
 
                });
 
                section.push(end_fork.upcast());
 

	
 
                let fork_stmt = &mut ctx.heap[id];
 
                fork_stmt.end_fork = end_fork;
 
            } else if ident == KW_STMT_RETURN {
 
                let id = self.consume_return_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
            } else if ident == KW_STMT_GOTO {
 
                let id = self.consume_goto_statement(module, iter, ctx)?;
 
                section.push(id.upcast());
 
@@ -610,12 +625,35 @@ impl PassDefinitions {
 
            span: synchronous_span,
 
            body,
 
            end_sync: EndSynchronousStatementId::new_invalid(),
 
        }))
 
    }
 

	
 
    fn consume_fork_statement(
 
        &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx
 
    ) -> Result<ForkStatementId, ParseError> {
 
        let fork_span = consume_exact_ident(&module.source, iter, KW_STMT_FORK)?;
 
        let left_body = self.consume_block_or_wrapped_statement(module, iter, ctx)?;
 

	
 
        let right_body = if has_ident(&module.source, iter, KW_STMT_OR) {
 
            iter.consume();
 
            let right_body = self.consume_block_or_wrapped_statement(module, iter, ctx)?;
 
            Some(right_body)
 
        } else {
 
            None
 
        };
 

	
 
        Ok(ctx.heap.alloc_fork_statement(|this| ForkStatement{
 
            this,
 
            span: fork_span,
 
            left_body,
 
            right_body,
 
            end_fork: EndForkStatementId::new_invalid(),
 
        }))
 
    }
 

	
 
    fn consume_return_statement(
 
        &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx
 
    ) -> Result<ReturnStatementId, ParseError> {
 
        let return_span = consume_exact_ident(&module.source, iter, KW_STMT_RETURN)?;
 
        let mut scoped_section = self.expressions.start_section();
 

	
src/protocol/parser/pass_typing.rs
Show inline comments
 
@@ -1139,12 +1139,25 @@ impl Visitor for PassTyping {
 
        let sync_stmt = &ctx.heap[id];
 
        let body_id = sync_stmt.body;
 

	
 
        self.visit_block_stmt(ctx, body_id)
 
    }
 

	
 
    fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult {
 
        let fork_stmt = &ctx.heap[id];
 
        let left_body_id = fork_stmt.left_body;
 
        let right_body_id = fork_stmt.right_body;
 

	
 
        self.visit_block_stmt(ctx, left_body_id)?;
 
        if let Some(right_body_id) = right_body_id {
 
            self.visit_block_stmt(ctx, right_body_id)?;
 
        }
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult {
 
        let return_stmt = &ctx.heap[id];
 
        debug_assert_eq!(return_stmt.expressions.len(), 1);
 
        let expr_id = return_stmt.expressions[0];
 

	
 
        self.visit_expr(ctx, expr_id)
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
/*
 
 * pass_validation_linking.rs
 
 *
 
 * The pass that will validate properties of the AST statements (one is not
 
 * allowed to nest synchronous statements, instantiating components occurs in
 
 * the right places, etc.) and expressions (assignments may not occur in
 
 * arbitrary expressions).
 
 *
 
 * Furthermore, this pass will also perform "linking", in the sense of: some AST
 
 * nodes have something to do with one another, so we link them up in this pass
 
 * (e.g. setting the parents of expressions, linking the control flow statements
 
 * like `continue` and `break` up to the respective loop statement, etc.).
 
 *
 
 * There are several "confusing" parts about this pass:
 
 *
 
 * Setting expression parents: this is the simplest one. The pass struct acts
 
 * like a little state machine. When visiting an expression it will set the
 
 * "parent expression" field of the pass to itself, then visit its child. The
 
 * child will look at this "parent expression" field to determine its parent.
 
 *
 
 * Setting the `next` statement: the AST is a tree, but during execution we walk
 
 * a linear path through all statements. So where appropriate a statement may
 
 * set the "previous statement" field of the pass to itself. When visiting the
 
 * subsequent statement it will check this "previous statement", and if set, it
 
 * will link this previous statement up to itself. Not every statement has a
 
 * previous statement. Hence there are two patterns that occur: assigning the
 
 * `next` value, then clearing the "previous statement" field. And assigning the
 
 * `next` value, and then putting the current statement's ID in the "previous
 
 * statement" field. Because it is so common, this file contain two macros that
 
 * perform that operation.
 
 *
 
 * To make storing types for polymorphic procedures simpler and more efficient,
 
 * we assign to each expression in the procedure a unique ID. This is what the
 
 * "next expression index" field achieves. Each expression simply takes the
 
 * current value, and then increments this counter.
 
 */
 

	
 
use crate::collections::{ScopedBuffer};
 
use crate::protocol::ast::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::symbol_table::*;
 
use crate::protocol::parser::type_table::*;
 

	
 
@@ -252,13 +289,14 @@ impl Visitor for PassValidationLinking {
 
        self.visit_expr(ctx, test_expr_id)?;
 
        self.in_test_expr = StatementId::new_invalid();
 

	
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        // Visit true and false branch. Executor chooses next statement based on
 
        // test expression, not on if-statement itself.
 
        // test expression, not on if-statement itself. Hence the if statement
 
        // does not have a static subsequent statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        self.visit_block_stmt(ctx, true_stmt_id)?;
 
        assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 

	
 
        if let Some(false_id) = false_stmt_id {
 
            self.visit_block_stmt(ctx, false_id)?;
 
@@ -367,12 +405,42 @@ impl Visitor for PassValidationLinking {
 

	
 
        self.in_sync = SynchronousStatementId::new_invalid();
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult {
 
        let fork_stmt = &ctx.heap[id];
 
        let end_fork_id = fork_stmt.end_fork;
 
        let left_body_id = fork_stmt.left_body;
 
        let right_body_id = fork_stmt.right_body;
 

	
 
        // Fork statements may only occur inside sync blocks
 
        if self.in_sync.is_invalid() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, fork_stmt.span,
 
                "Forking may only occur inside sync blocks"
 
            ));
 
        }
 

	
 
        // Visit the respective bodies. Like the if statement, a fork statement
 
        // does not have a single static subsequent statement. It forks and then
 
        // each fork has a different next statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        self.visit_block_stmt(ctx, left_body_id)?;
 
        assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast());
 

	
 
        if let Some(right_body_id) = right_body_id {
 
            self.visit_block_stmt(ctx, right_body_id)?;
 
            assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast());
 
        }
 

	
 
        self.prev_stmt = end_fork_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult {
 
        // Check if "return" occurs within a function
 
        let stmt = &ctx.heap[id];
 
        if !self.def_type.is_function() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, stmt.span,
src/protocol/parser/token_parsing.rs
Show inline comments
 
@@ -42,13 +42,15 @@ pub(crate) const KW_STMT_IF:       &'static [u8] = b"if";
 
pub(crate) const KW_STMT_ELSE:     &'static [u8] = b"else";
 
pub(crate) const KW_STMT_WHILE:    &'static [u8] = b"while";
 
pub(crate) const KW_STMT_BREAK:    &'static [u8] = b"break";
 
pub(crate) const KW_STMT_CONTINUE: &'static [u8] = b"continue";
 
pub(crate) const KW_STMT_GOTO:     &'static [u8] = b"goto";
 
pub(crate) const KW_STMT_RETURN:   &'static [u8] = b"return";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"synchronous";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"sync";
 
pub(crate) const KW_STMT_FORK:     &'static [u8] = b"fork";
 
pub(crate) const KW_STMT_OR:       &'static [u8] = b"or";
 
pub(crate) const KW_STMT_NEW:      &'static [u8] = b"new";
 

	
 
// Keywords - types
 
// Since types are needed for returning diagnostic information to the user, the
 
// string variants are put here as well.
 
pub(crate) const KW_TYPE_IN_PORT_STR:  &'static str = "in";
 
@@ -524,13 +526,13 @@ fn is_reserved_definition_keyword(text: &[u8]) -> bool {
 

	
 
fn is_reserved_statement_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_IMPORT | KW_AS |
 
        KW_STMT_CHANNEL | KW_STMT_IF | KW_STMT_WHILE |
 
        KW_STMT_BREAK | KW_STMT_CONTINUE | KW_STMT_GOTO | KW_STMT_RETURN |
 
        KW_STMT_SYNC | KW_STMT_NEW => true,
 
        KW_STMT_SYNC | KW_STMT_FORK | KW_STMT_NEW => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_expression_keyword(text: &[u8]) -> bool {
 
    match text {
src/protocol/parser/visitor.rs
Show inline comments
 
@@ -130,12 +130,17 @@ pub(crate) trait Visitor {
 
            },
 
            Statement::Synchronous(stmt) => {
 
                let this = stmt.this;
 
                self.visit_synchronous_stmt(ctx, this)
 
            },
 
            Statement::EndSynchronous(_stmt) => Ok(()),
 
            Statement::Fork(stmt) => {
 
                let this = stmt.this;
 
                self.visit_fork_stmt(ctx, this)
 
            },
 
            Statement::EndFork(_stmt) => Ok(()),
 
            Statement::Return(stmt) => {
 
                let this = stmt.this;
 
                self.visit_return_stmt(ctx, this)
 
            },
 
            Statement::Goto(stmt) => {
 
                let this = stmt.this;
 
@@ -172,12 +177,13 @@ pub(crate) trait Visitor {
 
    fn visit_labeled_stmt(&mut self, _ctx: &mut Ctx, _id: LabeledStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_if_stmt(&mut self, _ctx: &mut Ctx, _id: IfStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_while_stmt(&mut self, _ctx: &mut Ctx, _id: WhileStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_break_stmt(&mut self, _ctx: &mut Ctx, _id: BreakStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_continue_stmt(&mut self, _ctx: &mut Ctx, _id: ContinueStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_synchronous_stmt(&mut self, _ctx: &mut Ctx, _id: SynchronousStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_fork_stmt(&mut self, _ctx: &mut Ctx, _id: ForkStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_return_stmt(&mut self, _ctx: &mut Ctx, _id: ReturnStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_goto_stmt(&mut self, _ctx: &mut Ctx, _id: GotoStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_new_stmt(&mut self, _ctx: &mut Ctx, _id: NewStatementId) -> VisitorResult { Ok(()) }
 
    fn visit_expr_stmt(&mut self, _ctx: &mut Ctx, _id: ExpressionStatementId) -> VisitorResult { Ok(()) }
 

	
 
    // Expressions
src/runtime/tests.rs
Show inline comments
 
@@ -34,27 +34,27 @@ fn file_logged_configured_connector(
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).expect("Failed to create log output file!");
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, pd, connector_id)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive sync(in<msg> a, out<msg> b) {
 
primitive sync_component(in<msg> a, out<msg> b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	msg x = get(a);
 
            	put(b, x);
 
            } else {
 
                assert(!fires(a) && !fires(b));
 
            }
 
        }
 
    }
 
}
 

	
 
primitive together(in<msg> ia, in<msg> ib, out<msg> oa, out<msg> ob){
 
  while(true) synchronous {
 
  while(true) sync {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
@@ -99,13 +99,13 @@ fn new_port_pair() {
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[i, o]).unwrap();
 
    c.add_component(b"", b"sync_component", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
@@ -350,13 +350,13 @@ fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[g, p]).unwrap();
 
    c.add_component(b"", b"sync_component", &[g, p]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
@@ -366,13 +366,13 @@ fn sync_sync() {
 
           g1|<--|p1
 
    */
 
    let test_log_path = Path::new("./logs/sync_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"", b"sync", &[g0, p1]).unwrap();
 
    c.add_component(b"", b"sync_component", &[g0, p1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(SEC1).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 
@@ -418,13 +418,13 @@ fn distributed_msg_bounce() {
 
            */
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p, g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"", b"sync", &[g, p]).unwrap();
 
            c.add_component(b"", b"sync_component", &[g, p]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
@@ -879,13 +879,13 @@ fn ac_not_b() {
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let pdl = b"
 
            primitive ac_not_b(in<msg> a, in<msg> b, out<msg> c){
 
                // forward A to C but keep B silent
 
                synchronous{ put(c, get(a)); }
 
                sync { put(c, get(a)); }
 
            }";
 
            let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap());
 
            let mut c = file_logged_configured_connector(1, test_log_path, pd);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let [a, b] = c.new_port_pair();
 
@@ -943,13 +943,13 @@ fn many_rounds_mem() {
 
}
 

	
 
#[test]
 
fn pdl_reo_lossy() {
 
    let pdl = b"
 
    primitive lossy(in<msg> a, out<msg> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            msg m = null;
 
            if(fires(a)) {
 
                m = get(a);
 
                if(fires(b)) {
 
                    put(b, m);
 
                }
 
@@ -962,13 +962,13 @@ fn pdl_reo_lossy() {
 

	
 
#[test]
 
fn pdl_reo_fifo1() {
 
    let pdl = b"
 
    primitive fifo1(in<msg> a, out<msg> b) {
 
        msg m = null;
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(m == null) {
 
                if(fires(a)) m=get(a);
 
            } else {
 
                if(fires(b)) put(b, m);
 
                m = null;
 
            }
 
@@ -982,13 +982,13 @@ fn pdl_reo_fifo1() {
 
fn pdl_reo_fifo1full() {
 
    let test_log_path = Path::new("./logs/pdl_reo_fifo1full");
 
    let pdl = b"
 
    primitive fifo1full(in<msg> a, out<msg> b) {
 
        bool is_set = true;
 
        msg m = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(!is_set) {
 
                if(fires(a)) m=get(a);
 
                is_set = false;
 
            } else {
 
                if(fires(b)) put(b, m);
 
                is_set = true;
 
@@ -1009,13 +1009,13 @@ fn pdl_reo_fifo1full() {
 

	
 
#[test]
 
fn pdl_msg_consensus() {
 
    let test_log_path = Path::new("./logs/pdl_msg_consensus");
 
    let pdl = b"
 
    primitive msgconsensus(in<msg> a, in<msg> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            msg x = get(a);
 
            msg y = get(b);
 
            assert(x == y);
 
        }
 
    }
 
    ";
 
@@ -1037,13 +1037,13 @@ fn pdl_msg_consensus() {
 
#[test]
 
fn sequencer3_prim() {
 
    let test_log_path = Path::new("./logs/sequencer3_prim");
 
    let pdl = b"
 
    primitive sequencer3(out<msg> a, out<msg> b, out<msg> c) {
 
        u32 i = 0;
 
        while(true) synchronous {
 
        while(true) sync {
 
            out to = a;
 
            if     (i==1) to = b;
 
            else if(i==2) to = c;
 
            if(fires(to)) {
 
                put(to, create(0));
 
                i = (i + 1)%3;
 
@@ -1084,25 +1084,25 @@ fn sequencer3_prim() {
 
#[test]
 
fn sequencer3_comp() {
 
    let test_log_path = Path::new("./logs/sequencer3_comp");
 
    let pdl = b"
 
    primitive replicator<T>(in<T> a, out<T> b, out<T> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && fires(b) && fires(c)) {
 
                    msg x = get(a);
 
                    put(b, x);
 
                    put(c, x);
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 
    primitive fifo1_init<T>(bool has_value, T m, in<T> a, out<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(has_value && fires(b)) {
 
                put(b, m);
 
                has_value = false;
 
            } else if (!has_value && fires(a)) {
 
                m = get(a);
 
                has_value = true;
 
@@ -1178,13 +1178,13 @@ const XROUTER_ITEMS: &[XRouterItem] = {
 

	
 
#[test]
 
fn xrouter_prim() {
 
    let test_log_path = Path::new("./logs/xrouter_prim");
 
    let pdl = b"
 
    primitive xrouter(in<msg> a, out<msg> b, out<msg> c) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                if(fires(b)) put(b, get(a));
 
                else         put(c, get(a));
 
            }
 
        }
 
    }
 
@@ -1219,13 +1219,13 @@ fn xrouter_prim() {
 
#[test]
 
fn xrouter_comp() {
 
    let test_log_path = Path::new("./logs/xrouter_comp");
 
    let pdl = b"
 
    primitive replicator<T>(in<T> a, out<T> b, out<T> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && fires(b) && fires(c)) {
 
                    msg x = get(a);
 
                    put(b, x);
 
                    put(c, x);
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
@@ -1233,34 +1233,34 @@ fn xrouter_comp() {
 
            }
 
        }
 
    }
 

	
 
    primitive merger(in<msg> a, in<msg> b, out<msg> c) {
 
        while (true) {
 
            synchronous {
 
            sync {
 
                if (fires(a) && !fires(b) && fires(c)) {
 
                    put(c, get(a));
 
                } else if (!fires(a) && fires(b) && fires(c)) {
 
                    put(c, get(b));
 
                } else {
 
                    assert(!fires(a) && !fires(b) && !fires(c));
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive lossy<T>(in<T> a, out<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                auto m = get(a);
 
                if(fires(b)) put(b, m);
 
            }
 
        }
 
    }
 
    primitive sync_drain<T>(in<T> a, in<T> b) {
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                msg drop_it = get(a);
 
                msg on_the_floor = get(b);
 
            }
 
        }
 
    }
 
@@ -1317,13 +1317,13 @@ fn xrouter_comp() {
 
fn count_stream() {
 
    let test_log_path = Path::new("./logs/count_stream");
 
    let pdl = b"
 
    primitive count_stream(out<msg> o) {
 
        msg m = create(1);
 
        m[0] = 0;
 
        while(true) synchronous {
 
        while(true) sync {
 
            put(o, m);
 
            m[0] += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
@@ -1348,13 +1348,13 @@ fn for_msg_byte() {
 
    primitive for_msg_byte(out<msg> o) {
 
        u8 i = 0;
 
        u32 idx = 0;
 
        while(i<8) {
 
            msg m = create(1);
 
            m[idx] = i;
 
            synchronous put(o, m);
 
            sync put(o, m);
 
            i += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
@@ -1376,13 +1376,13 @@ fn for_msg_byte() {
 
fn eq_causality() {
 
    let test_log_path = Path::new("./logs/eq_causality");
 
    let pdl = b"
 
    primitive eq(in<msg> a, in<msg> b, out<msg> c) {
 
        msg ma = create(0);
 
        msg mb = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                // left first!
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 
@@ -1432,13 +1432,13 @@ fn eq_no_causality() {
 
        channel leftfirsto -> leftfirsti;
 
        new eqinner(a, b, c, leftfirsto, leftfirsti);
 
    }
 
    primitive eqinner(in<msg> a, in<msg> b, out<msg> c, out<msg> leftfirsto, in<msg> leftfirsti) {
 
        msg ma = create(0);
 
        msg mb = create(0);
 
        while(true) synchronous {
 
        while(true) sync {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                if(fires(leftfirsti)) {
 
                    // left first! DO USE DUMMY
 
                    ma = get(a);
 
                    put(c, ma);
src/runtime2/branch.rs
Show inline comments
 
@@ -3,12 +3,19 @@ use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 
use crate::protocol::eval::{Value, ValueGroup};
 

	
 
use super::port::PortIdLocal;
 

	
 
// To share some logic between the FakeTree and ExecTree implementation
 
trait BranchListItem {
 
    fn get_id(&self) -> BranchId;
 
    fn set_next_id(&mut self, id: BranchId);
 
    fn get_next_id(&self) -> BranchId;
 
}
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32
 
@@ -42,71 +49,94 @@ pub(crate) enum SpeculativeState {
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum PreparedStatement {
 
    CreatedChannel((Value, Value)),
 
    ForkedExecution(bool),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl PreparedStatement {
 
    pub(crate) fn is_none(&self) -> bool {
 
        if let PreparedStatement::None = self {
 
            return true;
 
        } else {
 
            return false;
 
        }
 
    }
 

	
 
    pub(crate) fn take(&mut self) -> PreparedStatement {
 
        if let PreparedStatement::None = self {
 
            return PreparedStatement::None;
 
        } else {
 
            let mut replacement = PreparedStatement::None;
 
            std::mem::swap(self, &mut replacement);
 
            return replacement;
 
        }
 
    }
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
pub(crate) struct Branch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: ComponentState,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>, // TODO: Remove, currently only valid in single-get/put mode
 
    pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove?
 
    pub prepared: PreparedStatement,
 
}
 

	
 
impl BranchListItem for Branch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
 
        Branch {
 
            id: BranchId::new_invalid(),
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
            prepared_channel: None,
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 
        // debug_assert!(
 
        //     (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
        //     (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        // ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
            prepared_channel: None,
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Inserts a message into the branch for retrieval by a corresponding
 
    /// `get(port)` call.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// Queue of branches. Just a little helper.
 
#[derive(Copy, Clone)]
 
struct BranchQueue {
 
    first: BranchId,
 
@@ -128,13 +158,13 @@ impl BranchQueue {
 
        return !self.first.is_valid();
 
    }
 
}
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
    FinishedSync,
 
}
 

	
 
@@ -145,12 +175,16 @@ impl QueueKind {
 
            QueueKind::AwaitingMessage => 1,
 
            QueueKind::FinishedSync => 2,
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ExecTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Execution tree of branches. Tries to keep the extra information stored
 
/// herein to a minimum. So the execution tree is aware of the branches, their
 
/// execution state and the way they're dependent on each other, but the
 
/// execution tree should not be aware of e.g. sync algorithms.
 
///
 
/// Note that the tree keeps track of multiple lists of branches. Each list
 
@@ -185,67 +219,44 @@ impl ExecTree {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            return None;
 
        } else {
 
            let first_branch = &mut self.branches[queue.first.index as usize];
 
            queue.first = first_branch.next_in_queue;
 
            first_branch.next_in_queue = BranchId::new_invalid();
 
            if !queue.first.is_valid() {
 
                queue.last = BranchId::new_invalid();
 
            }
 

	
 
            return Some(first_branch.id);
 
        }
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    /// Pushes a branch (ID) into a queue.
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            queue.first = id;
 
            queue.last = id;
 
        } else {
 
            let last_branch = &mut self.branches[queue.last.index as usize];
 
            last_branch.next_in_queue = id;
 
            queue.last = id;
 
        }
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    /// Returns the non-sync branch (TODO: better name?)
 
    pub fn base_branch_mut(&mut self) -> &mut Branch {
 
        debug_assert!(!self.is_in_sync());
 
        return &mut self.branches[0];
 
    }
 

	
 
    /// Returns an iterator over all the elements in the queue of the given
 
    /// kind. One can start the iteration at the branch *after* the provided
 
    /// branch. Just make sure it actually is in the provided queue.
 
    pub fn iter_queue(&self, kind: QueueKind, start_at: Option<BranchId>) -> BranchQueueIter {
 
    /// Returns the branch ID of the first branch in a particular queue.
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
        let index = match start_at {
 
            Some(branch_id) => {
 
                debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id));
 
                let branch = &self.branches[branch_id.index as usize];
 

	
 
                branch.next_in_queue.index as usize
 
            },
 
            None => {
 
                queue.first.index as usize
 
            }
 
        };
 

	
 
        return BranchQueueIter {
 
            branches: self.branches.as_slice(),
 
            index,
 
    /// Returns the next branch ID of a branch (assumed to be in a particular
 
    /// queue.
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    /// Returns an iterator that starts with the provided branch, and then
 
    /// continues to visit all of the branch's parents.
 
    pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter {
 
@@ -282,27 +293,25 @@ impl ExecTree {
 
    }
 

	
 
    /// Collapses the speculative execution tree back into a deterministic one,
 
    /// using the provided branch as the final sync result.
 
    pub fn end_sync(&mut self, branch_id: BranchId) {
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == branch_id));
 

	
 
        // Swap indicated branch into the first position
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1);
 

	
 
        // Reset all values to non-sync defaults
 
        let branch = &mut self.branches[0];
 
        branch.id = BranchId::new_invalid();
 
        branch.parent_id = BranchId::new_invalid();
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        debug_assert!(!branch.awaiting_port.is_valid());
 
        branch.next_in_queue = BranchId::new_invalid();
 
        branch.inbox.clear();
 
        debug_assert!(branch.prepared_channel.is_none());
 
        debug_assert!(branch.prepared.is_none());
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
 
            self.queues[queue_idx] = BranchQueue::new();
 
        }
 
    }
 
@@ -321,44 +330,227 @@ impl IndexMut<BranchId> for ExecTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
pub(crate) struct BranchQueueIter<'a> {
 
/// Iterator over the parents of an `ExecTree` branch.
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchQueueIter<'a> {
 
impl<'a> Iterator for BranchParentIter<'a> {
 
    type Item = &'a Branch;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
            // i.e. the invalid branch index
 
            return None;
 
        }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.next_in_queue.index as usize;
 
        self.index = branch.parent_id.index as usize;
 
        return Some(branch);
 
    }
 
}
 

	
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
// -----------------------------------------------------------------------------
 
// FakeTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Generic fake branch. This is supposed to be used in conjunction with the
 
/// fake tree. The purpose is to have a branching-like tree to use in
 
/// combination with a consensus algorithm in places where we don't have PDL
 
/// code.
 
pub(crate) struct FakeBranch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal,
 
    pub next_in_queue: BranchId,
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>,
 
}
 

	
 
impl<'a> Iterator for BranchParentIter<'a> {
 
    type Item = &'a Branch;
 
impl BranchListItem for FakeBranch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
impl FakeBranch {
 
    fn new_root(_index: u32) -> FakeBranch {
 
        debug_assert!(_index == 1);
 
        return FakeBranch{
 
            id: BranchId::new(1),
 
            parent_id: BranchId::new_invalid(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
        }
 
    }
 

	
 
    fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch {
 
        return FakeBranch {
 
            id: BranchId::new(index),
 
            parent_id: parent_branch.id,
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// A little helper for native components that don't have a set of branches that
 
/// are actually executing code, but just have to manage the idea of branches
 
/// due to them performing the equivalent of a branching `get` call.
 
pub(crate) struct FakeTree {
 
    pub branches: Vec<FakeBranch>,
 
    queues: [BranchQueue; NUM_QUEUES],
 
}
 

	
 
impl FakeTree {
 
    pub fn new() -> Self {
 
        // TODO: Don't like this? Cause is that now we don't have a non-sync
 
        //  branch. But we assumed BranchId=0 means the branch is invalid. We
 
        //  can do the rusty Option<BranchId> stuff. But we still need a token
 
        //  value within the protocol to signify no-branch-id. Maybe the high
 
        //  bit? Branches are crazy expensive, no-one is going to have 2^32
 
        //  branches anyway. 2^31 isn't too bad.
 
        return Self {
 
            branches: vec![FakeBranch{
 
                id: BranchId::new_invalid(),
 
                parent_id: BranchId::new_invalid(),
 
                sync_state: SpeculativeState::RunningNonSync,
 
                awaiting_port: PortIdLocal::new_invalid(),
 
                next_in_queue: BranchId::new_invalid(),
 
                inbox: HashMap::new(),
 
            }],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    fn is_in_sync(&self) -> bool {
 
        return self.branches.len() > 1;
 
    }
 

	
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync);
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first)
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.parent_id.index as usize;
 
        return Some(branch);
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 

	
 
        // Create the first branch
 
        let sync_branch = FakeBranch::new_root(1);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch {
 
        debug_assert!(branch_id.is_valid());
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Take out the succeeding branch, then just clear all fake branches.
 
        self.branches.swap(1, branch_id.index as usize);
 
        self.branches.truncate(2);
 
        let result = self.branches.pop().unwrap();
 

	
 
        for queue_index in 0..NUM_QUEUES {
 
            self.queues[queue_index] = BranchQueue::new();
 
        }
 

	
 
        return result;
 
    }
 
}
 

	
 
impl Index<BranchId> for FakeTree {
 
    type Output = FakeBranch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for FakeTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Shared logic
 
// -----------------------------------------------------------------------------
 

	
 
fn pop_from_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B]) -> Option<BranchId> {
 
    if queue.is_empty() {
 
        return None;
 
    } else {
 
        let first_branch = &mut branches[queue.first.index as usize];
 
        queue.first = first_branch.get_next_id();
 
        first_branch.set_next_id(BranchId::new_invalid());
 
        if !queue.first.is_valid() {
 
            queue.last = BranchId::new_invalid();
 
        }
 

	
 
        return Some(first_branch.get_id());
 
    }
 
}
 

	
 
fn push_into_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) {
 
    debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid());
 
    if queue.is_empty() {
 
        queue.first = branch_id;
 
        queue.last = branch_id;
 
    } else {
 
        let last_branch = &mut branches[queue.last.index as usize];
 
        last_branch.set_next_id(branch_id);
 
        queue.last = branch_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/connector.rs
Show inline comments
 
@@ -30,12 +30,13 @@ use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::PortId;
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
@@ -52,82 +53,110 @@ impl ConnectorPublic {
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
#[derive(Debug, Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
    Exit,           // Connector has exited
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
// TODO: Remove remaining fields once 'fires()' is removed from language.
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    received: &'a HashMap<PortIdLocal, ValueGroup>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.registered_id.is_some();
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(data) => Some(data.clone()),
 
            None => None,
 
        }
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::PerformedGet(value) => Some(value),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
 
        };
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken),
 
        };
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
            // Run in sync mode
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
 
                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                return ConnectorScheduling::Immediate;
 
            } else {
 
                return scheduling
 

	
 
            // Handle any new finished branches
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 

	
 
                if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Actually found a solution
 
                    self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 

	
 
                self.last_finished_handled = Some(branch_id);
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: ComponentState) -> Self {
 
        Self{
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
 
@@ -140,27 +169,34 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        let mut branches = Vec::new(); // TODO: @Remove
 
        if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) {
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        for branch_id in branches.drain(..) {
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content);
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
@@ -184,15 +220,13 @@ impl ConnectorPDL {
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
@@ -222,77 +256,82 @@ impl ConnectorPDL {
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
            RunResult::BranchGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `get()` is valid, so mark the branch as awaiting a message
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    // Note: we only know that a branch is waiting on a message when
 
                    // it reaches the `get` call. But we might have already received
 
                    // a message that targets this branch, so check now.
 
                    let mut any_branch_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_branch_received = true;
 
                        }
 
                    }
 

	
 
                    if any_branch_received {
 
                        return ConnectorScheduling::Immediate;
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
 
                // it reaches the `get` call. But we might have already received
 
                // a message that targets this branch, so check now.
 
                let mut any_message_received = false;
 
                for message in comp_ctx.get_read_data_messages(port_id) {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            RunResult::BranchAtSyncEnd => {
 
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
                if consistency == Consistency::Valid {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else if consistency == Consistency::Inconsistent {
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            RunResult::BranchPut(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `put()` is valid.
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    comp_ctx.submit_message(Message::Data(DataMessage {
 
                        sync_header, data_header,
 
                        content: DataContent::Message(content),
 
                    }));
 

	
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                }));
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 

	
 
        // If here then the run result did not require a particular action. We
 
        // return whether we have more active branches to run or not.
 
@@ -309,27 +348,26 @@ impl ConnectorPDL {
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 

	
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                debug_assert!(self.last_finished_handled.is_none());
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
@@ -353,13 +391,13 @@ impl ConnectorPDL {
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared_channel = Some((
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 
@@ -378,8 +416,9 @@ impl ConnectorPDL {
 
        for port in fake_vec {
 
            // TODO: Handle sent/received ports
 
            debug_assert!(ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        ctx.notify_sync_end(&[]);
 
        self.last_finished_handled = None;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox::BranchMarker;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind};
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal};
 
use super::inbox::{
 
    Message, PortAnnotation,
 
    DataMessage, DataContent, DataHeader,
 
    SyncMessage, SyncContent, SyncHeader,
 
};
 
use super::scheduler::ComponentCtx;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
@@ -45,19 +47,20 @@ struct Peer {
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
 
//  bit.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
@@ -73,13 +76,13 @@ pub(crate) enum Consistency {
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
@@ -102,42 +105,46 @@ impl Consensus {
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.last_finished_handled.is_none());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ctx.get_ports().iter()
 
                .map(|v| PortAnnotation{
 
                    port_id: v.self_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
            cur_marker: BranchMarker::new_invalid(),
 
        });
 
        self.branch_markers.push(BranchId::new_invalid());
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
    /// called for each forked branch in the execution tree.
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            port_mapping: parent_branch_annotations.port_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle. Note that
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
@@ -184,74 +191,65 @@ impl Consensus {
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates sync messages for any branches that are at the end of the
 
    /// sync block. To find these branches, they should've been put in the
 
    /// "finished" queue in the execution tree.
 
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let mut last_branch_id = self.last_finished_handled;
 
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
 
            // Turn the port mapping into a local solution
 
            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
 
            let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
            for port in source_mapping {
 
                // Note: if the port is silent, and we've never communicated
 
                // over the port, then we need to do so now, to let the peer
 
                // component know about our sync leader state.
 
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
                let peer_port_id = port_desc.peer_id;
 
                let channel_id = port_desc.channel_id;
 

	
 
                if !self.encountered_ports.contains(&port.port_id) {
 
                    ctx.submit_message(Message::Data(DataMessage {
 
                        sync_header: SyncHeader{
 
                            sending_component_id: ctx.id,
 
                            highest_component_id: self.highest_connector_id,
 
                            sync_round: self.sync_round
 
                        },
 
                        data_header: DataHeader{
 
                            expected_mapping: source_mapping.clone(),
 
                            sending_port: port.port_id,
 
                            target_port: peer_port_id,
 
                            new_mapping: BranchId::new_invalid(),
 
                        },
 
                        content: DataContent::SilentPortNotification,
 
                    }));
 
                    self.encountered_ports.push(port.port_id);
 
                }
 

	
 
                target_mapping.push((
 
                    channel_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 

	
 
            let local_solution = LocalSolution{
 
                component: ctx.id,
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 
            let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
 
            if solution_branch.is_some() {
 
                // No need to continue iterating, we've found the solution
 
                return solution_branch;
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&port.port_id) {
 
                ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    data_header: DataHeader{
 
                        expected_mapping: source_mapping.clone(),
 
                        sending_port: port.port_id,
 
                        target_port: peer_port_id,
 
                        new_mapping: BranchMarker::new_invalid(),
 
                    },
 
                    content: DataContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(port.port_id);
 
            }
 

	
 
            last_branch_id = Some(branch.id);
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
        return None;
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
 
        return solution_branch;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory.
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        final_ports.clear();
 
@@ -260,13 +258,12 @@ impl Consensus {
 
            final_ports.push(port.port_id);
 
        }
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.last_finished_handled = None;
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 

	
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
@@ -299,43 +296,42 @@ impl Consensus {
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch_id);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the data and sync header, and
 
    /// checking which *existing* branches *can* receive the message. So two
 
    /// cautionary notes:
 
    /// 1. A future branch might also be able to receive this message, see the
 
    ///     `branch_can_receive` function.
 
    /// 2. We return the branches that *can* receive the message, you still
 
    ///     have to explicitly call `notify_of_received_message`.
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec<BranchId>) -> bool {
 
        self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids);
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
 
        return self.handle_received_sync_header(&message.sync_header, ctx)
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
@@ -363,24 +359,24 @@ impl Consensus {
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) {
 
        debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content));
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
            if mapping.port_id == message.data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports);
 
                find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
@@ -391,26 +387,26 @@ impl Consensus {
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) {
 
            if sync_header.sync_round < peer.expected_sync_round {
 
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
 
            if message.sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        if let DataContent::SilentPortNotification = content {
 
        if let DataContent::SilentPortNotification = message.content {
 
            // No port can receive a "silent" notification.
 
            return false;
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
@@ -423,27 +419,12 @@ impl Consensus {
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, sync_header, data_header, content) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        if !self.handle_peer(sync_header) {
 
            // We can drop this package
 
            return false;
 
        }
 
@@ -575,13 +556,13 @@ impl Consensus {
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
src/runtime2/inbox.rs
Show inline comments
 
@@ -10,16 +10,38 @@ use super::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
    pub port_id: PortIdLocal,
 
    pub registered_id: Option<BranchId>,
 
    pub registered_id: Option<BranchMarker>,
 
    pub expected_firing: Option<bool>,
 
}
 

	
 
/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
 
/// unique identifier for a branch, but differs in that a branch only has one
 
/// branch ID, but might have multiple associated markers (i.e. one branch
 
/// performing a `put` three times will generate three markers.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchMarker{
 
    marker: u32,
 
}
 

	
 
impl BranchMarker {
 
    #[inline]
 
    pub(crate) fn new(marker: u32) -> Self {
 
        debug_assert!(marker != 0);
 
        return Self{ marker };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ marker: 0 }
 
    }
 
}
 

	
 
/// The header added by the synchronization algorithm to all.
 
#[derive(Debug, Clone)]
 
pub(crate) struct SyncHeader {
 
    pub sending_component_id: ConnectorId,
 
    pub highest_component_id: ConnectorId,
 
    pub sync_round: u32,
 
@@ -28,13 +50,13 @@ pub(crate) struct SyncHeader {
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchId,
 
    pub new_mapping: BranchMarker,
 
}
 

	
 
// TODO: Very much on the fence about this. On one hand I thought making it a
 
//  data message was neat because "silent port notification" should be rerouted
 
//  like any other data message to determine the component ID of the receiver
 
//  and to make it part of the leader election algorithm for the sync leader.
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 
use std::collections::HashMap;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
use super::scheduler::{SchedulerCtx, ComponentCtx};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::find_ports_in_value_group;
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{Message, ControlContent, ControlMessage};
 
use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    inbox: Vec<ValueGroup>,
 
}
 

	
 
type SyncDone = Arc<(Mutex<Option<FinishedSync>>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
 
    SyncRound(Vec<ApplicationSyncAction>),
 
    Shutdown,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorApplication
 
// -----------------------------------------------------------------------------
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
// TODO: Strong candidate for logic reduction in handling put/get. A lot of code
 
//  is an approximate copy-pasta from the regular component logic. I'm going to
 
//  wait until I'm implementing more native components to see which logic is
 
//  truly common.
 
pub struct ConnectorApplication {
 
    // Communicating about new jobs and setting up sync rounds
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    is_in_sync: bool,
 
    // Handling current sync round
 
    sync_desc: Vec<ApplicationSyncAction>,
 
    tree: FakeTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
    branch_extra: Vec<usize>, // instruction counter per branch
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if self.is_in_sync {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 
                if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Can finish sync round immediately
 
                    self.collapse_sync_to_solution_branch(solution_branch, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
        }
 
    }
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let sync_done = Arc::new(( Mutex::new(None), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication {
 
            sync_done: sync_done.clone(),
 
            job_queue: job_queue.clone()
 
            job_queue: job_queue.clone(),
 
            is_in_sync: false,
 
            sync_desc: Vec::new(),
 
            tree: FakeTree::new(),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
            branch_extra: vec![0],
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Handle any incoming messages if we're participating in a round
 
    fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                Message::Data(_) => todo!("data message in API connector"),
 
                Message::Sync(_)  => todo!("sync message in API connector"),
 
                Message::Control(_) => todo!("impossible control message"),
 
                Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx),
 
                Message::Control(_) => unreachable!("control message in native API component"),
 
            }
 
        }
 
    }
 

	
 
        // Handle requests coming from the API
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            while let Some(job) = queue.pop_front() {
 
                match job {
 
                    ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                        comp_ctx.push_port(endpoint_a);
 
                        comp_ctx.push_port(endpoint_b);
 
    pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
            self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
    fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.handle_new_messages(comp_ctx);
 

	
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 
        let mut instruction_idx = self.branch_extra[branch_id.index as usize];
 

	
 
        if instruction_idx >= self.sync_desc.len() {
 
            // Performed last instruction, so this branch is officially at the
 
            // end of the synchronous interaction.
 
            let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
            if consistency == Consistency::Valid {
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
            } else {
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            }
 
        } else {
 
            // We still have instructions to perform
 
            let cur_instruction = &self.sync_desc[instruction_idx];
 
            self.branch_extra[branch_id.index as usize] += 1;
 

	
 
            match &cur_instruction {
 
                ApplicationSyncAction::Put(port_id, content) => {
 
                    let port_id = *port_id;
 

	
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    let message = Message::Data(DataMessage {
 
                        sync_header,
 
                        data_header,
 
                        content: DataContent::Message(content.clone()),
 
                    });
 
                    comp_ctx.submit_message(message);
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationSyncAction::Get(port_id) => {
 
                    let port_id = *port_id;
 

	
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    let mut any_message_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 
                            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                            self.branch_extra.push(instruction_idx + 1);
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
                        }
 
                    }
 
                    ApplicationJob::NewConnector(connector, initial_ports) => {
 
                        comp_ctx.push_component(connector, initial_ports);
 
                    },
 
                    ApplicationJob::Shutdown => {
 
                        debug_assert!(queue.is_empty());
 
                        return ConnectorScheduling::Exit;
 

	
 
                    if any_message_received {
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        // In non-sync mode the application component doesn't really do anything
 
        // except performing jobs submitted from the API. This is the only
 
        // case where we expect to be woken up.
 
        // Note that we have to communicate to the scheduler when we've received
 
        // ports or created components (hence: given away ports) *before* we
 
        // enter a sync round.
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    comp_ctx.push_port(endpoint_a);
 
                    comp_ctx.push_port(endpoint_b);
 

	
 
                    return ConnectorScheduling::Immediate;
 
                }
 
                ApplicationJob::NewConnector(connector, initial_ports) => {
 
                    comp_ctx.push_component(connector, initial_ports);
 

	
 
                    return ConnectorScheduling::Later;
 
                },
 
                ApplicationJob::SyncRound(mut description) => {
 
                    // Entering sync mode
 
                    comp_ctx.notify_sync_start();
 
                    self.sync_desc = description;
 
                    self.is_in_sync = true;
 
                    debug_assert!(self.last_finished_handled.is_none());
 
                    debug_assert!(self.branch_extra.len() == 1);
 

	
 
                    let first_branch_id = self.tree.start_sync();
 
                    self.tree.push_into_queue(QueueKind::Runnable, first_branch_id);
 
                    debug_assert!(first_branch_id.index == 1);
 
                    self.consensus.start_sync(comp_ctx);
 
                    self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id);
 
                    self.branch_extra.push(0); // set first branch to first instruction
 

	
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 

	
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        // Queue was empty
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
    fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) {
 
        debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program
 
        // Notifying tree, consensus algorithm and context of ending sync
 
        let mut fake_vec = Vec::new();
 
        let mut solution_branch = self.tree.end_sync(branch_id);
 
        self.consensus.end_sync(branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            debug_assert!(comp_ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        comp_ctx.notify_sync_end(&[]);
 

	
 
        // Turning hashmapped inbox into vector of values
 
        let mut inbox = Vec::with_capacity(solution_branch.inbox.len());
 
        for action in &self.sync_desc {
 
            match action {
 
                ApplicationSyncAction::Put(_, _) => {},
 
                ApplicationSyncAction::Get(port_id) => {
 
                    debug_assert!(solution_branch.inbox.contains_key(port_id));
 
                    inbox.push(solution_branch.inbox.remove(port_id).unwrap());
 
                },
 
            }
 
        }
 

	
 
        // Notifying interface of ending sync
 
        self.is_in_sync = false;
 
        self.sync_desc.clear();
 
        self.branch_extra.truncate(1);
 
        self.last_finished_handled = None;
 

	
 
        let (results, notification) = &*self.sync_done;
 
        let mut results = results.lock().unwrap();
 
        *results = Some(FinishedSync{ inbox });
 
        notification.notify_one();
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ApplicationInterface
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub enum ChannelCreationError {
 
    InSync,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationStartSyncError {
 
    AlreadyInSync,
 
    NoSyncActions,
 
    IncorrectPortKind,
 
    UnownedPort,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationEndSyncError {
 
    NotInSync,
 
}
 

	
 
pub enum ApplicationSyncAction {
 
    Put(PortIdLocal, ValueGroup),
 
    Get(PortIdLocal),
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    is_in_sync: bool,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
    owned_ports: Vec<(PortKind, PortIdLocal)>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            is_in_sync: false,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
    /// Creates a new channel. Can only fail if the application interface is
 
    /// currently in sync mode.
 
    pub fn create_channel(&mut self) -> Result<Channel, ChannelCreationError> {
 
        if self.is_in_sync {
 
            return Err(ChannelCreationError::InSync);
 
        }
 

	
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 
        self.owned_ports.push((PortKind::Putter, putter_id));
 
        self.owned_ports.push((PortKind::Getter, getter_id));
 

	
 
        return Channel{ putter_id, getter_id };
 
        return Ok(Channel{ putter_id, getter_id });
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Yank out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        if self.is_in_sync {
 
            return Err(ComponentCreationError::InSync);
 
        }
 

	
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for initial_port in &initial_ports {
 
            if !self.owned_ports.iter().any(|v| v == initial_port) {
 
            if !self.owned_ports.iter().any(|(_, v)| v == initial_port) {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap();
 
            let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(state);
 

	
 
@@ -159,24 +434,72 @@ impl ApplicationInterface {
 

	
 
        self.wake_up_connector_with_ping();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Check if the next sync-round is finished.
 
    pub fn try_wait(&self) -> bool {
 
        let (is_done, _) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        return *lock;
 
    /// Queues up a description of a synchronous round to run. Will not actually
 
    /// run the synchronous behaviour in blocking fashion. The results *must* be
 
    /// retrieved using `try_wait` or `wait` for the interface to be considered
 
    /// in non-sync mode.
 
    // TODO: Maybe change API in the future. For now it does the job
 
    pub fn perform_sync_round(&mut self, actions: Vec<ApplicationSyncAction>) -> Result<(), ApplicationStartSyncError> {
 
        if self.is_in_sync {
 
            return Err(ApplicationStartSyncError::AlreadyInSync);
 
        }
 

	
 
        // Check the action ports for consistency
 
        for action in &actions {
 
            let (port_id, expected_kind) = match action {
 
                ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter),
 
                ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter),
 
            };
 

	
 
            match self.find_port_by_id(port_id) {
 
                Some(port_kind) => {
 
                    if port_kind != expected_kind {
 
                        return Err(ApplicationStartSyncError::IncorrectPortKind)
 
                    }
 
                },
 
                None => {
 
                    return Err(ApplicationStartSyncError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        // Everything is consistent, go into sync mode and send the actions off
 
        // to the component that will actually perform the sync round
 
        self.is_in_sync = true;
 
        {
 
            let (is_done, _) = &*self.sync_done;
 
            let mut lock = is_done.lock().unwrap();
 
            *lock = None;
 
        }
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::SyncRound(actions));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        return Ok(())
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) {
 
    /// Wait until the next sync-round is finished, returning the received
 
    /// messages in order of `get` calls.
 
    pub fn wait(&mut self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
        if !self.is_in_sync {
 
            return Err(ApplicationEndSyncError::NotInSync);
 
        }
 

	
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done
 
        let mut lock = is_done.lock().unwrap();
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        return Ok(lock.take().unwrap().inbox);
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 
@@ -195,12 +518,18 @@ impl ApplicationInterface {
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.push_work(key);
 
        }
 
    }
 

	
 
    fn find_port_by_id(&self, port_id: PortIdLocal) -> Option<PortKind> {
 
        return self.owned_ports.iter()
 
            .find(|(_, owned_id)| *owned_id == port_id)
 
            .map(|(port_kind, _)| *port_kind);
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
src/runtime2/scheduler.rs
Show inline comments
 
@@ -293,14 +293,15 @@ impl Scheduler {
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox
 
                scheduled.ctx.inbox_messages.clear();
 
                // Just left sync region. So clear inbox up until the last
 
                // message that was read.
 
                scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read);
 
                scheduled.ctx.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 
@@ -377,13 +378,13 @@ pub(crate) struct ComponentPortChange {
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<Message>, // never control or ping messages
 
    inbox_messages: Vec<Message>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
src/runtime2/tests/api_component.rs
Show inline comments
 
new file 100644
 
// Testing the api component.
 
//
 
// These tests explicitly do not use the "NUM_INSTANCES" constant because we're
 
// doing some communication with the native component. Hence only expect one
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive handler(in<u32> request, out<u32> response, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto value = get(request);
 
                put(response, value * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let req_chan = api.create_channel().unwrap();
 
    let resp_chan = api.create_channel().unwrap();
 

	
 
    api.create_connector("", "handler", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(req_chan.getter_id.index)),
 
        Value::Output(PortId::new(resp_chan.putter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])),
 
            ApplicationSyncAction::Get(resp_chan.getter_id)
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 
        assert!(result.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, loop_idx * 2);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_getting_from_component() {
 
    const CODE: &'static str ="
 
    primitive loop_sender(out<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            sync {
 
                put(numbers, cur);
 
                cur += 1;
 
            }
 
        }
 
    }";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_sender", ValueGroup::new_stack(vec![
 
        Value::Output(PortId::new(channel.putter_id.index)),
 
        Value::UInt32(1337),
 
        Value::UInt32(1337 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Get(channel.getter_id),
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 

	
 
        assert!(result.len() == 1 && result[0].values.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, 1337 + loop_idx);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_putting_to_component() {
 
    const CODE: &'static str = "
 
    primitive loop_receiver(in<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            sync {
 
                auto number = get(numbers);
 
                assert(number == cur);
 
                cur += 1;
 
            }
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_receiver", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(channel.getter_id.index)),
 
        Value::UInt32(42),
 
        Value::UInt32(42 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::UInt32(42 + loop_idx)])),
 
        ]).expect("start sync round");
 

	
 
        // Note: if we finish a round, then it must have succeeded :)
 
        api.wait().expect("finish sync round");
 
    }
 
}
 

	
 
#[test]
 
fn test_doing_nothing() {
 
    const CODE: &'static str = "
 
    primitive getter(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {}
 
            sync { auto res = get(input); assert(res); }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(channel.getter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for _ in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![]).expect("start silent sync round");
 
        api.wait().expect("finish silent sync round");
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::Bool(true)]))
 
        ]).expect("start firing sync round");
 
        let res = api.wait().expect("finish firing sync round");
 
        assert!(res.is_empty());
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/basics.rs
Show inline comments
 
new file 100644
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_single_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("single_put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 

	
 
#[test]
 
fn test_multi_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter_static(out<u8> vals, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                put(vals, 0b00000001);
 
                put(vals, 0b00000100);
 
                put(vals, 0b00010000);
 
                put(vals, 0b01000000);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter_dynamic(in<u8> vals, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                u32 recv_index = 0;
 
                u8 expected = 1;
 
                while (recv_index < 4) {
 
                    auto gotten = get(vals);
 
                    assert(gotten == expected);
 
                    expected <<= 2;
 
                    recv_index += 1;
 
                }
 
            }
 
            loop_index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("multi_put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "putter_static", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "getter_dynamic", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
    })
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation_basic;
 
mod basics;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 5;   // number of test instances constructed
 
const NUM_LOOPS: u32 = 5;       // number of loops within a single test (not used by all tests)
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
@@ -20,23 +27,21 @@ fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor:
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
struct TestTimer {
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    fn new(name: &'static str) -> Self {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
@@ -44,189 +49,6 @@ impl Drop for TestTimer {
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 

	
 
#[test]
 
fn test_star_shaped_request() {
 
    const CODE: &'static str = "
 
    primitive edge(in<u32> input, out<u32> output, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto req = get(input);
 
                put(output, req * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            // print(\"starting loop\");
 
            synchronous {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
        auto responses = {};
 

	
 
        u32 edge_index = 0;
 
        while (edge_index < num_edges) {
 
            channel req_put -> req_get;
 
            channel resp_put -> resp_get;
 
            new edge(req_get, resp_put, num_loops);
 
            requests @= { req_put };
 
            responses @= { resp_get };
 

	
 
            edge_index += 1;
 
        }
 

	
 
        new center(requests, responses, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("star_shaped_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS)
 
        ]));
 
    });
 
}
 
\ No newline at end of file
src/runtime2/tests/network_shapes.rs
Show inline comments
 
new file 100644
 
// Testing particular graph shapes
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_star_shaped_request() {
 
    const CODE: &'static str = "
 
    primitive edge(in<u32> input, out<u32> output, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto req = get(input);
 
                put(output, req * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            // print(\"starting loop\");
 
            sync {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
        auto responses = {};
 

	
 
        u32 edge_index = 0;
 
        while (edge_index < num_edges) {
 
            channel req_put -> req_get;
 
            channel resp_put -> resp_get;
 
            new edge(req_get, resp_put, num_loops);
 
            requests @= { req_put };
 
            responses @= { resp_get };
 

	
 
            edge_index += 1;
 
        }
 

	
 
        new center(requests, responses, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("star_shaped_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create connector");
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            sync {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create connector");
 
    });
 
}
 
\ No newline at end of file
src/runtime2/tests/speculation_basic.rs
Show inline comments
 
new file 100644
 
// Testing speculation - Basic forms
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_maybe_do_nothing() {
 
    // Three variants in which the behaviour in which nothing is performed is
 
    // somehow not allowed. Note that we "check" by seeing if the test finishes.
 
    // Only the branches in which ports fire increment the loop index
 
    const CODE: &'static str = "
 
    primitive only_puts(out<bool> output, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync { put(output, true); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive might_put(out<bool> output, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                fork { put(output, true); index += 1; }
 
                or   {}
 
            }
 
        }
 
    }
 

	
 
    primitive only_gets(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync { auto res = get(input); assert(res); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive might_get(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync fork { auto res = get(input); assert(res); index += 1; } or {}
 
        }
 
    }
 
    ";
 

	
 
    // Construct all variants which should work and wait until the runtime exits
 
    run_test_in_runtime(CODE, |api| {
 
        // only putting -> maybe getting
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "only_puts", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "might_get", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 

	
 
        // maybe putting -> only getting
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "might_put", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "only_gets", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
    })
 
}
 
\ No newline at end of file
testdata/parser/negative/1.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested twice in primitive
 
primitive main(in a, out b) {
 
	while (true) {
 
		synchronous {
 
			synchronous {}
 
		sync {
 
			sync {}
 
		}
 
	}
 
}
testdata/parser/negative/1.txt
Show inline comments
 
Parse error at 1.pdl:7:4: Illegal nested synchronous statement
 
			synchronous {}
 
			sync {}
 
			^
testdata/parser/negative/10.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested in sync block
 
primitive main(in a, out b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (false || true) {
 
				synchronous {
 
				sync {
 
					skip;
 
				}
 
			}
 
		}
 
	} 
 
}
testdata/parser/negative/10.txt
Show inline comments
 
Parse error at 10.pdl:8:5: Illegal nested synchronous statement
 
				synchronous {
 
				sync {
 
				^
testdata/parser/negative/12.pdl
Show inline comments
 
#version 100
 

	
 
// illegal node declaration
 
primitive main(in a, out b) {
 
	while (true) {
 
		channel x -> y;
 
		synchronous {}
 
		sync {}
 
	}
 
}
testdata/parser/negative/19.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(a)) {
 
				return 5;
 
			} else {
 
				block(a);
 
			}
 
		}
testdata/parser/negative/24.pdl
Show inline comments
 
@@ -4,11 +4,11 @@ primitive main(in a, out b) {
 
	int x = 0;
 
	int y = 0;
 
	x += y + 5;
 
	y %= x -= 3;
 
	x *= x * x *= 5;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			assert fires(a) == fires(b);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/negative/3.pdl
Show inline comments
 
#version 100
 

	
 
// sync block nested deeply in composite
 
composite main(in a, out b) {
 
	channel x -> y;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			skip;
 
		}
 
	}
 
}
testdata/parser/negative/3.txt
Show inline comments
 
Parse error at 3.pdl:7:3: Illegal nested synchronous statement
 
		synchronous {
 
		sync {
 
		^
testdata/parser/negative/31.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(int a) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            break; // not allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/negative/32.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(int a) {
 
    loop: {
 
        synchronous {
 
        sync {
 
            goto loop; // not allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/negative/4.pdl
Show inline comments
 
@@ -3,11 +3,11 @@
 
// built-in outside sync block
 
primitive main(in a, out b) {
 
	int x = 0;
 
	msg y = create(0); // legal
 
	while (x < 10) {
 
		y = get(a); // illegal
 
		synchronous {
 
		sync {
 
			y = get(a); // legal
 
		}
 
	}
 
}
testdata/parser/negative/6.pdl
Show inline comments
 
#version 100
 

	
 
import std.reo;
 

	
 
// duplicate formal parameters
 
composite main(in a, out a) {
 
	new sync(a, a);
 
	new sync_component(a, a);
 
}
testdata/parser/negative/7.pdl
Show inline comments
 
@@ -2,8 +2,8 @@
 

	
 
import std.reo;
 

	
 
// shadowing formal parameter
 
composite main(in a, out b) {
 
	channel c -> a;
 
	new sync(a, b);
 
	new sync_component(a, b);
 
}
testdata/parser/negative/8.pdl
Show inline comments
 
@@ -7,13 +7,13 @@ composite main(in a, out b) {
 
	syncdrain(a, b);
 
}
 

	
 
// shadowing import
 
primitive syncdrain(in a, in b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (!fires(a) || !fires(b)) {
 
				block(a);
 
				block(b);
 
			}
 
		}
 
	}
testdata/parser/positive/1.pdl
Show inline comments
 
@@ -8,13 +8,13 @@ composite main(in asend, out arecv, in bsend, out brecv) {
 
    // x fires first, then y, then x, et cetera
 
    new sequencer(xi, yi);
 
}
 

	
 
primitive replicator(in a, out b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b) && fires(c)) {
 
                msg x = get(a);
 
                put(b, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
@@ -37,13 +37,13 @@ composite sequencer(in x, in y) {
 
    new fifo(ci, fo, null);
 
    new fifo(di, eo, create(0));
 
}
 

	
 
primitive syncdrain(in a, in b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
                get(a);
 
                get(b);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
@@ -51,13 +51,13 @@ primitive syncdrain(in a, in b) {
 
    }
 
}
 

	
 
primitive fifo(in a, out b, msg init) {
 
    msg c = init;
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (c != null) {
 
                assert !fires(a);
 
                if (fires(b)) {
 
                    put(b, c);
 
                    c = null;
 
                }
 
@@ -72,21 +72,21 @@ primitive fifo(in a, out b, msg init) {
 
}
 

	
 
primitive sequencer2(in x, in y) {
 
	while (true) {
 
	    boolean b = false;
 
		while (!b) {
 
			synchronous {
 
			sync {
 
				assert !fires(y);
 
				if (fires(x))
 
					b = true;
 
			}
 
		}
 
		b = false;
 
		while (!b) {
 
			synchronous {
 
			sync {
 
				assert !fires(x);
 
				if (fires(y))
 
					b = true;
 
			}
 
		}
 
	}
testdata/parser/positive/10.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {}
 

	
 
primitive example(in a, out[] b) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(a)) {
 
				int i = 0;
 
				while (i < b.length) {
 
					if (fires(b[i])) {
 
						int j = i + 1;
 
						while (j < b.length)
testdata/parser/positive/11.pdl
Show inline comments
 
#version 100
 

	
 
primitive main(in a, out b) {
 
	msg x = null;
 
	while (x == null) {
 
		synchronous {
 
		sync {
 
			if (fires(a))
 
				x = get(a);
 
		}
 
	}
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(b))
 
				put(b, x);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/12.pdl
Show inline comments
 
@@ -4,11 +4,11 @@ primitive main(in a, out b) {
 
	int x = 0;
 
	int y = 0;
 
	x += y + 5;
 
	y %= x -= 3;
 
	x *= x * (x *= 5);
 
	while (true) {
 
		synchronous {
 
		sync {
 
			assert fires(a) == fires(b);
 
		}
 
	}
 
}
 
\ No newline at end of file
testdata/parser/positive/13.pdl
Show inline comments
 
@@ -11,13 +11,13 @@ composite example(in[] a, in[] b, out x) {
 
	new async(b);
 
	new resolve(a, b, x);
 
}
 

	
 
primitive resolve(in[] a, in[] b, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length && i < b.length) {
 
				if (fires(a[i]) && fires(b[i])) {
 
					put(x, create(0)); // send token to x
 
					break;
 
				}
 
@@ -28,13 +28,13 @@ primitive resolve(in[] a, in[] b, out x) {
 
		}
 
	}
 
}
 

	
 
primitive async(in[] a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length)
 
				if (fires(a[i++])) break;
 
			while (i < a.length)
 
				assert !fires(a[i++]);
 
		}
testdata/parser/positive/14.pdl
Show inline comments
 
#version 100
 

	
 
composite main(out c) {
 
	channel ao -> ai;
 
    channel bo -> bi;
 
	new sync(ai, bo);
 
	new sync_component(ai, bo);
 
	new binary_replicator(bi, ao, c);
 
}
 

	
 
primitive sync(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	msg x = get(a);
 
            	put(b, x);
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
 
    }
 
}
 

	
 
primitive binary_replicator(in b, out a, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(b) && fires(a) && fires(c)) {
 
                msg x = get(b);
 
                put(a, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
testdata/parser/positive/15.pdl
Show inline comments
 
@@ -4,13 +4,13 @@ import std.reo;
 

	
 
composite main(out c) {
 
	channel ao -> ai;
 
	channel bo -> bi;
 
	channel axo -> axi;
 
	channel zo -> zi;
 
	new sync(ai, bo);
 
	new sync_component(ai, bo);
 
	new replicator(bi, {axo, c});
 
	new consensus({axi, zi}, ao);
 
	new generator(zo);
 
}
 

	
 
primitive generator(out z) {
testdata/parser/positive/16.pdl
Show inline comments
 
@@ -4,13 +4,13 @@ composite main() {
 
	channel xo -> xi;
 
	new a(xi);
 
	new c(xo);
 
}
 

	
 
primitive a(in x) {
 
	synchronous {
 
	sync {
 
		msg m = get(x);
 
		assert m.length == 5;
 
		assert m[0] == 'h';
 
		assert m[1] == 'e';
 
		assert m[2] == 'l';
 
		assert m[3] == 'l';
 
@@ -22,13 +22,13 @@ primitive b(out x) {
 
	synchronous (msg m) {
 
		put(x, m);
 
	}
 
}
 
// or
 
primitive c(out x) {
 
	synchronous {
 
	sync {
 
		msg m = create(5);
 
		m[0] = 'h';
 
		m[1] = 'e';
 
		m[2] = 'l';
 
		m[3] = 'l';
 
		m[4] = 'o';
testdata/parser/positive/17.pdl
Show inline comments
 
@@ -5,13 +5,13 @@ composite main(in x, out y) {
 
}
 

	
 
primitive prophet(in b, out a) {
 
	msg c = null;
 
	while (true) {
 
		if (c != null) {
 
			synchronous {
 
			sync {
 
				assert !fires(a);
 
				if (fires(b)) {
 
					assert get(b) == c;
 
					c = null;
 
				}
 
			}
 
@@ -28,21 +28,21 @@ primitive prophet(in b, out a) {
 
}
 

	
 
primitive fifo(in a, out b, msg init) {
 
    msg c = init;
 
    while (true) {
 
        if (c != null) {
 
        	synchronous {
 
        	sync {
 
                assert !fires(a);
 
                if (fires(b)) {
 
                    put(b, c);
 
                    c = null;
 
                }
 
            }
 
        } else {
 
        	synchronous {
 
        	sync {
 
                assert !fires(b);
 
                if (fires(a)) {
 
                    c = get(a);
 
                }
 
            }
 
        }
testdata/parser/positive/18.pdl
Show inline comments
 
@@ -9,24 +9,24 @@ primitive main1(in a, out c) {
 
	int y = 0;
 
	msg z = null;
 
	msg w = null;
 
	x = 1;
 
	y = 1;
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (x > 0 && fires(a)) {
 
				z = get(a);
 
				x--;
 
			}
 
			if (w != null && fires(c)) {
 
				put(c, w);
 
				w = null;
 
				y++;
 
			}
 
		}
 
		synchronous {
 
		sync {
 
			assert !fires(a) && !fires(c);
 
			if (z != null && y > 0) {
 
				w = z;
 
				z = null;
 
				y--;
 
				x++;
testdata/parser/positive/19.pdl
Show inline comments
 
#version 100
 

	
 
composite main() {}
 

	
 
primitive example(int a) {
 
    synchronous {
 
    sync {
 
        loop: {
 
            goto loop; // allowed
 
        }
 
    }
 
}
 
\ No newline at end of file
testdata/parser/positive/2.pdl
Show inline comments
 
@@ -13,13 +13,13 @@ composite main(in asend, out arecv, in bsend, out brecv, in csend, out crecv) {
 
    new replicator(yi, {arecv, zo});
 
    new replicator(zi, {brecv, crecv});
 
}
 

	
 
primitive mymerger(in a, in b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && !fires(b) && fires(c)) {
 
                put(c, get(a));
 
            } else if (!fires(a) && fires(b) && fires(c)) {
 
                put(c, get(b));
 
            } else {
 
            	assert !fires(a) && !fires(b) && !fires(c);
testdata/parser/positive/3.pdl
Show inline comments
 
@@ -27,13 +27,13 @@ composite main(in ai, out ao, in bi, out bo, in ci, out co, in di, out do) {
 
        new replicator(xxxi, co, do);
 
    }
 
}
 

	
 
primitive computeMax(in a, in b, in c, in d, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
            if (fires(a) && fires(b) && fires(c) && fires(d) && fires(x)) {
 
            	msg aa = get(a);
 
            	msg bb = get(b);
 
            	msg cc = get(c);
 
            	msg dd = get(d);
 
            	uint16_t aaa = aa[0] & aa[1] << 8;
testdata/parser/positive/5.pdl
Show inline comments
 
@@ -2,13 +2,13 @@
 

	
 
import std.reo;
 
import std.buf;
 

	
 
primitive main(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
                msg x = get(a);
 
                short y = readShort(x, 0);
 
                y++;
 
                writeShort(x, 0, y);
 
                put(b, x);
testdata/parser/positive/6.pdl
Show inline comments
 
@@ -11,34 +11,34 @@ composite reonode(in[] a, out[] b) {
 
}
 

	
 
composite replicator(in a, out[] b) {
 
	if (b.length == 0) {
 
		new blocking(a);
 
	} else if (b.length == 1) {
 
		new sync(a, b[0]);
 
		new sync_component(a, b[0]);
 
	} else {
 
		channel xo -> xi;
 
		new binary_replicator(a, b[0], xo);
 
		new replicator(xi, b[1 : b.length - 1]);
 
	}
 
}
 
primitive binary_replicator(in a, out b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b) && fires(c)) {
 
                msg x = get(a);
 
                put(b, x);
 
                put(c, x);
 
            } else {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 
primitive blocking(in a) {
 
	while (true) synchronous {
 
	while (true) sync {
 
		assert !fires(a);
 
	}
 
}
 

	
 
composite merger(in[] a, out b) {
 
	if (a.length == 0) {
 
@@ -49,18 +49,18 @@ composite merger(in[] a, out b) {
 
		while (i < a.length) {
 
			channel yi -> yo;
 
			new binary_merger(prev, a[i], yo);
 
			prev = yi;
 
			i++;
 
		}
 
		new sync(prev, b);
 
		new sync_component(prev, b);
 
	}
 
}
 
primitive binary_merger(in a, in b, out c) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(c)) {
 
                assert !fires(b);
 
                put(c, get(a));
 
            } else if (fires(b) && fires(c)) {
 
                assert !fires(a);
 
                put(c, get(b));
 
@@ -68,20 +68,20 @@ primitive binary_merger(in a, in b, out c) {
 
                assert !fires(a) && !fires(b) && !fires(c);
 
            }
 
        }
 
    }
 
}
 
primitive silent(out a) {
 
	while (true) synchronous {
 
	while (true) sync {
 
		assert !fires(a);
 
	}
 
}
 

	
 
primitive sync(in a, out b) {
 
    while (true) {
 
        synchronous {
 
        sync {
 
            if (fires(a) && fires(b)) {
 
            	put(b, get(a));
 
            } else {
 
                assert !fires(a) && !fires(b);
 
            }
 
        }
testdata/parser/positive/7.pdl
Show inline comments
 
@@ -47,13 +47,13 @@ composite puzzle(in[] a, in[] b, out x) {
 
	new async(b);
 
	new resolve(a, b, x);
 
}
 

	
 
primitive resolve(in[] a, in[] b, out x) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			while (i < a.length && i < b.length) {
 
				if (fires(a[i]) && fires(b[i])) {
 
					put(x, create(0)); // send token to x
 
					goto end;
 
				}
 
@@ -64,13 +64,13 @@ primitive resolve(in[] a, in[] b, out x) {
 
		}
 
	}
 
}
 

	
 
primitive async(in[] a) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			int i = 0;
 
			int j = 0;
 
			while (i < a.length) {
 
				if (fires(a[i])) break;
 
				i++;
 
			}
testdata/parser/positive/8.pdl
Show inline comments
 
@@ -28,13 +28,13 @@ composite main(out x) {
 
	new replicator(bi, {co, x});
 
	new recorder(ao, ci);
 
}
 

	
 
primitive evil_or_odious(in x, out y) {
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(x) && fires(y)) {
 
				msg a = get(x);
 
				msg result = create(1);
 
				boolean even = true;
 
				int i = 0;
 
				while (i < a.length) {
 
@@ -50,13 +50,13 @@ primitive evil_or_odious(in x, out y) {
 
		}
 
	}
 
}
 
primitive recorder(out h, in a) {
 
	msg c = create(0);
 
	while (true) {
 
		synchronous {
 
		sync {
 
			if (fires(h) && fires(a)) {
 
				put(h, c);
 
				{
 
					msg x = get(a);
 
					msg n = create(c.length + 1);
 
					int i = 0;
testdata/parser/positive/tarry.pdl
Show inline comments
 
@@ -40,13 +40,13 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) {
 
	in[] neighbori = {};
 
	out[] neighboro = {};
 
	assert peeri.length == peero.length;
 
	while (true) {
 
		// Step 1. Initiator waits for token
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				if (fires(start)) {
 
					token = get(start);
 
				}
 
			}
 
		}
 
		// Reset neighbors
 
@@ -56,13 +56,13 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) {
 
		peero = {};
 
		// Step 2. Keep sending token to processes
 
		while (neighbori.length > 0) {
 
			int idx = 0;
 
			// Select first channel that accepts our token
 
			while (token != null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < neighboro.length) {
 
						if (fires(neighboro[i])) {
 
							put(neighboro[i], token);
 
							idx = i;
 
							token = null;
 
@@ -75,26 +75,26 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) {
 
			peeri = {neighbori[idx]} @ peeri;
 
			peero = {neighboro[idx]} @ peero;
 
			neighbori = neighbori[0:idx] @ neighbori[idx:neighbori.length];
 
			neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length];
 
			// Step 3. Await return of token
 
			while (token == null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < peeri.length + neighbori.length) {
 
						if (fires(peeri@neighbori[i])) {
 
							token = get(peeri@neighbori[i]);
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
		}
 
		// Step 4. Token is back and all neighbors visited
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(end)) {
 
					put(end, token);
 
					token = null;
 
				}
 
			}
 
		}
 
@@ -108,13 +108,13 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) {
 
	out[] parento = {};
 
	assert peeri.length == peero.length;
 
	while (true) {
 
		int idx = 0;
 
		// Step 1. Await token for first time
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				int i = 0;
 
				while (i < peeri.length) {
 
					if (fires(peeri[i])) {
 
						token = get(peeri[i]);
 
						idx = i;
 
						break;
 
@@ -128,32 +128,32 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) {
 
		parenti = {peeri[idx]};
 
		parento = {peero[idx]};
 
		peeri = {};
 
		peero = {};
 
		// Step 2. Non-initiator signals
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(end)) {
 
					put(end, token);
 
					token = null;
 
				}
 
			}
 
		}
 
		while (token == null) {
 
			synchronous {
 
			sync {
 
				if (fires(start)) {
 
					token = get(start);
 
				}
 
			}
 
		}
 
		// Step 3. Keep sending token to processes
 
		while (neighbori.length > 0) {
 
			idx = 0;
 
			// Select first channel that accepts our token
 
			while (token != null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < neighboro.length) {
 
						if (fires(neighboro[i])) {
 
							put(neighboro[i], token);
 
							idx = i;
 
							token = null;
 
@@ -166,26 +166,26 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) {
 
			peeri = {neighbori[idx]} @ peeri;
 
			peero = {neighboro[idx]} @ peero;
 
			neighbori = neighbori[0:idx] @ neighbori[idx:neighbori.length];
 
			neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length];
 
			// Step 4. Await return of token
 
			while (token == null) {
 
				synchronous {
 
				sync {
 
					int i = 0;
 
					while (i < peeri.length + neighbori.length) {
 
						if (fires(peeri@neighbori[i])) {
 
							token = get(peeri@neighbori[i]);
 
							break;
 
						} else i++;
 
					}
 
				}
 
			}
 
		}
 
		// Step 5. Token is back, pass to parent
 
		while (token != null) {
 
			synchronous {
 
			sync {
 
				if (fires(parento[0])) {
 
					put(parento[0], token);
 
					token = null;
 
				}
 
			}
 
		}
0 comments (0 inline, 0 general)