diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 782399c46374828d05bb0b6074b2511e7de8a519..53c362b7058caf4c05f741de40eb19604767d410 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -577,7 +577,7 @@ pub(crate) fn default_handle_control_message( if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used; if closed_during_sync_round || closed_before_sync_round { return Err(( @@ -690,6 +690,13 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.state.is_blocked() { + return CompScheduling::Sleep; + } + } + sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); @@ -704,7 +711,8 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state.is_closed() || port.close_at_sync_end { + println!("DEBUG: Considering port:\n{:?}", port); + if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } @@ -904,7 +912,7 @@ fn prepare_send_message_with_ports( /// all stored in the component's execution state by the /// `prepare_send_message_with_ports` function. Port must be ready to send! fn perform_send_message_with_ports( - exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus, + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady); @@ -941,7 +949,9 @@ fn perform_send_message_with_ports( peer_port: transmit_port_info.peer_port_id, kind: transmit_port_info.kind, state: transmit_port_state - }) + }); + + comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None); } // And finally, send the message to the peer