diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index e47f44029bfdc991f9b827a18734ad3971c3a7d9..09528f8b6221097aba72eccbf3283e326cdf00cc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -285,7 +285,6 @@ pub(crate) fn default_handle_incoming_data_message( ) -> IncomingData { let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); - sched_ctx.log("DEBUG: Setting received_message_for_sync"); comp_ctx.get_port_mut(port_handle).received_message_for_sync = true; let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; @@ -345,6 +344,18 @@ pub(crate) fn default_attempt_get( let port_handle = comp_ctx.get_port_handle(target_port); let port_index = comp_ctx.get_port_index(port_handle); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = target_port_instruction; + + let port_is_closed = port_info.state == PortState::Closed; + if port_is_closed { + let peer_id = port_info.peer_comp_id; + return GetResult::Error(( + target_port_instruction, + format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0) + )); + } + if let Some(message) = &inbox_main[port_index] { if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // We're allowed to receive this message @@ -369,19 +380,8 @@ pub(crate) fn default_attempt_get( ))); } } else { - // We don't have a message waiting for us. - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.last_instruction = target_port_instruction; - let port_is_closed = port_info.state == PortState::Closed; - if port_is_closed { - let peer_id = port_info.peer_comp_id; - return GetResult::Error(( - target_port_instruction, - format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) - )); - } - - // No error ocurred, so enter the BlockedGet state + // We don't have a message waiting for us and the port is not blocked. + // So enter the BlockedGet state exec_state.set_as_blocked_get(target_port); return GetResult::NoMessage; } @@ -403,14 +403,8 @@ pub(crate) fn default_handle_received_data_message( // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port_mut(port_handle); - port_info.last_instruction = port_instruction; - - if port_info.state == PortState::Closed { - return Err(( - port_info.last_instruction, - format!("Cannot 'get' because the channel is closed")) - ); - } + debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller + debug_assert_ne!(port_info.state, PortState::Closed); // checked by caller // Check if there are any more messages in the backup buffer let port_info = comp_ctx.get_port(port_handle); @@ -502,8 +496,6 @@ pub(crate) fn default_handle_control_message( // now it is in the `BlockedGet` state. let port_was_used = last_instruction != PortInstruction::None; - sched_ctx.log(&format!("DEBUG: last_instruction = {:?}, mode = {:?}, was_used = {}, has_had_message = {}", last_instruction, exec_state.mode, port_was_used, port_has_had_message)); - if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; @@ -609,7 +601,7 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); - sched_ctx.log("Component starting exit"); + sched_ctx.log(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync();