diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 610e58090392659f40c6b10c781a3d93fb55c371..c755413a0da76f4a30899939d3d1873eaaba0c39 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -206,7 +206,7 @@ pub enum EvalContinuation { BlockGet(ExpressionId, PortId), Put(ExpressionId, PortId, ValueGroup), SelectStart(u32, u32), // (num_cases, num_ports_total) - SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id) + SelectRegisterPort(ExpressionId, u32, u32, PortId), // (call_expr_id, case_index, port_index_in_case, port_id) SelectWait, // wait until select can continue // Returned only in non-sync mode ComponentTerminated, @@ -757,7 +757,7 @@ impl Prompt { let port_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32(); let port_value = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_port_id(); - return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value)); + return Ok(EvalContinuation::SelectRegisterPort(expr_id, case_index, port_index, port_value)); }, Method::SelectWait => { match ctx.performed_select_wait() { diff --git a/src/protocol/parser/pass_rewriting.rs b/src/protocol/parser/pass_rewriting.rs index 82702bd1056e5706249f1636604aba452aeb820c..097359f0e0eddc48e4887a0485f0f5cc9d9e74d8 100644 --- a/src/protocol/parser/pass_rewriting.rs +++ b/src/protocol/parser/pass_rewriting.rs @@ -217,7 +217,7 @@ impl Visitor for PassRewriting { num_ports_expression_id.upcast() ]; - let call_expression_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments); + let call_expression_id = create_ast_call_expr(ctx, InputSpan::new(), self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments); let call_statement_id = create_ast_expression_stmt(ctx, call_expression_id.upcast()); 
transformed_stmts.push(call_statement_id.upcast()); @@ -233,6 +233,9 @@ impl Visitor for PassRewriting { for case_port_index in 0..case_num_ports { // Arguments to runtime call + let original_get_call_id = case.involved_ports[case_port_index].0; + let original_get_call_span = ctx.heap[original_get_call_id].full_span; + let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id); let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id); @@ -244,7 +247,7 @@ impl Visitor for PassRewriting { ]; // Create runtime call, then store it - let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments); + let runtime_call_expr_id = create_ast_call_expr(ctx, original_get_call_span, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments); let runtime_call_stmt_id = create_ast_expression_stmt(ctx, runtime_call_expr_id.upcast()); transformed_stmts.push(runtime_call_stmt_id.upcast()); @@ -261,7 +264,7 @@ impl Visitor for PassRewriting { locals.push((select_variable_id, select_variable_type)); { - let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new()); + let runtime_call_expr_id = create_ast_call_expr(ctx, InputSpan::new(), self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new()); let variable_stmt_id = create_ast_variable_declaration_stmt(ctx, self.current_procedure_id, select_variable_id, select_variable_type, runtime_call_expr_id.upcast()); transformed_stmts.push(variable_stmt_id.upcast().upcast()); } @@ 
-364,7 +367,12 @@ fn create_ast_variable_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDef }); } -fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId { +/// Creates an AST call expression. The provided expression span is useful for +/// the cases where we perform compiler-builtin function calls that, when they +/// fail, should provide an error pointing at a specific point in the source +/// code. The `containing_procedure_id` is the procedure whose instructions are +/// going to contain this new call expression. +fn create_ast_call_expr(ctx: &mut Ctx, span: InputSpan, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId { let call_type_id = match method { Method::SelectStart => ctx.arch.void_type_id, Method::SelectRegisterCasePort => ctx.arch.void_type_id, @@ -377,7 +385,7 @@ fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinit let call_expression_id = ctx.heap.alloc_call_expression(|this| CallExpression{ func_span: InputSpan::new(), this, - full_span: InputSpan::new(), + full_span: span, parser_type: ParserType{ elements: Vec::new(), full_span: InputSpan::new(), diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 717810237d4a6aa84b74f9e6e7ae0f2adab9ce88..5d1b853af70754d563968c1b355dadd4c0c48e92 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -398,6 +398,7 @@ pub(crate) fn default_handle_control_message( // port (in case of error messages) let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = message.sender_comp_id; + port_info.close_at_sync_end = true; // might be redundant (we might set it closed now) let port_info = comp_ctx.get_port(port_handle); let peer_comp_id = port_info.peer_comp_id; @@ -425,7 +426,6 @@ pub(crate) fn
default_handle_control_message( // that if we have a successful sync round, followed by the peer // closing the port, that we don't consider the sync round to // have failed by mistake. - let error_due_to_port_use = if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { return Err(( last_instruction, @@ -546,7 +546,7 @@ pub(crate) fn default_handle_busy_exit( /// 2. The component has encountered an error during a sync round and is /// exiting, hence is waiting for a "Failure" message from the leader. pub(crate) fn default_handle_sync_decision( - sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, + sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, decision: SyncRoundDecision, consensus: &mut Consensus ) -> Option<bool> { let success = match decision { @@ -568,6 +568,12 @@ pub(crate) fn default_handle_sync_decision( if success { // We cannot get a success message if the component has encountered an // error. + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.close_at_sync_end { + port_info.state = PortState::Closed; + } + } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; return Some(true); diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 1a8ae2ec64d3393c240a88d43ed921d95a9ad204..c0ec092a208d6353fa9f266ed939cc749fb9288a 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -19,7 +19,8 @@ pub struct Port { pub peer_port_id: PortId, // eventually consistent pub kind: PortKind, pub state: PortState, - pub last_instruction: PortInstruction, + pub last_instruction: PortInstruction, // used during sync round in case port ends up being closed for error reporting + pub close_at_sync_end: bool, // used during sync round when receiving a `ClosePort(not_in_sync_round)` message
#[cfg(debug_assertions)] pub(crate) associated_with_peer: bool, } @@ -70,6 +71,7 @@ impl CompCtx { state: PortState::Open, peer_comp_id: self.id, last_instruction: PortInstruction::None, + close_at_sync_end: false, #[cfg(debug_assertions)] associated_with_peer: false, }); self.ports.push(Port{ @@ -79,6 +81,7 @@ impl CompCtx { state: PortState::Open, peer_comp_id: self.id, last_instruction: PortInstruction::None, + close_at_sync_end: false, #[cfg(debug_assertions)] associated_with_peer: false, }); @@ -91,6 +94,7 @@ impl CompCtx { self.ports.push(Port{ self_id, peer_comp_id, peer_port_id, kind, state, last_instruction: PortInstruction::None, + close_at_sync_end: false, #[cfg(debug_assertions)] associated_with_peer: false, }); return LocalPortHandle(self_id); diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 49558c30e73d6fb90b48875c9f3bf0f8fa248450..cc1b91b1a9023cc110ec827b2ad7e1022b3027c1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -318,7 +318,10 @@ impl Component for CompPDL { let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::SourceLocation(expr_id); + + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = PortInstruction::SourceLocation(expr_id); + let port_is_closed = port_info.state == PortState::Closed; let port_index = comp_ctx.get_port_index(port_handle); if let Some(message) = &self.inbox_main[port_index] { @@ -340,9 +343,23 @@ impl Component for CompPDL { return CompScheduling::Immediate; } } else { - todo!("handle sync failure due to message deadlock"); + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + String::from("Cannot get from this port, as this causes a 
deadlock. This happens if you `get` in a different order as another component `put`s") ))); return CompScheduling::Sleep; } } else if port_is_closed { + // No messages, but getting makes no sense as the port is + // closed. + let peer_id = comp_ctx.get_port(port_handle).peer_comp_id; + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) + ))); + return CompScheduling::Immediate; } else { // We need to wait self.exec_state.set_as_blocked_get(port_id); @@ -377,12 +394,36 @@ impl Component for CompPDL { self.select_state.handle_select_start(num_cases); return CompScheduling::Requeue; }, - EC::SelectRegisterPort(case_index, port_index, port_id) => { + EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); let port_id = port_id_from_eval(port_id); + let port_handle = comp_ctx.get_port_handle(port_id); + + // Note: we register the "last_instruction" here already. This + // way if we get a `ClosePort` message, the condition to fail + // the synchronous round is satisfied.
+ let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = PortInstruction::SourceLocation(expr_id); + let port_is_closed = port_info.state == PortState::Closed; + + // Register port as part of select guard if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { - todo!("handle registering a port multiple times"); + // Failure occurs if a port is used twice in the same guard + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + String::from("Cannot have the one port appear in the same guard twice") + ))); + } else if port_is_closed { + // Port is closed + let peer_id = comp_ctx.get_port(port_handle).peer_comp_id; + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0) + ))); } + return CompScheduling::Immediate; }, EC::SelectWait => { @@ -492,7 +533,7 @@ impl CompPDL { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {