diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 717810237d4a6aa84b74f9e6e7ae0f2adab9ce88..5d1b853af70754d563968c1b355dadd4c0c48e92 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -398,6 +398,7 @@ pub(crate) fn default_handle_control_message( // port (in case of error messages) let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = message.sender_comp_id; + port_info.close_at_sync_end = true; // might be redundant (we might set it closed now) let port_info = comp_ctx.get_port(port_handle); let peer_comp_id = port_info.peer_comp_id; @@ -425,7 +426,6 @@ pub(crate) fn default_handle_control_message( // that if we have a successful sync round, followed by the peer // closing the port, that we don't consider the sync round to // have failed by mistake. - let error_due_to_port_use = if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { return Err(( last_instruction, @@ -546,7 +546,7 @@ pub(crate) fn default_handle_busy_exit( /// 2. The component has encountered an error during a sync round and is /// exiting, hence is waiting for a "Failure" message from the leader. pub(crate) fn default_handle_sync_decision( - sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, + sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, decision: SyncRoundDecision, consensus: &mut Consensus ) -> Option { let success = match decision { @@ -568,6 +568,12 @@ pub(crate) fn default_handle_sync_decision( if success { // We cannot get a success message if the component has encountered an // error. + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.close_at_sync_end { + port_info.state = PortState::Closed; + } + } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; return Some(true);