diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 49558c30e73d6fb90b48875c9f3bf0f8fa248450..cc1b91b1a9023cc110ec827b2ad7e1022b3027c1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -318,7 +318,10 @@ impl Component for CompPDL { let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::SourceLocation(expr_id); + + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = PortInstruction::SourceLocation(expr_id); + let port_is_closed = port_info.state == PortState::Closed; let port_index = comp_ctx.get_port_index(port_handle); if let Some(message) = &self.inbox_main[port_index] { @@ -340,9 +343,23 @@ impl Component for CompPDL { return CompScheduling::Immediate; } } else { - todo!("handle sync failure due to message deadlock"); + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s") + )); return CompScheduling::Sleep; } + } else if port_is_closed { + // No messages, but getting makes no sense as the port is + // closed. + let peer_id = comp_ctx.get_port(port_handle).peer_comp_id; + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) + ))); + return CompScheduling::Immediate; } else { // We need to wait self.exec_state.set_as_blocked_get(port_id); @@ -377,12 +394,36 @@ impl Component for CompPDL { self.select_state.handle_select_start(num_cases); return CompScheduling::Requeue; }, - EC::SelectRegisterPort(case_index, port_index, port_id) => { + EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); let port_id = port_id_from_eval(port_id); + let port_handle = comp_ctx.get_port_handle(port_id); + + // Note: we register the "last_instruction" here already. This + // way if we get a `ClosePort` message, the condition to fail + // the synchronous round is satisfied. + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = PortInstruction::SourceLocation(expr_id); + let port_is_closed = port_info.state == PortState::Closed; + + // Register port as part of select guard if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { - todo!("handle registering a port multiple times"); + // Failure occurs if a port is used twice in the same guard + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + String::from("Cannot have the one port appear in the same guard twice") + ))); + } else if port_is_closed { + // Port is closed + let peer_id = comp_ctx.get_port(port_handle).peer_comp_id; + let protocol = &sched_ctx.runtime.protocol; + self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, expr_id, + format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0) + ))); } + return CompScheduling::Immediate; }, EC::SelectWait => { @@ -492,7 +533,7 @@ impl CompPDL { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {