From 36cc1fe490f794b58a68d8ba906cc56ece6876f9 2021-11-15 12:23:58 From: MH Date: 2021-11-15 12:23:58 Subject: [PATCH] Merge branch 'feat-api-cmds-and-branching' Implements the programmer-facing API to allow programmatic specification of a synchronous round. The way in which these put/get interactions are performed is in an initial shape. Perhaps this will change in the future. The second main set of changes is the addition of a 'fork' statement, which allows explicit forking, and allowing multiple puts/gets over the same transport link within a single sync round. --- diff --git a/examples/bench_04/main.c b/examples/bench_04/main.c index f898b13596e4483661f33688b6f0a93e76946f65..dd190a5dd95562ae804bb87fdf357c953acccdd0 100644 --- a/examples/bench_04/main.c +++ b/examples/bench_04/main.c @@ -8,7 +8,7 @@ int main(int argc, char** argv) { const unsigned char pdl[] = "primitive trivial_loop() { " - " while(true) synchronous{}" + " while(true) sync {}" "} " ; Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); diff --git a/examples/bench_05/main.c b/examples/bench_05/main.c index 874d24951072635f31556b18cd1d753afc1ed124..6cd96626e8afd969756b6e5fa69e4c4ca67373dc 100644 --- a/examples/bench_05/main.c +++ b/examples/bench_05/main.c @@ -9,7 +9,7 @@ int main(int argc, char** argv) { const unsigned char pdl[] = "primitive trivial_loop() { " - " while(true) synchronous{}" + " while(true) sync {}" "} " ; Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); diff --git a/examples/bench_09/main.c b/examples/bench_09/main.c index 4cd3fcf4db542cdd67f9ca4f7bcd03e6fc4bc40b..dcf524313be93c129c46dea4e3213a94a7f2af41 100644 --- a/examples/bench_09/main.c +++ b/examples/bench_09/main.c @@ -12,7 +12,7 @@ int main(int argc, char** argv) { " while(true) { " " i = 0; " " while(i < 2) i++; " - " synchronous {} " + " sync {} " " } " "} " ; diff --git a/examples/bench_11/main.c b/examples/bench_11/main.c index 
3f6b4f055cd8392119ee7b947106affb0646d374..3c4002029ee8af18799f7f532da75632eee6bdd9 100644 --- a/examples/bench_11/main.c +++ b/examples/bench_11/main.c @@ -9,7 +9,7 @@ int main(int argc, char** argv) { forwards, num_options); unsigned char pdl[] = "primitive recv_zero(in a) { " - " while(true) synchronous {" + " while(true) sync {" " msg m = get(a); " " assert(m[0] == 0); " " } " diff --git a/examples/bench_23/main.c b/examples/bench_23/main.c index 0e02672f40a12dc197bee343f23f56f4cbe300c2..b27273f1b5bf4e100d76777f425c82e6ad8ae545 100644 --- a/examples/bench_23/main.c +++ b/examples/bench_23/main.c @@ -6,7 +6,7 @@ int main(int argc, char** argv) { // unsigned char pdl[] = "\ // primitive xrouter(in a, out b, out c) {\ - // while(true) synchronous {\ + // while(true) sync {\ // if(fires(a)) {\ // if(fires(b)) put(b, get(a));\ // else put(c, get(a));\ @@ -16,7 +16,7 @@ int main(int argc, char** argv) { // ; unsigned char pdl[] = "\ primitive lossy(in a, out b) {\ - while(true) synchronous {\ + while(true) sync {\ if(fires(a)) {\ msg m = get(a);\ if(fires(b)) put(b, m);\ @@ -24,7 +24,7 @@ int main(int argc, char** argv) { }\ }\ primitive sync_drain(in a, in b) {\ - while(true) synchronous {\ + while(true) sync {\ if(fires(a)) {\ get(a);\ get(b);\ diff --git a/examples/bench_24/main.c b/examples/bench_24/main.c index a8558558a022b50f9918999e806a7b1350cd6d93..1cfde7b7270f7e2ef4da425b28b33d7720ee83a9 100644 --- a/examples/bench_24/main.c +++ b/examples/bench_24/main.c @@ -6,7 +6,7 @@ int main(int argc, char** argv) { unsigned char pdl[] = "\ primitive fifo1_init(msg m, in a, out b) {\ - while(true) synchronous {\ + while(true) sync {\ if(m != null && fires(b)) {\ put(b, m);\ m = null;\ @@ -39,7 +39,7 @@ int main(int argc, char** argv) { // unsigned char pdl[] = "\ // primitive sequencer3(out a, out b, out c) {\ // int i = 0;\ - // while(true) synchronous {\ + // while(true) sync {\ // out to = a;\ // if (i==1) to = b;\ // else if(i==2) to = c;\ diff --git 
a/examples/bench_27/main.c b/examples/bench_27/main.c index 8471d124fc401559b6fef1281c42e21292a2ca7e..7aa6fa92931fdc1d68dabfa1ef6386c0c7353824 100644 --- a/examples/bench_27/main.c +++ b/examples/bench_27/main.c @@ -9,7 +9,7 @@ int main(int argc, char** argv) { unsigned char pdl[] = "\ primitive xrouter(in a, out b, out c) {\ - while(true) synchronous {\ + while(true) sync {\ if(fires(a)) {\ if(fires(b)) put(b, get(a));\ else put(c, get(a));\ diff --git a/examples/eg_protocols.pdl b/examples/eg_protocols.pdl index 4de6b59a35c1d45e788bee6a58ec0f1a5e9f8c8b..72c7a48f83e16e5d745fb6b4fa5ebcb02f8b091f 100644 --- a/examples/eg_protocols.pdl +++ b/examples/eg_protocols.pdl @@ -1,10 +1,10 @@ primitive pres_2(in i, out o) { - synchronous { + sync { put(o, get(i)); } } primitive together(in ia, in ib, out oa, out ob){ - while(true) synchronous { + while(true) sync { if(fires(ia)) { put(oa, get(ia)); put(ob, get(ib)); @@ -14,7 +14,7 @@ primitive together(in ia, in ib, out oa, out ob){ primitive alt_round_merger(in a, in b, out c){ while(true) { - synchronous{ put(c, get(a)); } - synchronous{ put(c, get(b)); } + sync { put(c, get(a)); } + sync { put(c, get(b)); } } } diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index a633ce0c0c9ee19ebf048d71834b28bc23c58200..d13858742e9dfebef5e58077d83232985bbdd954 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -138,6 +138,8 @@ define_new_ast_id!(BreakStatementId, StatementId, index(BreakStatement, Statemen define_new_ast_id!(ContinueStatementId, StatementId, index(ContinueStatement, Statement::Continue, statements), alloc(alloc_continue_statement)); define_new_ast_id!(SynchronousStatementId, StatementId, index(SynchronousStatement, Statement::Synchronous, statements), alloc(alloc_synchronous_statement)); define_new_ast_id!(EndSynchronousStatementId, StatementId, index(EndSynchronousStatement, Statement::EndSynchronous, statements), alloc(alloc_end_synchronous_statement)); +define_new_ast_id!(ForkStatementId, 
StatementId, index(ForkStatement, Statement::Fork, statements), alloc(alloc_fork_statement)); +define_new_ast_id!(EndForkStatementId, StatementId, index(EndForkStatement, Statement::EndFork, statements), alloc(alloc_end_fork_statement)); define_new_ast_id!(ReturnStatementId, StatementId, index(ReturnStatement, Statement::Return, statements), alloc(alloc_return_statement)); define_new_ast_id!(GotoStatementId, StatementId, index(GotoStatement, Statement::Goto, statements), alloc(alloc_goto_statement)); define_new_ast_id!(NewStatementId, StatementId, index(NewStatement, Statement::New, statements), alloc(alloc_new_statement)); @@ -1035,6 +1037,8 @@ pub enum Statement { Continue(ContinueStatement), Synchronous(SynchronousStatement), EndSynchronous(EndSynchronousStatement), + Fork(ForkStatement), + EndFork(EndForkStatement), Return(ReturnStatement), Goto(GotoStatement), New(NewStatement), @@ -1078,11 +1082,12 @@ impl Statement { Statement::Break(v) => v.span, Statement::Continue(v) => v.span, Statement::Synchronous(v) => v.span, + Statement::Fork(v) => v.span, Statement::Return(v) => v.span, Statement::Goto(v) => v.span, Statement::New(v) => v.span, Statement::Expression(v) => v.span, - Statement::EndBlock(_) | Statement::EndIf(_) | Statement::EndWhile(_) | Statement::EndSynchronous(_) => unreachable!(), + Statement::EndBlock(_) | Statement::EndIf(_) | Statement::EndWhile(_) | Statement::EndSynchronous(_) | Statement::EndFork(_) => unreachable!(), } } pub fn link_next(&mut self, next: StatementId) { @@ -1096,12 +1101,14 @@ impl Statement { Statement::EndIf(stmt) => stmt.next = next, Statement::EndWhile(stmt) => stmt.next = next, Statement::EndSynchronous(stmt) => stmt.next = next, + Statement::EndFork(stmt) => stmt.next = next, Statement::New(stmt) => stmt.next = next, Statement::Expression(stmt) => stmt.next = next, Statement::Return(_) | Statement::Break(_) | Statement::Continue(_) | Statement::Synchronous(_) + | Statement::Fork(_) | Statement::Goto(_) | 
Statement::While(_) | Statement::Labeled(_) @@ -1271,7 +1278,6 @@ pub struct SynchronousStatement { // Phase 1: parser pub span: InputSpan, // of the "sync" keyword pub body: BlockStatementId, - // Phase 2: linker pub end_sync: EndSynchronousStatementId, } @@ -1283,6 +1289,23 @@ pub struct EndSynchronousStatement { pub next: StatementId, } +#[derive(Debug, Clone)] +pub struct ForkStatement { + pub this: ForkStatementId, + // Phase 1: parser + pub span: InputSpan, // of the "fork" keyword + pub left_body: BlockStatementId, + pub right_body: Option, + pub end_fork: EndForkStatementId, +} + +#[derive(Debug, Clone)] +pub struct EndForkStatement { + pub this: EndForkStatementId, + pub start_fork: ForkStatementId, + pub next: StatementId, +} + #[derive(Debug, Clone)] pub struct ReturnStatement { pub this: ReturnStatementId, diff --git a/src/protocol/ast_printer.rs b/src/protocol/ast_printer.rs index 4e566cdc23d634459b11530513dee8b50d57f704..6aff2810101c2a5143012ad83ec80eaa21cd852d 100644 --- a/src/protocol/ast_printer.rs +++ b/src/protocol/ast_printer.rs @@ -36,6 +36,8 @@ const PREFIX_BREAK_STMT_ID: &'static str = "SBre"; const PREFIX_CONTINUE_STMT_ID: &'static str = "SCon"; const PREFIX_SYNC_STMT_ID: &'static str = "SSyn"; const PREFIX_ENDSYNC_STMT_ID: &'static str = "SESy"; +const PREFIX_FORK_STMT_ID: &'static str = "SFrk"; +const PREFIX_END_FORK_STMT_ID: &'static str = "SEFk"; const PREFIX_RETURN_STMT_ID: &'static str = "SRet"; const PREFIX_ASSERT_STMT_ID: &'static str = "SAsr"; const PREFIX_GOTO_STMT_ID: &'static str = "SGot"; @@ -511,6 +513,24 @@ impl ASTWriter { self.kv(indent2).with_s_key("StartSync").with_disp_val(&stmt.start_sync.0.index); self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index); }, + Statement::Fork(stmt) => { + self.kv(indent).with_id(PREFIX_FORK_STMT_ID, stmt.this.0.index) + .with_s_key("Fork"); + self.kv(indent2).with_s_key("EndFork").with_disp_val(&stmt.end_fork.0.index); + self.kv(indent2).with_s_key("LeftBody"); + 
self.write_stmt(heap, stmt.left_body.upcast(), indent3); + + if let Some(right_body_id) = stmt.right_body { + self.kv(indent2).with_s_key("RightBody"); + self.write_stmt(heap, right_body_id.upcast(), indent3); + } + }, + Statement::EndFork(stmt) => { + self.kv(indent).with_id(PREFIX_END_FORK_STMT_ID, stmt.this.0.index) + .with_s_key("EndFork"); + self.kv(indent2).with_s_key("StartFork").with_disp_val(&stmt.start_fork.0.index); + self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index); + } Statement::Return(stmt) => { self.kv(indent).with_id(PREFIX_RETURN_STMT_ID, stmt.this.0.index) .with_s_key("Return"); diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 68dc3eec80a2ebb90159a4d62401f7eb6c5a727d..3b16522574521e6ec3bdb70b57ee34b22073a949 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -200,6 +200,7 @@ pub enum EvalContinuation { SyncBlockEnd, NewComponent(DefinitionId, i32, ValueGroup), NewChannel, + NewFork, BlockFires(PortId), BlockGet(PortId), Put(PortId, Value), @@ -584,7 +585,7 @@ impl Prompt { unreachable!("executor calling 'get' on value {:?}", value) }; - match ctx.get(port_id) { + match ctx.performed_get(port_id) { Some(result) => { // We have the result. Merge the `ValueGroup` with the // stack/heap storage. 
@@ -613,11 +614,12 @@ impl Prompt { let msg_value = cur_frame.expr_values.pop_front().unwrap(); let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone(); - if ctx.did_put(port_id) { + if ctx.performed_put(port_id) { // We're fine, deallocate in case the expression value stack // held an owned value self.store.drop_value(msg_value.get_heap_pos()); } else { + // Prepare to execute again cur_frame.expr_values.push_front(msg_value); cur_frame.expr_values.push_front(port_value); cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id)); @@ -810,7 +812,7 @@ impl Prompt { LocalStatement::Channel(stmt) => { // Need to create a new channel by requesting it from // the runtime. - match ctx.get_channel() { + match ctx.created_channel() { None => { // No channel is pending. So request one Ok(EvalContinuation::NewChannel) @@ -886,6 +888,33 @@ impl Prompt { Ok(EvalContinuation::SyncBlockEnd) }, + Statement::Fork(stmt) => { + if stmt.right_body.is_none() { + // No reason to fork + cur_frame.position = stmt.left_body.upcast(); + } else { + // Need to fork + if let Some(go_left) = ctx.performed_fork() { + // Runtime has created a fork + if go_left { + cur_frame.position = stmt.left_body.upcast(); + } else { + cur_frame.position = stmt.right_body.unwrap().upcast(); + } + } else { + // Request the runtime to create a fork of the current + // branch + return Ok(EvalContinuation::NewFork); + } + } + + Ok(EvalContinuation::Stepping) + }, + Statement::EndFork(stmt) => { + cur_frame.position = stmt.next; + + Ok(EvalContinuation::Stepping) + } Statement::Return(_stmt) => { debug_assert!(heap[cur_frame.definition].is_function()); debug_assert_eq!(cur_frame.expr_values.len(), 1, "expected one expr value for return statement"); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 62481340e1b64cfaf42180b4dc1041409f4d733b..fade25ccafb1caa91db843d4930698c93c605a4c 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -52,6 +52,7 @@ pub enum 
ComponentCreationError { InvalidNumArguments, InvalidArgumentType(usize), UnownedPort, + InSync, } impl std::fmt::Debug for ProtocolDescription { @@ -283,10 +284,11 @@ impl ProtocolDescription { // TODO: @temp Should just become a concrete thing that is passed in pub trait RunContext { - fn did_put(&mut self, port: PortId) -> bool; - fn get(&mut self, port: PortId) -> Option; // None if still waiting on message + fn performed_put(&mut self, port: PortId) -> bool; + fn performed_get(&mut self, port: PortId) -> Option; // None if still waiting on message fn fires(&mut self, port: PortId) -> Option; // None if not yet branched - fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared + fn performed_fork(&mut self) -> Option; // None if not yet forked + fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared } #[derive(Debug)] @@ -299,8 +301,9 @@ pub enum RunResult { // Can only occur inside sync blocks BranchInconsistent, // branch has inconsistent behaviour BranchMissingPortState(PortId), // branch doesn't know about port firing - BranchMissingPortValue(PortId), // branch hasn't received message on input port yet + BranchGet(PortId), // branch hasn't received message on input port yet BranchAtSyncEnd, + BranchFork, BranchPut(PortId, ValueGroup), } @@ -328,8 +331,10 @@ impl ComponentState { return RR::NewComponent(definition_id, monomorph_idx, args), EC::NewChannel => return RR::NewChannel, + EC::NewFork => + return RR::BranchFork, EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id), - EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id), + EC::BlockGet(port_id) => return RR::BranchGet(port_id), EC::Put(port_id, value) => { let value_group = ValueGroup::from_store(&self.prompt.store, &[value]); return RR::BranchPut(port_id, value_group); @@ -397,6 +402,7 @@ impl ComponentState { // to the runtime unreachable!(); }, + EvalContinuation::NewFork => unreachable!(), // Outside 
synchronous blocks, no fires/get/put happens EvalContinuation::BlockFires(_) => unreachable!(), EvalContinuation::BlockGet(_) => unreachable!(), @@ -430,6 +436,7 @@ impl ComponentState { // Not possible to create component in sync block EvalContinuation::NewComponent(_, _, _) => unreachable!(), EvalContinuation::NewChannel => unreachable!(), + EvalContinuation::NewFork => unreachable!(), EvalContinuation::BlockFires(port) => { return SyncBlocker::CouldntCheckFiring(port); }, @@ -462,7 +469,7 @@ impl ComponentState { } impl RunContext for EvalContext<'_> { - fn did_put(&mut self, port: PortId) -> bool { + fn performed_put(&mut self, port: PortId) -> bool { match self { EvalContext::None => unreachable!(), EvalContext::Nonsync(_) => unreachable!(), @@ -472,7 +479,7 @@ impl RunContext for EvalContext<'_> { } } - fn get(&mut self, port: PortId) -> Option { + fn performed_get(&mut self, port: PortId) -> Option { match self { EvalContext::None => unreachable!(), EvalContext::Nonsync(_) => unreachable!(), @@ -511,7 +518,7 @@ impl RunContext for EvalContext<'_> { } } - fn get_channel(&mut self) -> Option<(Value, Value)> { + fn created_channel(&mut self) -> Option<(Value, Value)> { match self { EvalContext::None => unreachable!(), EvalContext::Nonsync(context) => { @@ -523,6 +530,11 @@ impl RunContext for EvalContext<'_> { EvalContext::Sync(_) => unreachable!(), } } + + fn performed_fork(&mut self) -> Option { + // Never actually used in the old runtime + return None; + } } // TODO: @remove once old runtime has disappeared diff --git a/src/protocol/parser/pass_definitions.rs b/src/protocol/parser/pass_definitions.rs index 2f901cfd514e709e41823a10195bfae922b33871..a96d0f2df6f0cca3f62ea4f1017449ab885b0f2f 100644 --- a/src/protocol/parser/pass_definitions.rs +++ b/src/protocol/parser/pass_definitions.rs @@ -423,13 +423,28 @@ impl PassDefinitions { let id = self.consume_synchronous_statement(module, iter, ctx)?; section.push(id.upcast()); - let end_sync = 
ctx.heap.alloc_end_synchronous_statement(|this| EndSynchronousStatement{ - this, start_sync: id, next: StatementId::new_invalid() + let end_sync = ctx.heap.alloc_end_synchronous_statement(|this| EndSynchronousStatement { + this, + start_sync: id, + next: StatementId::new_invalid() }); section.push(end_sync.upcast()); let sync_stmt = &mut ctx.heap[id]; sync_stmt.end_sync = end_sync; + } else if ident == KW_STMT_FORK { + let id = self.consume_fork_statement(module, iter, ctx)?; + section.push(id.upcast()); + + let end_fork = ctx.heap.alloc_end_fork_statement(|this| EndForkStatement{ + this, + start_fork: id, + next: StatementId::new_invalid(), + }); + section.push(end_fork.upcast()); + + let fork_stmt = &mut ctx.heap[id]; + fork_stmt.end_fork = end_fork; } else if ident == KW_STMT_RETURN { let id = self.consume_return_statement(module, iter, ctx)?; section.push(id.upcast()); @@ -613,6 +628,29 @@ impl PassDefinitions { })) } + fn consume_fork_statement( + &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx + ) -> Result { + let fork_span = consume_exact_ident(&module.source, iter, KW_STMT_FORK)?; + let left_body = self.consume_block_or_wrapped_statement(module, iter, ctx)?; + + let right_body = if has_ident(&module.source, iter, KW_STMT_OR) { + iter.consume(); + let right_body = self.consume_block_or_wrapped_statement(module, iter, ctx)?; + Some(right_body) + } else { + None + }; + + Ok(ctx.heap.alloc_fork_statement(|this| ForkStatement{ + this, + span: fork_span, + left_body, + right_body, + end_fork: EndForkStatementId::new_invalid(), + })) + } + fn consume_return_statement( &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx ) -> Result { diff --git a/src/protocol/parser/pass_typing.rs b/src/protocol/parser/pass_typing.rs index 0b858460b6a7c1ad2e9bf19867c760b3a56c63fd..57d1fa8f05ffe09da37549dae45a3c87dbe2e662 100644 --- a/src/protocol/parser/pass_typing.rs +++ b/src/protocol/parser/pass_typing.rs @@ -1142,6 +1142,19 @@ impl Visitor 
for PassTyping { self.visit_block_stmt(ctx, body_id) } + fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult { + let fork_stmt = &ctx.heap[id]; + let left_body_id = fork_stmt.left_body; + let right_body_id = fork_stmt.right_body; + + self.visit_block_stmt(ctx, left_body_id)?; + if let Some(right_body_id) = right_body_id { + self.visit_block_stmt(ctx, right_body_id)?; + } + + Ok(()) + } + fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult { let return_stmt = &ctx.heap[id]; debug_assert_eq!(return_stmt.expressions.len(), 1); diff --git a/src/protocol/parser/pass_validation_linking.rs b/src/protocol/parser/pass_validation_linking.rs index f5bcae1bef2743e65581b156e5f61ddf92a70433..f7362e0996add4819ae92058adb09dff264b9f64 100644 --- a/src/protocol/parser/pass_validation_linking.rs +++ b/src/protocol/parser/pass_validation_linking.rs @@ -1,3 +1,40 @@ +/* + * pass_validation_linking.rs + * + * The pass that will validate properties of the AST statements (one is not + * allowed to nest synchronous statements, instantiating components occurs in + * the right places, etc.) and expressions (assignments may not occur in + * arbitrary expressions). + * + * Furthermore, this pass will also perform "linking", in the sense of: some AST + * nodes have something to do with one another, so we link them up in this pass + * (e.g. setting the parents of expressions, linking the control flow statements + * like `continue` and `break` up to the respective loop statement, etc.). + * + * There are several "confusing" parts about this pass: + * + * Setting expression parents: this is the simplest one. The pass struct acts + * like a little state machine. When visiting an expression it will set the + * "parent expression" field of the pass to itself, then visit its child. The + * child will look at this "parent expression" field to determine its parent. 
+ * + * Setting the `next` statement: the AST is a tree, but during execution we walk + * a linear path through all statements. So where appropriate a statement may + * set the "previous statement" field of the pass to itself. When visiting the + * subsequent statement it will check this "previous statement", and if set, it + * will link this previous statement up to itself. Not every statement has a + * previous statement. Hence there are two patterns that occur: assigning the + * `next` value, then clearing the "previous statement" field. And assigning the + * `next` value, and then putting the current statement's ID in the "previous + * statement" field. Because it is so common, this file contains two macros that + * perform that operation. + * + * To make storing types for polymorphic procedures simpler and more efficient, + * we assign to each expression in the procedure a unique ID. This is what the + * "next expression index" field achieves. Each expression simply takes the + * current value, and then increments this counter. + */ + use crate::collections::{ScopedBuffer}; use crate::protocol::ast::*; use crate::protocol::input_source::*; @@ -255,7 +292,8 @@ impl Visitor for PassValidationLinking { self.expr_parent = ExpressionParent::None; // Visit true and false branch. Executor chooses next statement based on - // test expression, not on if-statement itself. + // test expression, not on if-statement itself. Hence the if statement + // does not have a static subsequent statement. 
assign_then_erase_next_stmt!(self, ctx, id.upcast()); self.visit_block_stmt(ctx, true_stmt_id)?; assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast()); @@ -370,6 +408,36 @@ impl Visitor for PassValidationLinking { Ok(()) } + fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult { + let fork_stmt = &ctx.heap[id]; + let end_fork_id = fork_stmt.end_fork; + let left_body_id = fork_stmt.left_body; + let right_body_id = fork_stmt.right_body; + + // Fork statements may only occur inside sync blocks + if self.in_sync.is_invalid() { + return Err(ParseError::new_error_str_at_span( + &ctx.module().source, fork_stmt.span, + "Forking may only occur inside sync blocks" + )); + } + + // Visit the respective bodies. Like the if statement, a fork statement + // does not have a single static subsequent statement. It forks and then + // each fork has a different next statement. + assign_then_erase_next_stmt!(self, ctx, id.upcast()); + self.visit_block_stmt(ctx, left_body_id)?; + assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast()); + + if let Some(right_body_id) = right_body_id { + self.visit_block_stmt(ctx, right_body_id)?; + assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast()); + } + + self.prev_stmt = end_fork_id.upcast(); + Ok(()) + } + fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult { // Check if "return" occurs within a function let stmt = &ctx.heap[id]; diff --git a/src/protocol/parser/token_parsing.rs b/src/protocol/parser/token_parsing.rs index c1fe33866a320f12cb0a6aab04db481579eddae1..f9cc693182f025fbfb9491ebd2fc4fc60a59ebca 100644 --- a/src/protocol/parser/token_parsing.rs +++ b/src/protocol/parser/token_parsing.rs @@ -45,7 +45,9 @@ pub(crate) const KW_STMT_BREAK: &'static [u8] = b"break"; pub(crate) const KW_STMT_CONTINUE: &'static [u8] = b"continue"; pub(crate) const KW_STMT_GOTO: &'static [u8] = b"goto"; pub(crate) const KW_STMT_RETURN: &'static [u8] = b"return"; -pub(crate) 
const KW_STMT_SYNC: &'static [u8] = b"synchronous"; +pub(crate) const KW_STMT_SYNC: &'static [u8] = b"sync"; +pub(crate) const KW_STMT_FORK: &'static [u8] = b"fork"; +pub(crate) const KW_STMT_OR: &'static [u8] = b"or"; pub(crate) const KW_STMT_NEW: &'static [u8] = b"new"; // Keywords - types @@ -527,7 +529,7 @@ fn is_reserved_statement_keyword(text: &[u8]) -> bool { KW_IMPORT | KW_AS | KW_STMT_CHANNEL | KW_STMT_IF | KW_STMT_WHILE | KW_STMT_BREAK | KW_STMT_CONTINUE | KW_STMT_GOTO | KW_STMT_RETURN | - KW_STMT_SYNC | KW_STMT_NEW => true, + KW_STMT_SYNC | KW_STMT_FORK | KW_STMT_NEW => true, _ => false, } } diff --git a/src/protocol/parser/visitor.rs b/src/protocol/parser/visitor.rs index c30b5129c9d77c333103e4eb3f96768645067890..0de39881d984f3aee776587344ba09e114f8f21c 100644 --- a/src/protocol/parser/visitor.rs +++ b/src/protocol/parser/visitor.rs @@ -133,6 +133,11 @@ pub(crate) trait Visitor { self.visit_synchronous_stmt(ctx, this) }, Statement::EndSynchronous(_stmt) => Ok(()), + Statement::Fork(stmt) => { + let this = stmt.this; + self.visit_fork_stmt(ctx, this) + }, + Statement::EndFork(_stmt) => Ok(()), Statement::Return(stmt) => { let this = stmt.this; self.visit_return_stmt(ctx, this) @@ -175,6 +180,7 @@ pub(crate) trait Visitor { fn visit_break_stmt(&mut self, _ctx: &mut Ctx, _id: BreakStatementId) -> VisitorResult { Ok(()) } fn visit_continue_stmt(&mut self, _ctx: &mut Ctx, _id: ContinueStatementId) -> VisitorResult { Ok(()) } fn visit_synchronous_stmt(&mut self, _ctx: &mut Ctx, _id: SynchronousStatementId) -> VisitorResult { Ok(()) } + fn visit_fork_stmt(&mut self, _ctx: &mut Ctx, _id: ForkStatementId) -> VisitorResult { Ok(()) } fn visit_return_stmt(&mut self, _ctx: &mut Ctx, _id: ReturnStatementId) -> VisitorResult { Ok(()) } fn visit_goto_stmt(&mut self, _ctx: &mut Ctx, _id: GotoStatementId) -> VisitorResult { Ok(()) } fn visit_new_stmt(&mut self, _ctx: &mut Ctx, _id: NewStatementId) -> VisitorResult { Ok(()) } diff --git a/src/runtime/tests.rs 
b/src/runtime/tests.rs index 17a6aac7bc715623c08fbdf995688b6dfc39c7e2..b9cf20cd3d9a847e5d489335d4b310605476d26c 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -37,9 +37,9 @@ fn file_logged_configured_connector( Connector::new(file_logger, pd, connector_id) } static MINIMAL_PDL: &'static [u8] = b" -primitive sync(in a, out b) { +primitive sync_component(in a, out b) { while (true) { - synchronous { + sync { if (fires(a) && fires(b)) { msg x = get(a); put(b, x); @@ -51,7 +51,7 @@ primitive sync(in a, out b) { } primitive together(in ia, in ib, out oa, out ob){ - while(true) synchronous { + while(true) sync { if(fires(ia)) { put(oa, get(ia)); put(ob, get(ib)); @@ -102,7 +102,7 @@ fn new_sync() { let test_log_path = Path::new("./logs/new_sync"); let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); - c.add_component(b"", b"sync", &[i, o]).unwrap(); + c.add_component(b"", b"sync_component", &[i, o]).unwrap(); } #[test] @@ -353,7 +353,7 @@ fn cannot_use_moved_ports() { let test_log_path = Path::new("./logs/cannot_use_moved_ports"); let mut c = file_logged_connector(0, test_log_path); let [p, g] = c.new_port_pair(); - c.add_component(b"", b"sync", &[g, p]).unwrap(); + c.add_component(b"", b"sync_component", &[g, p]).unwrap(); c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap_err(); c.get(g).unwrap_err(); @@ -369,7 +369,7 @@ fn sync_sync() { let mut c = file_logged_connector(0, test_log_path); let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"sync", &[g0, p1]).unwrap(); + c.add_component(b"", b"sync_component", &[g0, p1]).unwrap(); c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(g1).unwrap(); @@ -421,7 +421,7 @@ fn distributed_msg_bounce() { c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; - c.add_component(b"", b"sync", &[g, p]).unwrap(); + c.add_component(b"", b"sync_component", &[g, 
p]).unwrap(); c.connect(SEC1).unwrap(); c.sync(SEC1).unwrap(); }); @@ -882,7 +882,7 @@ fn ac_not_b() { let pdl = b" primitive ac_not_b(in a, in b, out c){ // forward A to C but keep B silent - synchronous{ put(c, get(a)); } + sync { put(c, get(a)); } }"; let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap()); let mut c = file_logged_configured_connector(1, test_log_path, pd); @@ -946,7 +946,7 @@ fn many_rounds_mem() { fn pdl_reo_lossy() { let pdl = b" primitive lossy(in a, out b) { - while(true) synchronous { + while(true) sync { msg m = null; if(fires(a)) { m = get(a); @@ -965,7 +965,7 @@ fn pdl_reo_fifo1() { let pdl = b" primitive fifo1(in a, out b) { msg m = null; - while(true) synchronous { + while(true) sync { if(m == null) { if(fires(a)) m=get(a); } else { @@ -985,7 +985,7 @@ fn pdl_reo_fifo1full() { primitive fifo1full(in a, out b) { bool is_set = true; msg m = create(0); - while(true) synchronous { + while(true) sync { if(!is_set) { if(fires(a)) m=get(a); is_set = false; @@ -1012,7 +1012,7 @@ fn pdl_msg_consensus() { let test_log_path = Path::new("./logs/pdl_msg_consensus"); let pdl = b" primitive msgconsensus(in a, in b) { - while(true) synchronous { + while(true) sync { msg x = get(a); msg y = get(b); assert(x == y); @@ -1040,7 +1040,7 @@ fn sequencer3_prim() { let pdl = b" primitive sequencer3(out a, out b, out c) { u32 i = 0; - while(true) synchronous { + while(true) sync { out to = a; if (i==1) to = b; else if(i==2) to = c; @@ -1087,7 +1087,7 @@ fn sequencer3_comp() { let pdl = b" primitive replicator(in a, out b, out c) { while (true) { - synchronous { + sync { if (fires(a) && fires(b) && fires(c)) { msg x = get(a); put(b, x); @@ -1099,7 +1099,7 @@ fn sequencer3_comp() { } } primitive fifo1_init(bool has_value, T m, in a, out b) { - while(true) synchronous { + while(true) sync { if(has_value && fires(b)) { put(b, m); has_value = false; @@ -1181,7 +1181,7 @@ fn xrouter_prim() { let test_log_path = Path::new("./logs/xrouter_prim"); let pdl 
= b" primitive xrouter(in a, out b, out c) { - while(true) synchronous { + while(true) sync { if(fires(a)) { if(fires(b)) put(b, get(a)); else put(c, get(a)); @@ -1222,7 +1222,7 @@ fn xrouter_comp() { let pdl = b" primitive replicator(in a, out b, out c) { while (true) { - synchronous { + sync { if (fires(a) && fires(b) && fires(c)) { msg x = get(a); put(b, x); @@ -1236,7 +1236,7 @@ fn xrouter_comp() { primitive merger(in a, in b, out c) { while (true) { - synchronous { + sync { if (fires(a) && !fires(b) && fires(c)) { put(c, get(a)); } else if (!fires(a) && fires(b) && fires(c)) { @@ -1249,7 +1249,7 @@ fn xrouter_comp() { } primitive lossy(in a, out b) { - while(true) synchronous { + while(true) sync { if(fires(a)) { auto m = get(a); if(fires(b)) put(b, m); @@ -1257,7 +1257,7 @@ fn xrouter_comp() { } } primitive sync_drain(in a, in b) { - while(true) synchronous { + while(true) sync { if(fires(a)) { msg drop_it = get(a); msg on_the_floor = get(b); @@ -1320,7 +1320,7 @@ fn count_stream() { primitive count_stream(out o) { msg m = create(1); m[0] = 0; - while(true) synchronous { + while(true) sync { put(o, m); m[0] += 1; } @@ -1351,7 +1351,7 @@ fn for_msg_byte() { while(i<8) { msg m = create(1); m[idx] = i; - synchronous put(o, m); + sync put(o, m); i += 1; } } @@ -1379,7 +1379,7 @@ fn eq_causality() { primitive eq(in a, in b, out c) { msg ma = create(0); msg mb = create(0); - while(true) synchronous { + while(true) sync { if(fires(a)) { // b and c also fire! // left first! @@ -1435,7 +1435,7 @@ fn eq_no_causality() { primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) { msg ma = create(0); msg mb = create(0); - while(true) synchronous { + while(true) sync { if(fires(a)) { // b and c also fire! 
if(fires(leftfirsti)) { diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 1b296960bcfabfc69b0176b84cd9b13b012e6bd3..511dfc1597390034c4913bd4a68a0b566fd6df2e 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -6,6 +6,13 @@ use crate::protocol::eval::{Value, ValueGroup}; use super::port::PortIdLocal; +// To share some logic between the FakeTree and ExecTree implementation +trait BranchListItem { + fn get_id(&self) -> BranchId; + fn set_next_id(&mut self, id: BranchId); + fn get_next_id(&self) -> BranchId; +} + /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context /// we use this fact to let branch ID 0 denote the ID being invalid. @@ -45,6 +52,35 @@ pub(crate) enum SpeculativeState { Inconsistent, // branch can never represent a local solution, so halted } +#[derive(Debug)] +pub(crate) enum PreparedStatement { + CreatedChannel((Value, Value)), + ForkedExecution(bool), + PerformedPut, + PerformedGet(ValueGroup), + None, +} + +impl PreparedStatement { + pub(crate) fn is_none(&self) -> bool { + if let PreparedStatement::None = self { + return true; + } else { + return false; + } + } + + pub(crate) fn take(&mut self) -> PreparedStatement { + if let PreparedStatement::None = self { + return PreparedStatement::None; + } else { + let mut replacement = PreparedStatement::None; + std::mem::swap(self, &mut replacement); + return replacement; + } + } +} + /// The execution state of a branch. This envelops the PDL code and the /// execution state. And derived from that: if we're ready to keep running the /// code, or if we're halted for some reason (e.g. waiting for a message). @@ -56,8 +92,13 @@ pub(crate) struct Branch { pub sync_state: SpeculativeState, pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. 
TODO: Maybe put in enum pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` - pub inbox: HashMap, // TODO: Remove, currently only valid in single-get/put mode - pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove? + pub prepared: PreparedStatement, +} + +impl BranchListItem for Branch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } } impl Branch { @@ -70,19 +111,18 @@ impl Branch { sync_state: SpeculativeState::RunningNonSync, awaiting_port: PortIdLocal::new_invalid(), next_in_queue: BranchId::new_invalid(), - inbox: HashMap::new(), - prepared_channel: None, + prepared: PreparedStatement::None, } } /// Constructs a sync branch. The provided branch is assumed to be the /// parent of the new branch within the execution tree. fn new_sync(new_index: u32, parent_branch: &Branch) -> Self { - debug_assert!( - (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) || - (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) - ); // forking from non-sync, or forking from a branching point - debug_assert!(parent_branch.prepared_channel.is_none()); + // debug_assert!( + // (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) || + // (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) + // ); // forking from non-sync, or forking from a branching point + debug_assert!(parent_branch.prepared.is_none()); Branch { id: BranchId::new(new_index), @@ -91,19 +131,9 @@ impl Branch { sync_state: SpeculativeState::RunningInSync, awaiting_port: parent_branch.awaiting_port, next_in_queue: BranchId::new_invalid(), - inbox: parent_branch.inbox.clone(), - prepared_channel: None, + prepared: PreparedStatement::None, } } - - /// Inserts a message into the branch for retrieval 
by a corresponding - /// `get(port)` call. - pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { - debug_assert!(target_port.is_valid()); - debug_assert!(self.awaiting_port == target_port); - self.awaiting_port = PortIdLocal::new_invalid(); - self.inbox.insert(target_port, contents); - } } /// Queue of branches. Just a little helper. @@ -131,7 +161,7 @@ impl BranchQueue { const NUM_QUEUES: usize = 3; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum QueueKind { Runnable, AwaitingMessage, @@ -148,6 +178,10 @@ impl QueueKind { } } +// ----------------------------------------------------------------------------- +// ExecTree +// ----------------------------------------------------------------------------- + /// Execution tree of branches. Tries to keep the extra information stored /// herein to a minimum. So the execution tree is aware of the branches, their /// execution state and the way they're dependent on each other, but the @@ -188,32 +222,12 @@ impl ExecTree { /// Pops a branch (ID) from a queue. pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round - let queue = &mut self.queues[kind.as_index()]; - if queue.is_empty() { - return None; - } else { - let first_branch = &mut self.branches[queue.first.index as usize]; - queue.first = first_branch.next_in_queue; - first_branch.next_in_queue = BranchId::new_invalid(); - if !queue.first.is_valid() { - queue.last = BranchId::new_invalid(); - } - - return Some(first_branch.id); - } + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); } /// Pushes a branch (ID) into a queue. 
pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { - let queue = &mut self.queues[kind.as_index()]; - if queue.is_empty() { - queue.first = id; - queue.last = id; - } else { - let last_branch = &mut self.branches[queue.last.index as usize]; - last_branch.next_in_queue = id; - queue.last = id; - } + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); } /// Returns the non-sync branch (TODO: better name?) @@ -222,27 +236,24 @@ impl ExecTree { return &mut self.branches[0]; } - /// Returns an iterator over all the elements in the queue of the given - /// kind. One can start the iteration at the branch *after* the provided - /// branch. Just make sure it actually is in the provided queue. - pub fn iter_queue(&self, kind: QueueKind, start_at: Option) -> BranchQueueIter { + /// Returns the branch ID of the first branch in a particular queue. + pub fn get_queue_first(&self, kind: QueueKind) -> Option { let queue = &self.queues[kind.as_index()]; + if queue.first.is_valid() { + return Some(queue.first); + } else { + return None; + } + } - let index = match start_at { - Some(branch_id) => { - debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id)); - let branch = &self.branches[branch_id.index as usize]; - - branch.next_in_queue.index as usize - }, - None => { - queue.first.index as usize - } - }; - - return BranchQueueIter { - branches: self.branches.as_slice(), - index, + /// Returns the next branch ID of a branch (assumed to be in a particular + /// queue. + pub fn get_queue_next(&self, branch_id: BranchId) -> Option { + let branch = &self.branches[branch_id.index as usize]; + if branch.next_in_queue.is_valid() { + return Some(branch.next_in_queue); + } else { + return None; } } @@ -285,7 +296,6 @@ impl ExecTree { /// using the provided branch as the final sync result. 
pub fn end_sync(&mut self, branch_id: BranchId) { debug_assert!(self.is_in_sync()); - debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == branch_id)); // Swap indicated branch into the first position self.branches.swap(0, branch_id.index as usize); @@ -298,8 +308,7 @@ impl ExecTree { branch.sync_state = SpeculativeState::RunningNonSync; debug_assert!(!branch.awaiting_port.is_valid()); branch.next_in_queue = BranchId::new_invalid(); - branch.inbox.clear(); - debug_assert!(branch.prepared_channel.is_none()); + debug_assert!(branch.prepared.is_none()); // Clear out all the queues for queue_idx in 0..NUM_QUEUES { @@ -324,41 +333,224 @@ impl IndexMut for ExecTree { } } -pub(crate) struct BranchQueueIter<'a> { +/// Iterator over the parents of an `ExecTree` branch. +pub(crate) struct BranchParentIter<'a> { branches: &'a [Branch], index: usize, } -impl<'a> Iterator for BranchQueueIter<'a> { +impl<'a> Iterator for BranchParentIter<'a> { type Item = &'a Branch; fn next(&mut self) -> Option { if self.index == 0 { - // i.e. the invalid branch index return None; } let branch = &self.branches[self.index]; - self.index = branch.next_in_queue.index as usize; + self.index = branch.parent_id.index as usize; return Some(branch); } } -pub(crate) struct BranchParentIter<'a> { - branches: &'a [Branch], - index: usize, +// ----------------------------------------------------------------------------- +// FakeTree +// ----------------------------------------------------------------------------- + +/// Generic fake branch. This is supposed to be used in conjunction with the +/// fake tree. The purpose is to have a branching-like tree to use in +/// combination with a consensus algorithm in places where we don't have PDL +/// code. 
+pub(crate) struct FakeBranch { + pub id: BranchId, + pub parent_id: BranchId, + pub sync_state: SpeculativeState, + pub awaiting_port: PortIdLocal, + pub next_in_queue: BranchId, + pub inbox: HashMap, } -impl<'a> Iterator for BranchParentIter<'a> { - type Item = &'a Branch; +impl BranchListItem for FakeBranch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} - fn next(&mut self) -> Option { - if self.index == 0 { +impl FakeBranch { + fn new_root(_index: u32) -> FakeBranch { + debug_assert!(_index == 1); + return FakeBranch{ + id: BranchId::new(1), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningInSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + } + } + + fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch { + return FakeBranch { + id: BranchId::new(index), + parent_id: parent_branch.id, + sync_state: SpeculativeState::RunningInSync, + awaiting_port: parent_branch.awaiting_port, + next_in_queue: BranchId::new_invalid(), + inbox: parent_branch.inbox.clone(), + } + } + + pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { + debug_assert!(target_port.is_valid()); + debug_assert!(self.awaiting_port == target_port); + self.awaiting_port = PortIdLocal::new_invalid(); + self.inbox.insert(target_port, contents); + } +} + +/// A little helper for native components that don't have a set of branches that +/// are actually executing code, but just have to manage the idea of branches +/// due to them performing the equivalent of a branching `get` call. +pub(crate) struct FakeTree { + pub branches: Vec, + queues: [BranchQueue; NUM_QUEUES], +} + +impl FakeTree { + pub fn new() -> Self { + // TODO: Don't like this? 
Cause is that now we don't have a non-sync + // branch. But we assumed BranchId=0 means the branch is invalid. We + // can do the rusty Option stuff. But we still need a token + // value within the protocol to signify no-branch-id. Maybe the high + // bit? Branches are crazy expensive, no-one is going to have 2^32 + // branches anyway. 2^31 isn't too bad. + return Self { + branches: vec![FakeBranch{ + id: BranchId::new_invalid(), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningNonSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + }], + queues: [BranchQueue::new(); 3] + } + } + + fn is_in_sync(&self) -> bool { + return self.branches.len() > 1; + } + + pub fn queue_is_empty(&self, kind: QueueKind) -> bool { + return self.queues[kind.as_index()].is_empty(); + } + + pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { + debug_assert_ne!(kind, QueueKind::FinishedSync); + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); + } + + pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); + } + + pub fn get_queue_first(&self, kind: QueueKind) -> Option { + let queue = &self.queues[kind.as_index()]; + if queue.first.is_valid() { + return Some(queue.first) + } else { return None; } + } - let branch = &self.branches[self.index]; - self.index = branch.parent_id.index as usize; - return Some(branch); + pub fn get_queue_next(&self, branch_id: BranchId) -> Option { + let branch = &self.branches[branch_id.index as usize]; + if branch.next_in_queue.is_valid() { + return Some(branch.next_in_queue); + } else { + return None; + } + } + + pub fn start_sync(&mut self) -> BranchId { + debug_assert!(!self.is_in_sync()); + + // Create the first branch + let sync_branch = FakeBranch::new_root(1); + let sync_branch_id = sync_branch.id; + self.branches.push(sync_branch); + + 
return sync_branch_id; + } + + pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { + debug_assert!(self.is_in_sync()); + let parent_branch = &self[parent_branch_id]; + let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch); + let new_branch_id = new_branch.id; + self.branches.push(new_branch); + + return new_branch_id; + } + + pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch { + debug_assert!(branch_id.is_valid()); + debug_assert!(self.is_in_sync()); + + // Take out the succeeding branch, then just clear all fake branches. + self.branches.swap(1, branch_id.index as usize); + self.branches.truncate(2); + let result = self.branches.pop().unwrap(); + + for queue_index in 0..NUM_QUEUES { + self.queues[queue_index] = BranchQueue::new(); + } + + return result; + } +} + +impl Index for FakeTree { + type Output = FakeBranch; + + fn index(&self, index: BranchId) -> &Self::Output { + return &self.branches[index.index as usize]; + } +} + +impl IndexMut for FakeTree { + fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { + return &mut self.branches[index.index as usize]; + } +} + +// ----------------------------------------------------------------------------- +// Shared logic +// ----------------------------------------------------------------------------- + +fn pop_from_queue(queue: &mut BranchQueue, branches: &mut [B]) -> Option { + if queue.is_empty() { + return None; + } else { + let first_branch = &mut branches[queue.first.index as usize]; + queue.first = first_branch.get_next_id(); + first_branch.set_next_id(BranchId::new_invalid()); + if !queue.first.is_valid() { + queue.last = BranchId::new_invalid(); + } + + return Some(first_branch.get_id()); + } +} + +fn push_into_queue(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) { + debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid()); + if queue.is_empty() { + queue.first = branch_id; + queue.last = branch_id; 
+ } else { + let last_branch = &mut branches[queue.last.index as usize]; + last_branch.set_next_id(branch_id); + queue.last = branch_id; } } \ No newline at end of file diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 943e149661563948bd38169e4e07b8c0ddd74a11..a198f917218ce02aafdf8cd1a2e97c1115df25ca 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -33,6 +33,7 @@ use crate::PortId; use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; +use crate::runtime2::branch::PreparedStatement; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; @@ -55,7 +56,7 @@ impl ConnectorPublic { } } -#[derive(Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time @@ -66,29 +67,31 @@ pub(crate) enum ConnectorScheduling { pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, + last_finished_handled: Option, } +// TODO: Remove remaining fields once 'fires()' is removed from language. 
struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, - received: &'a HashMap, - scheduler: SchedulerCtx<'a>, - prepared_channel: Option<(Value, Value)>, + prepared: PreparedStatement, } impl<'a> RunContext for ConnectorRunContext<'a>{ - fn did_put(&mut self, port: PortId) -> bool { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.registered_id.is_some(); + fn performed_put(&mut self, _port: PortId) -> bool { + return match self.prepared.take() { + PreparedStatement::None => false, + PreparedStatement::PerformedPut => true, + taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken) + }; } - fn get(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.received.get(&port_id) { - Some(data) => Some(data.clone()), - None => None, - } + fn performed_get(&mut self, _port: PortId) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::PerformedGet(value) => Some(value), + taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken), + }; } fn fires(&mut self, port: PortId) -> Option { @@ -97,8 +100,20 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ return annotation.expected_firing.map(|v| Value::Bool(v)); } - fn get_channel(&mut self) -> Option<(Value, Value)> { - return self.prepared_channel.take(); + fn created_channel(&mut self) -> Option<(Value, Value)> { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::CreatedChannel(ports) => Some(ports), + taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken), + }; + } + + fn performed_fork(&mut self) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::ForkedExecution(path) => Some(path), + taken => unreachable!("prepared 
statement is '{:?}' during 'performed_fork()'", taken), + }; } } @@ -106,13 +121,26 @@ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { + // Run in sync mode let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; - } else { - return scheduling + + // Handle any new finished branches + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Actually found a solution + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } + + self.last_finished_handled = Some(branch_id); } + + return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -125,6 +153,7 @@ impl ConnectorPDL { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), + last_finished_handled: None, } } @@ -143,21 +172,28 @@ impl ConnectorPDL { pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. 
- debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } - for branch_id in branches.drain(..) { + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); + receiving_branch.awaiting_port = PortIdLocal::new_invalid(); + receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -187,9 +223,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -225,43 
+259,38 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::BranchMissingPortValue(port_id) => { + RunResult::BranchGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `get()` is valid, so mark the branch as awaiting a message - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - // Note: we only know that a branch is waiting on a message when - // it reaches the `get` call. But we might have already received - // a message that targets this branch, so check now. - let mut any_branch_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_branch_received = true; - } - } - if any_branch_received { - return ConnectorScheduling::Immediate; + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. 
But we might have already received + // a message that targets this branch, so check now. + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + branch.awaiting_port = PortIdLocal::new_invalid(); + branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; } - } else { - branch.sync_state = SpeculativeState::Inconsistent; + } + + if any_message_received { + return ConnectorScheduling::Immediate; } } RunResult::BranchAtSyncEnd => { @@ -269,27 +298,37 @@ impl ConnectorPDL { if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); - } else if consistency == Consistency::Inconsistent { + } else { branch.sync_state = SpeculativeState::Inconsistent; } }, + RunResult::BranchFork => { + // Like the `NewChannel` result. 
This means we're setting up + // a branch and putting a marker inside the RunContext for the + // next time we run the PDL code + let left_id = branch_id; + let right_id = self.tree.fork_branch(left_id); + self.consensus.notify_of_new_branch(left_id, right_id); + self.tree.push_into_queue(QueueKind::Runnable, left_id); + self.tree.push_into_queue(QueueKind::Runnable, right_id); + + let left_branch = &mut self.tree[left_id]; + left_branch.prepared = PreparedStatement::ForkedExecution(true); + let right_branch = &mut self.tree[right_id]; + right_branch.prepared = PreparedStatement::ForkedExecution(false); + } RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `put()` is valid. - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(Message::Data(DataMessage { - sync_header, data_header, - content: DataContent::Message(content), - })); - - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(Message::Data(DataMessage { + sync_header, data_header, + content: DataContent::Message(content), + })); + + branch.prepared = PreparedStatement::PerformedPut; + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } @@ -312,9 +351,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id: branch.id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - 
prepared_channel: branch.prepared_channel.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -327,6 +364,7 @@ impl ConnectorPDL { RunResult::ComponentAtSyncStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); + debug_assert!(self.last_finished_handled.is_none()); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); @@ -356,7 +394,7 @@ impl ConnectorPDL { RunResult::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); - branch.prepared_channel = Some(( + branch.prepared = PreparedStatement::CreatedChannel(( Value::Output(PortId::new(putter.self_id.index)), Value::Input(PortId::new(getter.self_id.index)), )); @@ -381,5 +419,6 @@ impl ConnectorPDL { } ctx.notify_sync_end(&[]); + self.last_finished_handled = None; } } \ No newline at end of file diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index c965d64b803233458efe6ae822988749577f809f..daea21188bf15d535f184b8faa1c08a3af865ae8 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,9 +1,10 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; +use crate::runtime2::inbox::BranchMarker; -use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; +use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ Message, PortAnnotation, @@ -14,19 +15,20 @@ use super::scheduler::ComponentCtx; struct BranchAnnotation { port_mapping: Vec, + cur_marker: BranchMarker, } #[derive(Debug)] pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, - port_mapping: Vec<(ChannelId, BranchId)>, + port_mapping: Vec<(ChannelId, 
BranchMarker)>, } #[derive(Debug, Clone)] pub(crate) struct GlobalSolution { component_branches: Vec<(ConnectorId, BranchId)>, - channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info + channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- @@ -48,13 +50,14 @@ struct Peer { // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups // TODO: A lot of stuff should be batched. Like checking all the sync headers -// and sending "I have a higher ID" messages. +// and sending "I have a higher ID" messages. Should reduce locking by quite a +// bit. pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, - branch_annotations: Vec, - last_finished_handled: Option, + branch_annotations: Vec, // index is branch ID + branch_markers: Vec, // index is branch marker, maps to branch // Gathered state from communication encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. 
solution_combiner: SolutionCombiner, @@ -76,7 +79,7 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), - last_finished_handled: None, + branch_markers: Vec::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), peers: Vec::new(), @@ -105,7 +108,6 @@ impl Consensus { pub fn start_sync(&mut self, ctx: &ComponentCtx) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); - debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, @@ -118,7 +120,9 @@ impl Consensus { expected_firing: None, }) .collect(), + cur_marker: BranchMarker::new_invalid(), }); + self.branch_markers.push(BranchId::new_invalid()); self.highest_connector_id = ctx.id; @@ -131,10 +135,13 @@ impl Consensus { // index is the length in `branch_annotations`. debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); let new_branch_annotations = BranchAnnotation{ port_mapping: parent_branch_annotations.port_mapping.clone(), + cur_marker: new_marker, }; self.branch_annotations.push(new_branch_annotations); + self.branch_markers.push(new_branch_id); } /// Notifies the consensus algorithm that a branch has reached the end of @@ -187,68 +194,59 @@ impl Consensus { unreachable!("notify_of_speculative_mapping called with unowned port"); } - /// Generates sync messages for any branches that are at the end of the - /// sync block. To find these branches, they should've been put in the - /// "finished" queue in the execution tree. 
- pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option { - debug_assert!(self.is_in_sync()); - - let mut last_branch_id = self.last_finished_handled; - for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { - // Turn the port mapping into a local solution - let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping; - let mut target_mapping = Vec::with_capacity(source_mapping.len()); - - for port in source_mapping { - // Note: if the port is silent, and we've never communicated - // over the port, then we need to do so now, to let the peer - // component know about our sync leader state. - let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); - let peer_port_id = port_desc.peer_id; - let channel_id = port_desc.channel_id; - - if !self.encountered_ports.contains(&port.port_id) { - ctx.submit_message(Message::Data(DataMessage { - sync_header: SyncHeader{ - sending_component_id: ctx.id, - highest_component_id: self.highest_connector_id, - sync_round: self.sync_round - }, - data_header: DataHeader{ - expected_mapping: source_mapping.clone(), - sending_port: port.port_id, - target_port: peer_port_id, - new_mapping: BranchId::new_invalid(), - }, - content: DataContent::SilentPortNotification, - })); - self.encountered_ports.push(port.port_id); - } - - target_mapping.push(( - channel_id, - port.registered_id.unwrap_or(BranchId::new_invalid()) - )); - } - - let local_solution = LocalSolution{ - component: ctx.id, - final_branch_id: branch.id, - port_mapping: target_mapping, - }; - let solution_branch = self.send_or_store_local_solution(local_solution, ctx); - if solution_branch.is_some() { - // No need to continue iterating, we've found the solution - return solution_branch; + /// Generates a new local solution from a finished branch. If the component + /// is not the leader of the sync region then it will be sent to the + /// appropriate component. 
If it is the leader then there is a chance that + /// this solution completes a global solution. In that case the solution + /// branch ID will be returned. + pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { + // Turn the port mapping into a local solution + let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping; + let mut target_mapping = Vec::with_capacity(source_mapping.len()); + + for port in source_mapping { + // Note: if the port is silent, and we've never communicated + // over the port, then we need to do so now, to let the peer + // component know about our sync leader state. + let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); + let peer_port_id = port_desc.peer_id; + let channel_id = port_desc.channel_id; + + if !self.encountered_ports.contains(&port.port_id) { + ctx.submit_message(Message::Data(DataMessage { + sync_header: SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + sync_round: self.sync_round + }, + data_header: DataHeader{ + expected_mapping: source_mapping.clone(), + sending_port: port.port_id, + target_port: peer_port_id, + new_mapping: BranchMarker::new_invalid(), + }, + content: DataContent::SilentPortNotification, + })); + self.encountered_ports.push(port.port_id); } - last_branch_id = Some(branch.id); + target_mapping.push(( + channel_id, + port.registered_id.unwrap_or(BranchMarker::new_invalid()) + )); } - self.last_finished_handled = last_branch_id; - return None; + let local_solution = LocalSolution{ + component: ctx.id, + final_branch_id: branch_id, + port_mapping: target_mapping, + }; + let solution_branch = self.send_or_store_local_solution(local_solution, ctx); + return solution_branch; } + /// Notifies the consensus algorithm about the chosen branch to commit to + /// memory. 
pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { debug_assert!(self.is_in_sync()); @@ -263,7 +261,6 @@ impl Consensus { // Clear out internal storage to defaults self.highest_connector_id = ConnectorId::new_invalid(); self.branch_annotations.clear(); - self.last_finished_handled = None; self.encountered_ports.clear(); self.solution_combiner.clear(); @@ -302,37 +299,36 @@ impl Consensus { // Construct data header // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. - debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), sending_port: port_info.self_id, target_port: port_info.peer_id, - new_mapping: branch_id + new_mapping: branch.cur_marker, }; // Update port mapping for mapping in &mut branch.port_mapping { if mapping.port_id == source_port_id { mapping.expected_firing = Some(true); - mapping.registered_id = Some(branch_id); + mapping.registered_id = Some(branch.cur_marker); } } + // Update branch marker + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); + branch.cur_marker = new_marker; + self.branch_markers.push(branch_id); + self.encountered_ports.push(source_port_id); return (self.create_sync_header(ctx), data_header); } - /// Handles a new data message by handling the data and sync header, and - /// checking which *existing* branches *can* receive the message. So two - /// cautionary notes: - /// 1. A future branch might also be able to receive this message, see the - /// `branch_can_receive` function. - /// 2. We return the branches that *can* receive the message, you still - /// have to explicitly call `notify_of_received_message`. 
- pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { - self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids); + /// Handles a new data message by handling the sync header. The caller is + /// responsible for checking for branches that might be able to receive + /// the message. + pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool { return self.handle_received_sync_header(&message.sync_header, ctx) } @@ -366,18 +362,18 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) { - debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content)); + pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) { + debug_assert!(self.branch_can_receive(branch_id, message)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { - if mapping.port_id == data_header.target_port { + if mapping.port_id == message.data_header.target_port { // Found the port in which the message should be inserted - mapping.registered_id = Some(data_header.new_mapping); + mapping.registered_id = Some(message.data_header.new_mapping); // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports); + find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); @@ -394,20 +390,20 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. 
- pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool { - if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) { - if sync_header.sync_round < peer.expected_sync_round { + pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool { + if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) { + if message.sync_header.sync_round < peer.expected_sync_round { return false; } } - if let DataContent::SilentPortNotification = content { + if let DataContent::SilentPortNotification = message.content { // No port can receive a "silent" notification. return false; } let annotation = &self.branch_annotations[branch_id.index as usize]; - for expected in &data_header.expected_mapping { + for expected in &message.data_header.expected_mapping { // If we own the port, then we have an entry in the // annotation, check if the current mapping matches for current in &annotation.port_mapping { @@ -426,21 +422,6 @@ impl Consensus { // --- Internal helpers - /// Checks data header and consults the stored port mapping and the - /// execution tree to see which branches may receive the data message's - /// contents. 
- fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { - for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { - if branch.awaiting_port == data_header.target_port { - // Found a branch awaiting the message, but we need to make sure - // the mapping is correct - if self.branch_can_receive(branch.id, sync_header, data_header, content) { - target_ids.push(branch.id); - } - } - } - } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves if !self.handle_peer(sync_header) { @@ -578,7 +559,7 @@ impl Consensus { #[derive(Debug)] struct MatchedLocalSolution { final_branch_id: BranchId, - channel_mapping: Vec<(ChannelId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchMarker)>, matches: Vec, } diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index f1175db78a17bb428cdee71c3bf0cc6c8d43210a..91db3a3711b6eecb1bfb71020328bbc54fad2f25 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -13,10 +13,32 @@ use super::port::PortIdLocal; #[derive(Debug, Copy, Clone)] pub(crate) struct PortAnnotation { pub port_id: PortIdLocal, - pub registered_id: Option, + pub registered_id: Option, pub expected_firing: Option, } +/// Marker for a branch in a port mapping. A marker is, like a branch ID, a +/// unique identifier for a branch, but differs in that a branch only has one +/// branch ID, but might have multiple associated markers (i.e. one branch +/// performing a `put` three times will generate three markers).
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct BranchMarker{ + marker: u32, +} + +impl BranchMarker { + #[inline] + pub(crate) fn new(marker: u32) -> Self { + debug_assert!(marker != 0); + return Self{ marker }; + } + + #[inline] + pub(crate) fn new_invalid() -> Self { + return Self{ marker: 0 } + } +} + /// The header added by the synchronization algorithm to all. #[derive(Debug, Clone)] pub(crate) struct SyncHeader { @@ -31,7 +53,7 @@ pub(crate) struct DataHeader { pub expected_mapping: Vec, pub sending_port: PortIdLocal, pub target_port: PortIdLocal, - pub new_mapping: BranchId, + pub new_mapping: BranchMarker, } // TODO: Very much on the fence about this. On one hand I thought making it a diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 73ac568570c9bbfd328c4b774335232f1de83c09..cb02d7df66e4af31130a3d9667dc1a8ebb237254 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,16 +1,18 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::Ordering; +use std::collections::HashMap; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use super::{ConnectorKey, ConnectorId, RuntimeInner}; +use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; use super::scheduler::{SchedulerCtx, ComponentCtx}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::consensus::find_ports_in_value_group; +use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; -use super::inbox::{Message, ControlContent, ControlMessage}; +use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage}; /// Generic connector interface from the scheduler's point of view. 
pub(crate) trait Connector { @@ -21,70 +23,333 @@ pub(crate) trait Connector { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling; } -type SyncDone = Arc<(Mutex, Condvar)>; +pub(crate) struct FinishedSync { + // In the order of the `get` calls + inbox: Vec, +} + +type SyncDone = Arc<(Mutex>, Condvar)>; type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), NewConnector(ConnectorPDL, Vec), + SyncRound(Vec), Shutdown, } +// ----------------------------------------------------------------------------- +// ConnectorApplication +// ----------------------------------------------------------------------------- + /// The connector which an application can directly interface with. Once may set /// up the next synchronous round, and retrieve the data afterwards. +// TODO: Strong candidate for logic reduction in handling put/get. A lot of code +// is an approximate copy-pasta from the regular component logic. I'm going to +// wait until I'm implementing more native components to see which logic is +// truly common. 
pub struct ConnectorApplication { + // Communicating about new jobs and setting up sync rounds sync_done: SyncDone, job_queue: JobQueue, + is_in_sync: bool, + // Handling current sync round + sync_desc: Vec, + tree: FakeTree, + consensus: Consensus, + last_finished_handled: Option, + branch_extra: Vec, // instruction counter per branch +} + +impl Connector for ConnectorApplication { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + if self.is_in_sync { + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + let mut iter_id = match self.last_finished_handled { Some(handled_id) => self.tree.get_queue_next(handled_id), None => self.tree.get_queue_first(QueueKind::FinishedSync) }; // resume *after* the last handled branch so it is not submitted as a local solution twice + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Can finish sync round immediately + self.collapse_sync_to_solution_branch(solution_branch, comp_ctx); + return ConnectorScheduling::Immediate; + } + } + + return scheduling; + } else { + return self.run_in_deterministic_mode(sched_ctx, comp_ctx); + } + } } impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { - let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); + let sync_done = Arc::new(( Mutex::new(None), Condvar::new() )); let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { sync_done: sync_done.clone(), - job_queue: job_queue.clone() + job_queue: job_queue.clone(), + is_in_sync: false, + sync_desc: Vec::new(), + tree: FakeTree::new(), + consensus: Consensus::new(), + last_finished_handled: None, + branch_extra: vec![0], }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); return (connector, interface); } -} -impl Connector for ConnectorApplication { - fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> 
ConnectorScheduling { - // Handle any incoming messages if we're participating in a round + fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { while let Some(message) = comp_ctx.read_next_message() { match message { - Message::Data(_) => todo!("data message in API connector"), - Message::Sync(_) => todo!("sync message in API connector"), - Message::Control(_) => todo!("impossible control message"), + Message::Data(message) => self.handle_new_data_message(message, comp_ctx), + Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx), + Message::Control(_) => unreachable!("control message in native API component"), } } + } - // Handle requests coming from the API - { - let mut queue = self.job_queue.lock().unwrap(); - while let Some(job) = queue.pop_front() { - match job { - ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { - comp_ctx.push_port(endpoint_a); - comp_ctx.push_port(endpoint_b); + pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + // Go through all branches that are awaiting new messages and see if + // there is one that can receive this message. 
+ if !self.consensus.handle_new_data_message(&message, ctx) { + // Old message, so drop it + return; + } + + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + + // This branch can receive, so fork and give it the message + let receiving_branch_id = self.tree.fork_branch(branch_id); + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + let receiving_branch = &mut self.tree[receiving_branch_id]; + + receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + + // And prepare the branch for running + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + } + } + + pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { + if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { + self.collapse_sync_to_solution_branch(solution_branch_id, ctx); + } + } + + fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(self.is_in_sync); + + self.handle_new_messages(comp_ctx); + + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); + if branch_id.is_none() { + return ConnectorScheduling::NotNow; + } + + let branch_id = branch_id.unwrap(); + let branch = &mut self.tree[branch_id]; + let mut instruction_idx = self.branch_extra[branch_id.index as usize]; + + if instruction_idx >= 
self.sync_desc.len() { + // Performed last instruction, so this branch is officially at the + // end of the synchronous interaction. + let consistency = self.consensus.notify_of_finished_branch(branch_id); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::ReachedSyncEnd; + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + } else { + // We still have instructions to perform + let cur_instruction = &self.sync_desc[instruction_idx]; + self.branch_extra[branch_id.index as usize] += 1; + + match &cur_instruction { + ApplicationSyncAction::Put(port_id, content) => { + let port_id = *port_id; + + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + let message = Message::Data(DataMessage { + sync_header, + data_header, + content: DataContent::Message(content.clone()), + }); + comp_ctx.submit_message(message); + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; + }, + ApplicationSyncAction::Get(port_id) => { + let port_id = *port_id; + + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(instruction_idx + 1); + + branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + 
self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; + } } - ApplicationJob::NewConnector(connector, initial_ports) => { - comp_ctx.push_component(connector, initial_ports); - }, - ApplicationJob::Shutdown => { - debug_assert!(queue.is_empty()); - return ConnectorScheduling::Exit; + + if any_message_received { + return ConnectorScheduling::Immediate; } } } } + if self.tree.queue_is_empty(QueueKind::Runnable) { + return ConnectorScheduling::NotNow; + } else { + return ConnectorScheduling::Later; + } + } + + fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(!self.is_in_sync); + + // In non-sync mode the application component doesn't really do anything + // except performing jobs submitted from the API. This is the only + // case where we expect to be woken up. + // Note that we have to communicate to the scheduler when we've received + // ports or created components (hence: given away ports) *before* we + // enter a sync round. 
+ let mut queue = self.job_queue.lock().unwrap(); + while let Some(job) = queue.pop_front() { + match job { + ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + comp_ctx.push_port(endpoint_a); + comp_ctx.push_port(endpoint_b); + + return ConnectorScheduling::Immediate; + } + ApplicationJob::NewConnector(connector, initial_ports) => { + comp_ctx.push_component(connector, initial_ports); + + return ConnectorScheduling::Later; + }, + ApplicationJob::SyncRound(mut description) => { + // Entering sync mode + comp_ctx.notify_sync_start(); + self.sync_desc = description; + self.is_in_sync = true; + debug_assert!(self.last_finished_handled.is_none()); + debug_assert!(self.branch_extra.len() == 1); + + let first_branch_id = self.tree.start_sync(); + self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); + debug_assert!(first_branch_id.index == 1); + self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id); + self.branch_extra.push(0); // set first branch to first instruction + + return ConnectorScheduling::Immediate; + }, + ApplicationJob::Shutdown => { + debug_assert!(queue.is_empty()); + + return ConnectorScheduling::Exit; + } + } + } + + // Queue was empty return ConnectorScheduling::NotNow; } + + fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) { + debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program + // Notifying tree, consensus algorithm and context of ending sync + let mut fake_vec = Vec::new(); + let mut solution_branch = self.tree.end_sync(branch_id); + self.consensus.end_sync(branch_id, &mut fake_vec); + + for port in fake_vec { + debug_assert!(comp_ctx.get_port_by_id(port).is_some()); + } + + comp_ctx.notify_sync_end(&[]); + + // Turning hashmapped inbox into vector of values + let mut inbox = Vec::with_capacity(solution_branch.inbox.len()); + for action in &self.sync_desc { + match 
action { + ApplicationSyncAction::Put(_, _) => {}, + ApplicationSyncAction::Get(port_id) => { + debug_assert!(solution_branch.inbox.contains_key(port_id)); + inbox.push(solution_branch.inbox.remove(port_id).unwrap()); + }, + } + } + + // Notifying interface of ending sync + self.is_in_sync = false; + self.sync_desc.clear(); + self.branch_extra.truncate(1); + self.last_finished_handled = None; + + let (results, notification) = &*self.sync_done; + let mut results = results.lock().unwrap(); + *results = Some(FinishedSync{ inbox }); + notification.notify_one(); + } +} + +// ----------------------------------------------------------------------------- +// ApplicationInterface +// ----------------------------------------------------------------------------- + +#[derive(Debug)] +pub enum ChannelCreationError { + InSync, +} + +#[derive(Debug)] +pub enum ApplicationStartSyncError { + AlreadyInSync, + NoSyncActions, + IncorrectPortKind, + UnownedPort, +} + +#[derive(Debug)] +pub enum ApplicationEndSyncError { + NotInSync, +} + +pub enum ApplicationSyncAction { + Put(PortIdLocal, ValueGroup), + Get(PortIdLocal), } /// The interface to a `ApplicationConnector`. This allows setting up the @@ -93,21 +358,28 @@ pub struct ApplicationInterface { sync_done: SyncDone, job_queue: JobQueue, runtime: Arc, + is_in_sync: bool, connector_id: ConnectorId, - owned_ports: Vec, + owned_ports: Vec<(PortKind, PortIdLocal)>, } impl ApplicationInterface { fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { return Self{ sync_done, job_queue, runtime, + is_in_sync: false, connector_id: ConnectorId::new_invalid(), owned_ports: Vec::new(), } } - /// Creates a new channel. - pub fn create_channel(&mut self) -> Channel { + /// Creates a new channel. Can only fail if the application interface is + /// currently in sync mode. 
+ pub fn create_channel(&mut self) -> Result { + if self.is_in_sync { + return Err(ChannelCreationError::InSync); + } + let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id); debug_assert_eq!(getter_port.kind, PortKind::Getter); let getter_id = getter_port.self_id; @@ -120,31 +392,34 @@ impl ApplicationInterface { // Add to owned ports for error checking while creating a connector self.owned_ports.reserve(2); - self.owned_ports.push(putter_id); - self.owned_ports.push(getter_id); + self.owned_ports.push((PortKind::Putter, putter_id)); + self.owned_ports.push((PortKind::Getter, getter_id)); - return Channel{ putter_id, getter_id }; + return Ok(Channel{ putter_id, getter_id }); } /// Creates a new connector. Note that it is not scheduled immediately, but /// depends on the `ApplicationConnector` to run, followed by the created /// connector being scheduled. - // TODO: Yank out scheduler logic for common use. pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { + if self.is_in_sync { + return Err(ComponentCreationError::InSync); + } + // Retrieve ports and make sure that we own the ones that are currently // specified. This is also checked by the scheduler, but that is done // asynchronously. 
let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); for initial_port in &initial_ports { - if !self.owned_ports.iter().any(|v| v == initial_port) { + if !self.owned_ports.iter().any(|(_, v)| v == initial_port) { return Err(ComponentCreationError::UnownedPort); } } // We own all ports, so remove them on this side for initial_port in &initial_ports { - let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap(); + let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap(); self.owned_ports.remove(position); } @@ -162,18 +437,66 @@ impl ApplicationInterface { return Ok(()); } - /// Check if the next sync-round is finished. - pub fn try_wait(&self) -> bool { - let (is_done, _) = &*self.sync_done; - let lock = is_done.lock().unwrap(); - return *lock; + /// Queues up a description of a synchronous round to run. Will not actually + /// run the synchronous behaviour in blocking fashion. The results *must* be + /// retrieved using `wait` for the interface to be considered + /// in non-sync mode. + // TODO: Maybe change API in the future. 
For now it does the job + pub fn perform_sync_round(&mut self, actions: Vec) -> Result<(), ApplicationStartSyncError> { + if self.is_in_sync { + return Err(ApplicationStartSyncError::AlreadyInSync); + } + + // Check the action ports for consistency + for action in &actions { + let (port_id, expected_kind) = match action { + ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter), + ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter), + }; + + match self.find_port_by_id(port_id) { + Some(port_kind) => { + if port_kind != expected_kind { + return Err(ApplicationStartSyncError::IncorrectPortKind) + } + }, + None => { + return Err(ApplicationStartSyncError::UnownedPort); + } + } + } + + // Everything is consistent, go into sync mode and send the actions off + // to the component that will actually perform the sync round + self.is_in_sync = true; + { + let (is_done, _) = &*self.sync_done; + let mut lock = is_done.lock().unwrap(); + *lock = None; + } + + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::SyncRound(actions)); + } + + self.wake_up_connector_with_ping(); + return Ok(()) } - /// Wait until the next sync-round is finished - pub fn wait(&self) { + /// Wait until the next sync-round is finished, returning the received + /// messages in order of `get` calls. + pub fn wait(&mut self) -> Result, ApplicationEndSyncError> { + if !self.is_in_sync { + return Err(ApplicationEndSyncError::NotInSync); + } + let (is_done, condition) = &*self.sync_done; - let lock = is_done.lock().unwrap(); - condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done + let mut lock = is_done.lock().unwrap(); + lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done + + self.is_in_sync = false; + return Ok(lock.take().unwrap().inbox); } /// Called by runtime to set associated connector's ID. 
@@ -198,6 +521,12 @@ impl ApplicationInterface { self.runtime.push_work(key); } } + + fn find_port_by_id(&self, port_id: PortIdLocal) -> Option { + return self.owned_ports.iter() + .find(|(_, owned_id)| *owned_id == port_id) + .map(|(port_kind, _)| *port_kind); + } } impl Drop for ApplicationInterface { diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index eb8ccbae8bcebfe6e4c6279c845c0c23e0065445..092b65f090107f47fe0c48f9656a99c6f473e2e6 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -296,8 +296,9 @@ impl Scheduler { if scheduled.ctx.is_in_sync { // Just entered sync region } else { - // Just left sync region. So clear inbox - scheduled.ctx.inbox_messages.clear(); + // Just left sync region. So clear inbox up until the last + // message that was read. + scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read); scheduled.ctx.inbox_len_read = 0; } @@ -380,7 +381,7 @@ pub(crate) struct ComponentCtx { // Mostly managed by the scheduler pub(crate) id: ConnectorId, ports: Vec, - inbox_messages: Vec, // never control or ping messages + inbox_messages: Vec, inbox_len_read: usize, // Submitted by the component is_in_sync: bool, diff --git a/src/runtime2/tests/api_component.rs b/src/runtime2/tests/api_component.rs new file mode 100644 index 0000000000000000000000000000000000000000..67271d2986bb610f91a18425edff20de76b0b0f2 --- /dev/null +++ b/src/runtime2/tests/api_component.rs @@ -0,0 +1,158 @@ +// Testing the api component. +// +// These tests explicitly do not use the "NUM_INSTANCES" constant because we're +// doing some communication with the native component. 
Hence only expect one + +use super::*; + +#[test] +fn test_put_and_get() { + const CODE: &'static str = " + primitive handler(in request, out response, u32 loops) { + u32 index = 0; + while (index < loops) { + sync { + auto value = get(request); + put(response, value * 2); + } + index += 1; + } + } + "; + + let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap(); + let rt = Runtime::new(NUM_THREADS, pd); + let mut api = rt.create_interface(); + + let req_chan = api.create_channel().unwrap(); + let resp_chan = api.create_channel().unwrap(); + + api.create_connector("", "handler", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(req_chan.getter_id.index)), + Value::Output(PortId::new(resp_chan.putter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + + for loop_idx in 0..NUM_LOOPS { + api.perform_sync_round(vec![ + ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])), + ApplicationSyncAction::Get(resp_chan.getter_id) + ]).expect("start sync round"); + + let result = api.wait().expect("finish sync round"); + assert!(result.len() == 1); + if let Value::UInt32(gotten) = result[0].values[0] { + assert_eq!(gotten, loop_idx * 2); + } else { + assert!(false); + } + } +} + +#[test] +fn test_getting_from_component() { + const CODE: &'static str =" + primitive loop_sender(out numbers, u32 cur, u32 last) { + while (cur < last) { + sync { + put(numbers, cur); + cur += 1; + } + } + }"; + + let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap(); + let rt = Runtime::new(NUM_THREADS, pd); + let mut api = rt.create_interface(); + + let channel = api.create_channel().unwrap(); + api.create_connector("", "loop_sender", ValueGroup::new_stack(vec![ + Value::Output(PortId::new(channel.putter_id.index)), + Value::UInt32(1337), + Value::UInt32(1337 + NUM_LOOPS) + ])).unwrap(); + + for loop_idx in 0..NUM_LOOPS { + api.perform_sync_round(vec![ + ApplicationSyncAction::Get(channel.getter_id), + ]).expect("start sync 
round"); + + let result = api.wait().expect("finish sync round"); + + assert!(result.len() == 1 && result[0].values.len() == 1); + if let Value::UInt32(gotten) = result[0].values[0] { + assert_eq!(gotten, 1337 + loop_idx); + } else { + assert!(false); + } + } +} + +#[test] +fn test_putting_to_component() { + const CODE: &'static str = " + primitive loop_receiver(in numbers, u32 cur, u32 last) { + while (cur < last) { + sync { + auto number = get(numbers); + assert(number == cur); + cur += 1; + } + } + } + "; + + let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap(); + let rt = Runtime::new(NUM_THREADS, pd); + let mut api = rt.create_interface(); + + let channel = api.create_channel().unwrap(); + api.create_connector("", "loop_receiver", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(channel.getter_id.index)), + Value::UInt32(42), + Value::UInt32(42 + NUM_LOOPS) + ])).unwrap(); + + for loop_idx in 0..NUM_LOOPS { + api.perform_sync_round(vec![ + ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::UInt32(42 + loop_idx)])), + ]).expect("start sync round"); + + // Note: if we finish a round, then it must have succeeded :) + api.wait().expect("finish sync round"); + } +} + +#[test] +fn test_doing_nothing() { + const CODE: &'static str = " + primitive getter(in input, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync {} + sync { auto res = get(input); assert(res); } + index += 1; + } + } + "; + + let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap(); + let rt = Runtime::new(NUM_THREADS, pd); + let mut api = rt.create_interface(); + + let channel = api.create_channel().unwrap(); + api.create_connector("", "getter", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(channel.getter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + + for _ in 0..NUM_LOOPS { + api.perform_sync_round(vec![]).expect("start silent sync round"); + api.wait().expect("finish silent sync round"); + 
api.perform_sync_round(vec![ + ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::Bool(true)])) + ]).expect("start firing sync round"); + let res = api.wait().expect("finish firing sync round"); + assert!(res.is_empty()); + } +} \ No newline at end of file diff --git a/src/runtime2/tests/basics.rs b/src/runtime2/tests/basics.rs new file mode 100644 index 0000000000000000000000000000000000000000..471394c34e84602b9d7197b13a54b41c0f5b4a79 --- /dev/null +++ b/src/runtime2/tests/basics.rs @@ -0,0 +1,91 @@ + +use super::*; + +#[test] +fn test_single_put_and_get() { + const CODE: &'static str = " + primitive putter(out sender, u32 loops) { + u32 index = 0; + while (index < loops) { + sync { + put(sender, true); + } + index += 1; + } + } + + primitive getter(in receiver, u32 loops) { + u32 index = 0; + while (index < loops) { + sync { + auto result = get(receiver); + assert(result); + } + index += 1; + } + } + "; + + let thing = TestTimer::new("single_put_and_get"); + run_test_in_runtime(CODE, |api| { + let channel = api.create_channel().unwrap(); + + api.create_connector("", "putter", ValueGroup::new_stack(vec![ + Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })), + Value::UInt32(NUM_LOOPS) + ])).expect("create putter"); + + api.create_connector("", "getter", ValueGroup::new_stack(vec![ + Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })), + Value::UInt32(NUM_LOOPS) + ])).expect("create getter"); + }); +} + +#[test] +fn test_multi_put_and_get() { + const CODE: &'static str = " + primitive putter_static(out vals, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync { + put(vals, 0b00000001); + put(vals, 0b00000100); + put(vals, 0b00010000); + put(vals, 0b01000000); + } + index += 1; + } + } + + primitive getter_dynamic(in vals, u32 num_loops) { + u32 loop_index = 0; + while (loop_index < num_loops) { + sync { + u32 recv_index = 0; + u8 expected = 1; + while 
(recv_index < 4) { + auto gotten = get(vals); + assert(gotten == expected); + expected <<= 2; + recv_index += 1; + } + } + loop_index += 1; + } + } + "; + + let thing = TestTimer::new("multi_put_and_get"); + run_test_in_runtime(CODE, |api| { + let channel = api.create_channel().unwrap(); + api.create_connector("", "putter_static", ValueGroup::new_stack(vec![ + Value::Output(PortId::new(channel.putter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + api.create_connector("", "getter_dynamic", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(channel.getter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + }) +} \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index a5c2aac79bf56dedc089cee2118064d8f49f87c7..3c2ce3c1a3b75f637a20b7534adf698a2395cefd 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -1,11 +1,18 @@ +mod network_shapes; +mod api_component; +mod speculation_basic; +mod basics; + use super::*; use crate::{PortId, ProtocolDescription}; use crate::common::Id; use crate::protocol::eval::*; +use crate::runtime2::native::{ApplicationSyncAction}; -const NUM_THREADS: u32 = 3; // number of threads in runtime -const NUM_INSTANCES: u32 = 5; // number of test instances constructed -const NUM_LOOPS: u32 = 5; // number of loops within a single test (not used by all tests) +// Generic testing constants, use when appropriate to simplify stress-testing +pub(crate) const NUM_THREADS: u32 = 3; // number of threads in runtime +pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed +pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) fn create_runtime(pdl: &str) -> Runtime { let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl"); @@ -23,17 +30,15 @@ fn run_test_in_runtime(pdl: &str, constructor: for _ in 0..NUM_INSTANCES { constructor(&mut api); } - - // Wait until done :) } -struct 
TestTimer { +pub(crate) struct TestTimer { name: &'static str, started: std::time::Instant } impl TestTimer { - fn new(name: &'static str) -> Self { + pub(crate) fn new(name: &'static str) -> Self { Self{ name, started: std::time::Instant::now() } } } @@ -47,186 +52,3 @@ impl Drop for TestTimer { println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos); } } - -#[test] -fn test_put_and_get() { - const CODE: &'static str = " - primitive putter(out sender, u32 loops) { - u32 index = 0; - while (index < loops) { - synchronous { - put(sender, true); - } - index += 1; - } - } - - primitive getter(in receiver, u32 loops) { - u32 index = 0; - while (index < loops) { - synchronous { - auto result = get(receiver); - assert(result); - } - index += 1; - } - } - "; - - let thing = TestTimer::new("put_and_get"); - run_test_in_runtime(CODE, |api| { - let channel = api.create_channel(); - - api.create_connector("", "putter", ValueGroup::new_stack(vec![ - Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })), - Value::UInt32(NUM_LOOPS) - ])).expect("create putter"); - - api.create_connector("", "getter", ValueGroup::new_stack(vec![ - Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })), - Value::UInt32(NUM_LOOPS) - ])).expect("create getter"); - }); -} - -#[test] -fn test_star_shaped_request() { - const CODE: &'static str = " - primitive edge(in input, out output, u32 loops) { - u32 index = 0; - while (index < loops) { - synchronous { - auto req = get(input); - put(output, req * 2); - } - index += 1; - } - } - - primitive center(out[] requests, in[] responses, u32 loops) { - u32 loop_index = 0; - auto num_edges = length(requests); - - while (loop_index < loops) { - // print(\"starting loop\"); - synchronous { - u32 edge_index = 0; - u32 sum = 0; - while (edge_index < num_edges) { - put(requests[edge_index], edge_index); - auto response = get(responses[edge_index]); - sum += response; - edge_index += 1; - } - - assert(sum 
== num_edges * (num_edges - 1)); - } - // print(\"ending loop\"); - loop_index += 1; - } - } - - composite constructor(u32 num_edges, u32 num_loops) { - auto requests = {}; - auto responses = {}; - - u32 edge_index = 0; - while (edge_index < num_edges) { - channel req_put -> req_get; - channel resp_put -> resp_get; - new edge(req_get, resp_put, num_loops); - requests @= { req_put }; - responses @= { resp_get }; - - edge_index += 1; - } - - new center(requests, responses, num_loops); - } - "; - - let thing = TestTimer::new("star_shaped_request"); - run_test_in_runtime(CODE, |api| { - api.create_connector("", "constructor", ValueGroup::new_stack(vec![ - Value::UInt32(5), - Value::UInt32(NUM_LOOPS), - ])); - }); -} - -#[test] -fn test_conga_line_request() { - const CODE: &'static str = " - primitive start(out req, in resp, u32 num_nodes, u32 num_loops) { - u32 loop_index = 0; - u32 initial_value = 1337; - while (loop_index < num_loops) { - synchronous { - put(req, initial_value); - auto result = get(resp); - assert(result == initial_value + num_nodes * 2); - } - loop_index += 1; - } - } - - primitive middle( - in req_in, out req_forward, - in resp_in, out resp_forward, - u32 num_loops - ) { - u32 loop_index = 0; - while (loop_index < num_loops) { - synchronous { - auto req = get(req_in); - put(req_forward, req + 1); - auto resp = get(resp_in); - put(resp_forward, resp + 1); - } - loop_index += 1; - } - } - - primitive end(in req_in, out resp_out, u32 num_loops) { - u32 loop_index = 0; - while (loop_index < num_loops) { - synchronous { - auto req = get(req_in); - put(resp_out, req); - } - loop_index += 1; - } - } - - composite constructor(u32 num_nodes, u32 num_loops) { - channel initial_req -> req_in; - channel resp_out -> final_resp; - new start(initial_req, final_resp, num_nodes, num_loops); - - in last_req_in = req_in; - out last_resp_out = resp_out; - - u32 node = 0; - while (node < num_nodes) { - channel new_req_fw -> new_req_in; - channel new_resp_out -> 
new_resp_in; - new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops); - - last_req_in = new_req_in; - last_resp_out = new_resp_out; - - node += 1; - } - - new end(last_req_in, last_resp_out, num_loops); - } - "; - - let thing = TestTimer::new("conga_line_request"); - run_test_in_runtime(CODE, |api| { - api.create_connector("", "constructor", ValueGroup::new_stack(vec![ - Value::UInt32(5), - Value::UInt32(NUM_LOOPS) - ])); - }); -} \ No newline at end of file diff --git a/src/runtime2/tests/network_shapes.rs b/src/runtime2/tests/network_shapes.rs new file mode 100644 index 0000000000000000000000000000000000000000..3797bf0ffea9c0f72e1ae97b735dc4adc97e862d --- /dev/null +++ b/src/runtime2/tests/network_shapes.rs @@ -0,0 +1,145 @@ +// Testing particular graph shapes + +use super::*; + +#[test] +fn test_star_shaped_request() { + const CODE: &'static str = " + primitive edge(in input, out output, u32 loops) { + u32 index = 0; + while (index < loops) { + sync { + auto req = get(input); + put(output, req * 2); + } + index += 1; + } + } + + primitive center(out[] requests, in[] responses, u32 loops) { + u32 loop_index = 0; + auto num_edges = length(requests); + + while (loop_index < loops) { + // print(\"starting loop\"); + sync { + u32 edge_index = 0; + u32 sum = 0; + while (edge_index < num_edges) { + put(requests[edge_index], edge_index); + auto response = get(responses[edge_index]); + sum += response; + edge_index += 1; + } + + assert(sum == num_edges * (num_edges - 1)); + } + // print(\"ending loop\"); + loop_index += 1; + } + } + + composite constructor(u32 num_edges, u32 num_loops) { + auto requests = {}; + auto responses = {}; + + u32 edge_index = 0; + while (edge_index < num_edges) { + channel req_put -> req_get; + channel resp_put -> resp_get; + new edge(req_get, resp_put, num_loops); + requests @= { req_put }; + responses @= { resp_get }; + + edge_index += 1; + } + + new center(requests, responses, num_loops); + } + "; + + let thing = 
TestTimer::new("star_shaped_request"); + run_test_in_runtime(CODE, |api| { + api.create_connector("", "constructor", ValueGroup::new_stack(vec![ + Value::UInt32(5), + Value::UInt32(NUM_LOOPS), + ])).expect("create connector"); + }); +} + +#[test] +fn test_conga_line_request() { + const CODE: &'static str = " + primitive start(out req, in resp, u32 num_nodes, u32 num_loops) { + u32 loop_index = 0; + u32 initial_value = 1337; + while (loop_index < num_loops) { + sync { + put(req, initial_value); + auto result = get(resp); + assert(result == initial_value + num_nodes * 2); + } + loop_index += 1; + } + } + + primitive middle( + in req_in, out req_forward, + in resp_in, out resp_forward, + u32 num_loops + ) { + u32 loop_index = 0; + while (loop_index < num_loops) { + sync { + auto req = get(req_in); + put(req_forward, req + 1); + auto resp = get(resp_in); + put(resp_forward, resp + 1); + } + loop_index += 1; + } + } + + primitive end(in req_in, out resp_out, u32 num_loops) { + u32 loop_index = 0; + while (loop_index < num_loops) { + sync { + auto req = get(req_in); + put(resp_out, req); + } + loop_index += 1; + } + } + + composite constructor(u32 num_nodes, u32 num_loops) { + channel initial_req -> req_in; + channel resp_out -> final_resp; + new start(initial_req, final_resp, num_nodes, num_loops); + + in last_req_in = req_in; + out last_resp_out = resp_out; + + u32 node = 0; + while (node < num_nodes) { + channel new_req_fw -> new_req_in; + channel new_resp_out -> new_resp_in; + new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops); + + last_req_in = new_req_in; + last_resp_out = new_resp_out; + + node += 1; + } + + new end(last_req_in, last_resp_out, num_loops); + } + "; + + let thing = TestTimer::new("conga_line_request"); + run_test_in_runtime(CODE, |api| { + api.create_connector("", "constructor", ValueGroup::new_stack(vec![ + Value::UInt32(5), + Value::UInt32(NUM_LOOPS) + ])).expect("create connector"); + }); +} \ No newline at end of file 
diff --git a/src/runtime2/tests/speculation_basic.rs b/src/runtime2/tests/speculation_basic.rs new file mode 100644 index 0000000000000000000000000000000000000000..ff1d8d0b1111ff2e7764063380bc3921b0ff2b65 --- /dev/null +++ b/src/runtime2/tests/speculation_basic.rs @@ -0,0 +1,69 @@ +// Testing speculation - Basic forms + +use super::*; + +#[test] +fn test_maybe_do_nothing() { + // Three variants in which the behaviour in which nothing is performed is + // somehow not allowed. Note that we "check" by seeing if the test finishes. + // Only the branches in which ports fire increment the loop index + const CODE: &'static str = " + primitive only_puts(out output, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync { put(output, true); } + index += 1; + } + } + + primitive might_put(out output, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync { + fork { put(output, true); index += 1; } + or {} + } + } + } + + primitive only_gets(in input, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync { auto res = get(input); assert(res); } + index += 1; + } + } + + primitive might_get(in input, u32 num_loops) { + u32 index = 0; + while (index < num_loops) { + sync fork { auto res = get(input); assert(res); index += 1; } or {} + } + } + "; + + // Construct all variants which should work and wait until the runtime exits + run_test_in_runtime(CODE, |api| { + // only putting -> maybe getting + let channel = api.create_channel().unwrap(); + api.create_connector("", "only_puts", ValueGroup::new_stack(vec![ + Value::Output(PortId::new(channel.putter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + api.create_connector("", "might_get", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(channel.getter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + + // maybe putting -> only getting + let channel = api.create_channel().unwrap(); + api.create_connector("", "might_put", ValueGroup::new_stack(vec![ + 
Value::Output(PortId::new(channel.putter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + api.create_connector("", "only_gets", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(channel.getter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + }) +} \ No newline at end of file diff --git a/testdata/parser/negative/1.pdl b/testdata/parser/negative/1.pdl index 4f0fafc1f9d3ade96980f3dd09e93b65e97225e7..22648169831112ea824afff2a7d564c8169f3c4f 100644 --- a/testdata/parser/negative/1.pdl +++ b/testdata/parser/negative/1.pdl @@ -3,8 +3,8 @@ // sync block nested twice in primitive primitive main(in a, out b) { while (true) { - synchronous { - synchronous {} + sync { + sync {} } } } diff --git a/testdata/parser/negative/1.txt b/testdata/parser/negative/1.txt index c058a7760693fbb6d6d4dc63b4f34c4b8de76e8c..784143a9e09e5acfb1c00417753d1a3d510bf961 100644 --- a/testdata/parser/negative/1.txt +++ b/testdata/parser/negative/1.txt @@ -1,3 +1,3 @@ Parse error at 1.pdl:7:4: Illegal nested synchronous statement - synchronous {} + sync {} ^ diff --git a/testdata/parser/negative/10.pdl b/testdata/parser/negative/10.pdl index 60a8f81d01887a82d29e983b883efae781be3da4..5aad08d66854192cd6250258dee63a4fbdb61249 100644 --- a/testdata/parser/negative/10.pdl +++ b/testdata/parser/negative/10.pdl @@ -3,9 +3,9 @@ // sync block nested in sync block primitive main(in a, out b) { while (true) { - synchronous { + sync { if (false || true) { - synchronous { + sync { skip; } } diff --git a/testdata/parser/negative/10.txt b/testdata/parser/negative/10.txt index 439a71d657dd786f5f5f8f14e75563f06360d437..5d5e749c38785814f5ba186420ab90b45fddafa3 100644 --- a/testdata/parser/negative/10.txt +++ b/testdata/parser/negative/10.txt @@ -1,3 +1,3 @@ Parse error at 10.pdl:8:5: Illegal nested synchronous statement - synchronous { + sync { ^ diff --git a/testdata/parser/negative/12.pdl b/testdata/parser/negative/12.pdl index 
049b2b4539efcd49d3a04651aa5be79336571e3d..fc8d136b2c35e1ce36ab1b85a5814f64b57c1a8b 100644 --- a/testdata/parser/negative/12.pdl +++ b/testdata/parser/negative/12.pdl @@ -4,6 +4,6 @@ primitive main(in a, out b) { while (true) { channel x -> y; - synchronous {} + sync {} } } diff --git a/testdata/parser/negative/19.pdl b/testdata/parser/negative/19.pdl index 66a30eed35ce366e127cde061aefad4d89bc8336..583cf1896967b4845e438b8672005c2839d73fa4 100644 --- a/testdata/parser/negative/19.pdl +++ b/testdata/parser/negative/19.pdl @@ -2,7 +2,7 @@ primitive main(in a) { while (true) { - synchronous { + sync { if (fires(a)) { return 5; } else { diff --git a/testdata/parser/negative/24.pdl b/testdata/parser/negative/24.pdl index aa44bdda2068eb610250dd1932822fa70c0b8afa..8e7ee6e0b969b1fc5a99a9729cb3f6c5b9383fee 100644 --- a/testdata/parser/negative/24.pdl +++ b/testdata/parser/negative/24.pdl @@ -7,7 +7,7 @@ primitive main(in a, out b) { y %= x -= 3; x *= x * x *= 5; while (true) { - synchronous { + sync { assert fires(a) == fires(b); } } diff --git a/testdata/parser/negative/3.pdl b/testdata/parser/negative/3.pdl index 67caa9d0a862f387cb25a7d1c04c81df7eead3c3..b62869969dc222da490e6fed9dd101206864d5d3 100644 --- a/testdata/parser/negative/3.pdl +++ b/testdata/parser/negative/3.pdl @@ -4,7 +4,7 @@ composite main(in a, out b) { channel x -> y; while (true) { - synchronous { + sync { skip; } } diff --git a/testdata/parser/negative/3.txt b/testdata/parser/negative/3.txt index 010dcbaa770c39d19aa5ad9b8d20902b4ba12a62..d989513aa104beadaff14a695470bbbc13f7c063 100644 --- a/testdata/parser/negative/3.txt +++ b/testdata/parser/negative/3.txt @@ -1,3 +1,3 @@ Parse error at 3.pdl:7:3: Illegal nested synchronous statement - synchronous { + sync { ^ diff --git a/testdata/parser/negative/31.pdl b/testdata/parser/negative/31.pdl index d4aa9911681b1145e612483995e90856766bca56..c2415a04268b2000055f88105679cc60b04a9275 100644 --- a/testdata/parser/negative/31.pdl +++ 
b/testdata/parser/negative/31.pdl @@ -2,7 +2,7 @@ primitive main(int a) { while (true) { - synchronous { + sync { break; // not allowed } } diff --git a/testdata/parser/negative/32.pdl b/testdata/parser/negative/32.pdl index 0d9cecc51e6e4f4019b88889b6714e6cc64a7c30..6d7446d6580cc08e873c367ccde51b9697a63667 100644 --- a/testdata/parser/negative/32.pdl +++ b/testdata/parser/negative/32.pdl @@ -2,7 +2,7 @@ primitive main(int a) { loop: { - synchronous { + sync { goto loop; // not allowed } } diff --git a/testdata/parser/negative/4.pdl b/testdata/parser/negative/4.pdl index bf323606bd952b1d97999d4a9f14d35e1e4bfadc..9d34caa72fdaefd46c77bfc59cf8d91b89b38fa8 100644 --- a/testdata/parser/negative/4.pdl +++ b/testdata/parser/negative/4.pdl @@ -6,7 +6,7 @@ primitive main(in a, out b) { msg y = create(0); // legal while (x < 10) { y = get(a); // illegal - synchronous { + sync { y = get(a); // legal } } diff --git a/testdata/parser/negative/6.pdl b/testdata/parser/negative/6.pdl index 7f523a40d6a8bf1b7912888956be1a8da25803c6..e14397e74e2eb9a777d743bb0ab71782c3b8cfaa 100644 --- a/testdata/parser/negative/6.pdl +++ b/testdata/parser/negative/6.pdl @@ -4,5 +4,5 @@ import std.reo; // duplicate formal parameters composite main(in a, out a) { - new sync(a, a); + new sync_component(a, a); } diff --git a/testdata/parser/negative/7.pdl b/testdata/parser/negative/7.pdl index b37b28604db2ba82798625f4cbcb04ca35b8f69f..6dc6bee31260777ad45fad4e26eef61e3d43224b 100644 --- a/testdata/parser/negative/7.pdl +++ b/testdata/parser/negative/7.pdl @@ -5,5 +5,5 @@ import std.reo; // shadowing formal parameter composite main(in a, out b) { channel c -> a; - new sync(a, b); + new sync_component(a, b); } diff --git a/testdata/parser/negative/8.pdl b/testdata/parser/negative/8.pdl index af9f36c5b48055bad30484961575675acaa8a77c..7adf05b4d5063b151a358fb367edf3ab2ef78961 100644 --- a/testdata/parser/negative/8.pdl +++ b/testdata/parser/negative/8.pdl @@ -10,7 +10,7 @@ composite main(in a, out b) { // 
shadowing import primitive syncdrain(in a, in b) { while (true) { - synchronous { + sync { if (!fires(a) || !fires(b)) { block(a); block(b); diff --git a/testdata/parser/positive/1.pdl b/testdata/parser/positive/1.pdl index 9e88a779636f927945eae0fd3a4ed04379deac03..725f98df28b34b207f823d46141a845dc0e75cac 100644 --- a/testdata/parser/positive/1.pdl +++ b/testdata/parser/positive/1.pdl @@ -11,7 +11,7 @@ composite main(in asend, out arecv, in bsend, out brecv) { primitive replicator(in a, out b, out c) { while (true) { - synchronous { + sync { if (fires(a) && fires(b) && fires(c)) { msg x = get(a); put(b, x); @@ -40,7 +40,7 @@ composite sequencer(in x, in y) { primitive syncdrain(in a, in b) { while (true) { - synchronous { + sync { if (fires(a) && fires(b)) { get(a); get(b); @@ -54,7 +54,7 @@ primitive syncdrain(in a, in b) { primitive fifo(in a, out b, msg init) { msg c = init; while (true) { - synchronous { + sync { if (c != null) { assert !fires(a); if (fires(b)) { @@ -75,7 +75,7 @@ primitive sequencer2(in x, in y) { while (true) { boolean b = false; while (!b) { - synchronous { + sync { assert !fires(y); if (fires(x)) b = true; @@ -83,7 +83,7 @@ primitive sequencer2(in x, in y) { } b = false; while (!b) { - synchronous { + sync { assert !fires(x); if (fires(y)) b = true; diff --git a/testdata/parser/positive/10.pdl b/testdata/parser/positive/10.pdl index 0c5d30773f0a2016c9910948ff7b9de81e2c4cb2..90775a2e3eebce117a5dd01995229ae5d903480b 100644 --- a/testdata/parser/positive/10.pdl +++ b/testdata/parser/positive/10.pdl @@ -4,7 +4,7 @@ composite main() {} primitive example(in a, out[] b) { while (true) { - synchronous { + sync { if (fires(a)) { int i = 0; while (i < b.length) { diff --git a/testdata/parser/positive/11.pdl b/testdata/parser/positive/11.pdl index 43fea6c6c4c47556c13694ea8a084d31ce21be93..13f7f57bc2d5afdf5853057957afeab1fc91788a 100644 --- a/testdata/parser/positive/11.pdl +++ b/testdata/parser/positive/11.pdl @@ -3,13 +3,13 @@ primitive main(in a, 
out b) { msg x = null; while (x == null) { - synchronous { + sync { if (fires(a)) x = get(a); } } while (true) { - synchronous { + sync { if (fires(b)) put(b, x); } diff --git a/testdata/parser/positive/12.pdl b/testdata/parser/positive/12.pdl index 1ac91cd80217398a5d0b89f821a4a03c521ff563..8647d44b9249e4d85a23309ec6aa540047e2f247 100644 --- a/testdata/parser/positive/12.pdl +++ b/testdata/parser/positive/12.pdl @@ -7,7 +7,7 @@ primitive main(in a, out b) { y %= x -= 3; x *= x * (x *= 5); while (true) { - synchronous { + sync { assert fires(a) == fires(b); } } diff --git a/testdata/parser/positive/13.pdl b/testdata/parser/positive/13.pdl index a146785267d8b5ad9a95990187756968d6c969b4..910fb084be3046388f25b474fd5497dcc6460da0 100644 --- a/testdata/parser/positive/13.pdl +++ b/testdata/parser/positive/13.pdl @@ -14,7 +14,7 @@ composite example(in[] a, in[] b, out x) { primitive resolve(in[] a, in[] b, out x) { while (true) { - synchronous { + sync { int i = 0; while (i < a.length && i < b.length) { if (fires(a[i]) && fires(b[i])) { @@ -31,7 +31,7 @@ primitive resolve(in[] a, in[] b, out x) { primitive async(in[] a) { while (true) { - synchronous { + sync { int i = 0; while (i < a.length) if (fires(a[i++])) break; diff --git a/testdata/parser/positive/14.pdl b/testdata/parser/positive/14.pdl index 622e756f27a3218031b0ed0e475eb22f95d6aac3..4ac063f5521d7f24c3f260fcc067f10783792209 100644 --- a/testdata/parser/positive/14.pdl +++ b/testdata/parser/positive/14.pdl @@ -3,13 +3,13 @@ composite main(out c) { channel ao -> ai; channel bo -> bi; - new sync(ai, bo); + new sync_component(ai, bo); new binary_replicator(bi, ao, c); } primitive sync(in a, out b) { while (true) { - synchronous { + sync { if (fires(a) && fires(b)) { msg x = get(a); put(b, x); @@ -22,7 +22,7 @@ primitive sync(in a, out b) { primitive binary_replicator(in b, out a, out c) { while (true) { - synchronous { + sync { if (fires(b) && fires(a) && fires(c)) { msg x = get(b); put(a, x); diff --git 
a/testdata/parser/positive/15.pdl b/testdata/parser/positive/15.pdl index 8c4ceabd007701e0e5089e763fbaa866d7b56e7d..b01b83017d78daa2452998ad67b8ea4408244bec 100644 --- a/testdata/parser/positive/15.pdl +++ b/testdata/parser/positive/15.pdl @@ -7,7 +7,7 @@ composite main(out c) { channel bo -> bi; channel axo -> axi; channel zo -> zi; - new sync(ai, bo); + new sync_component(ai, bo); new replicator(bi, {axo, c}); new consensus({axi, zi}, ao); new generator(zo); diff --git a/testdata/parser/positive/16.pdl b/testdata/parser/positive/16.pdl index 604bdf0bd56c3344fc9ff34aa8e2919924f348b5..06d6a4ef972caee119206004cf47bb3cdcab5d7d 100644 --- a/testdata/parser/positive/16.pdl +++ b/testdata/parser/positive/16.pdl @@ -7,7 +7,7 @@ composite main() { } primitive a(in x) { - synchronous { + sync { msg m = get(x); assert m.length == 5; assert m[0] == 'h'; @@ -25,7 +25,7 @@ primitive b(out x) { } // or primitive c(out x) { - synchronous { + sync { msg m = create(5); m[0] = 'h'; m[1] = 'e'; diff --git a/testdata/parser/positive/17.pdl b/testdata/parser/positive/17.pdl index 6b3934d0a050516cac284b7a5684f1918c27d421..36f68933457d82badaa6e40018f542f3f756a062 100644 --- a/testdata/parser/positive/17.pdl +++ b/testdata/parser/positive/17.pdl @@ -8,7 +8,7 @@ primitive prophet(in b, out a) { msg c = null; while (true) { if (c != null) { - synchronous { + sync { assert !fires(a); if (fires(b)) { assert get(b) == c; @@ -31,7 +31,7 @@ primitive fifo(in a, out b, msg init) { msg c = init; while (true) { if (c != null) { - synchronous { + sync { assert !fires(a); if (fires(b)) { put(b, c); @@ -39,7 +39,7 @@ primitive fifo(in a, out b, msg init) { } } } else { - synchronous { + sync { assert !fires(b); if (fires(a)) { c = get(a); diff --git a/testdata/parser/positive/18.pdl b/testdata/parser/positive/18.pdl index 5482cb85c351f86974ce4d87eb7b4936c98719d0..1427b45e186abd5cfb8437eb09e37c4402695a75 100644 --- a/testdata/parser/positive/18.pdl +++ b/testdata/parser/positive/18.pdl @@ -12,7 +12,7 
@@ primitive main1(in a, out c) { x = 1; y = 1; while (true) { - synchronous { + sync { if (x > 0 && fires(a)) { z = get(a); x--; @@ -23,7 +23,7 @@ primitive main1(in a, out c) { y++; } } - synchronous { + sync { assert !fires(a) && !fires(c); if (z != null && y > 0) { w = z; diff --git a/testdata/parser/positive/19.pdl b/testdata/parser/positive/19.pdl index 4084ce9e3962ab5893ccff2dbbd566060e0160f3..5eb964bf010b2f0c98471cac62e44d0c81d1160d 100644 --- a/testdata/parser/positive/19.pdl +++ b/testdata/parser/positive/19.pdl @@ -3,7 +3,7 @@ composite main() {} primitive example(int a) { - synchronous { + sync { loop: { goto loop; // allowed } diff --git a/testdata/parser/positive/2.pdl b/testdata/parser/positive/2.pdl index dedfb7e3075677fafdf54038b47287aec80f28cf..08c6424301a45f808a1d76f042f2c32144387dd6 100644 --- a/testdata/parser/positive/2.pdl +++ b/testdata/parser/positive/2.pdl @@ -16,7 +16,7 @@ composite main(in asend, out arecv, in bsend, out brecv, in csend, out crecv) { primitive mymerger(in a, in b, out c) { while (true) { - synchronous { + sync { if (fires(a) && !fires(b) && fires(c)) { put(c, get(a)); } else if (!fires(a) && fires(b) && fires(c)) { diff --git a/testdata/parser/positive/3.pdl b/testdata/parser/positive/3.pdl index 11fb1a52fb3b3a1f5b7b6efa41abb16b1c47291a..f458ce5c398c5ff7a416c4e4a3577b41fcfbcf0a 100644 --- a/testdata/parser/positive/3.pdl +++ b/testdata/parser/positive/3.pdl @@ -30,7 +30,7 @@ composite main(in ai, out ao, in bi, out bo, in ci, out co, in di, out do) { primitive computeMax(in a, in b, in c, in d, out x) { while (true) { - synchronous { + sync { if (fires(a) && fires(b) && fires(c) && fires(d) && fires(x)) { msg aa = get(a); msg bb = get(b); diff --git a/testdata/parser/positive/5.pdl b/testdata/parser/positive/5.pdl index 4f3d6a9663bbe4901a06f0c31688497c94603e4a..3804782f9308b5b139d3e24069ba8ba88f7acf20 100644 --- a/testdata/parser/positive/5.pdl +++ b/testdata/parser/positive/5.pdl @@ -5,7 +5,7 @@ import std.buf; 
primitive main(in a, out b) { while (true) { - synchronous { + sync { if (fires(a) && fires(b)) { msg x = get(a); short y = readShort(x, 0); diff --git a/testdata/parser/positive/6.pdl b/testdata/parser/positive/6.pdl index 77e1fc1425fa1f815789da3388fb394cdfe017a2..a9bdfc9f55bac1624572a645d06796ea3adc37fb 100644 --- a/testdata/parser/positive/6.pdl +++ b/testdata/parser/positive/6.pdl @@ -14,7 +14,7 @@ composite replicator(in a, out[] b) { if (b.length == 0) { new blocking(a); } else if (b.length == 1) { - new sync(a, b[0]); + new sync_component(a, b[0]); } else { channel xo -> xi; new binary_replicator(a, b[0], xo); @@ -23,7 +23,7 @@ composite replicator(in a, out[] b) { } primitive binary_replicator(in a, out b, out c) { while (true) { - synchronous { + sync { if (fires(a) && fires(b) && fires(c)) { msg x = get(a); put(b, x); @@ -35,7 +35,7 @@ primitive binary_replicator(in a, out b, out c) { } } primitive blocking(in a) { - while (true) synchronous { + while (true) sync { assert !fires(a); } } @@ -52,12 +52,12 @@ composite merger(in[] a, out b) { prev = yi; i++; } - new sync(prev, b); + new sync_component(prev, b); } } primitive binary_merger(in a, in b, out c) { while (true) { - synchronous { + sync { if (fires(a) && fires(c)) { assert !fires(b); put(c, get(a)); @@ -71,14 +71,14 @@ primitive binary_merger(in a, in b, out c) { } } primitive silent(out a) { - while (true) synchronous { + while (true) sync { assert !fires(a); } } primitive sync(in a, out b) { while (true) { - synchronous { + sync { if (fires(a) && fires(b)) { put(b, get(a)); } else { diff --git a/testdata/parser/positive/7.pdl b/testdata/parser/positive/7.pdl index ffa6bbf17b4a7a656efeaee75ed0d48491c049ae..8287e88afd2e3b4ca72fec7eb7eb7815b78d8709 100644 --- a/testdata/parser/positive/7.pdl +++ b/testdata/parser/positive/7.pdl @@ -50,7 +50,7 @@ composite puzzle(in[] a, in[] b, out x) { primitive resolve(in[] a, in[] b, out x) { while (true) { - synchronous { + sync { int i = 0; while (i < a.length 
&& i < b.length) { if (fires(a[i]) && fires(b[i])) { @@ -67,7 +67,7 @@ primitive resolve(in[] a, in[] b, out x) { primitive async(in[] a) { while (true) { - synchronous { + sync { int i = 0; int j = 0; while (i < a.length) { diff --git a/testdata/parser/positive/8.pdl b/testdata/parser/positive/8.pdl index d7b37e7c5089fe3b04637d38e03992b9b77e9c10..12028358391187f031051c78f9f5e6267ce07a0e 100644 --- a/testdata/parser/positive/8.pdl +++ b/testdata/parser/positive/8.pdl @@ -31,7 +31,7 @@ composite main(out x) { primitive evil_or_odious(in x, out y) { while (true) { - synchronous { + sync { if (fires(x) && fires(y)) { msg a = get(x); msg result = create(1); @@ -53,7 +53,7 @@ primitive evil_or_odious(in x, out y) { primitive recorder(out h, in a) { msg c = create(0); while (true) { - synchronous { + sync { if (fires(h) && fires(a)) { put(h, c); { diff --git a/testdata/parser/positive/tarry.pdl b/testdata/parser/positive/tarry.pdl index 78582acc437fe5cfff483421127fae22f21786ff..e1ef90cfdaada7279a8a1047037979ebb1033006 100644 --- a/testdata/parser/positive/tarry.pdl +++ b/testdata/parser/positive/tarry.pdl @@ -43,7 +43,7 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) { while (true) { // Step 1. Initiator waits for token while (token == null) { - synchronous { + sync { if (fires(start)) { token = get(start); } @@ -59,7 +59,7 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) { int idx = 0; // Select first channel that accepts our token while (token != null) { - synchronous { + sync { int i = 0; while (i < neighboro.length) { if (fires(neighboro[i])) { @@ -78,7 +78,7 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) { neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length]; // Step 3. 
Await return of token while (token == null) { - synchronous { + sync { int i = 0; while (i < peeri.length + neighbori.length) { if (fires(peeri@neighbori[i])) { @@ -91,7 +91,7 @@ primitive initiator(in start, out end, in[] peeri, out[] peero) { } // Step 4. Token is back and all neighbors visited while (token != null) { - synchronous { + sync { if (fires(end)) { put(end, token); token = null; @@ -111,7 +111,7 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) { int idx = 0; // Step 1. Await token for first time while (token == null) { - synchronous { + sync { int i = 0; while (i < peeri.length) { if (fires(peeri[i])) { @@ -131,7 +131,7 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) { peero = {}; // Step 2. Non-initiator signals while (token != null) { - synchronous { + sync { if (fires(end)) { put(end, token); token = null; @@ -139,7 +139,7 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) { } } while (token == null) { - synchronous { + sync { if (fires(start)) { token = get(start); } @@ -150,7 +150,7 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) { idx = 0; // Select first channel that accepts our token while (token != null) { - synchronous { + sync { int i = 0; while (i < neighboro.length) { if (fires(neighboro[i])) { @@ -169,7 +169,7 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) { neighboro = neighboro[0:idx] @ neighboro[idx:neighboro.length]; // Step 4. Await return of token while (token == null) { - synchronous { + sync { int i = 0; while (i < peeri.length + neighbori.length) { if (fires(peeri@neighbori[i])) { @@ -182,7 +182,7 @@ primitive noninitiator(out start, in end, in[] peeri, out[] peero) { } // Step 5. Token is back, pass to parent while (token != null) { - synchronous { + sync { if (fires(parento[0])) { put(parento[0], token); token = null;