diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index cddd244fa3f52157eb6ff24e617dd061daf02408..e47f44029bfdc991f9b827a18734ad3971c3a7d9 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -285,6 +285,8 @@ pub(crate) fn default_handle_incoming_data_message( ) -> IncomingData { let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); + sched_ctx.log("DEBUG: Setting received_message_for_sync"); + comp_ctx.get_port_mut(port_handle).received_message_for_sync = true; let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; @@ -368,7 +370,8 @@ pub(crate) fn default_attempt_get( } } else { // We don't have a message waiting for us. - let port_info = comp_ctx.get_port(port_handle); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = target_port_instruction; let port_is_closed = port_info.state == PortState::Closed; if port_is_closed { let peer_id = port_info.peer_comp_id; @@ -499,11 +502,13 @@ pub(crate) fn default_handle_control_message( // now it is in the `BlockedGet` state. let port_was_used = last_instruction != PortInstruction::None; + sched_ctx.log(&format!("DEBUG: last_instruction = {:?}, mode = {:?}, was_used = {}, has_had_message = {}", last_instruction, exec_state.mode, port_was_used, port_has_had_message)); + if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - // TODO: Finish this + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; - if closed_during_sync_round { + if closed_during_sync_round || closed_before_sync_round { return Err(( last_instruction, format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) @@ -618,7 +623,7 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state == PortState::Closed { + if port.state == PortState::Closed || port.close_at_sync_end { // Already closed, or in the process of being closed continue; }