From 4a6883c04294ce15ed39e089cb5a5de717918663 2022-04-25 14:37:21 From: mh Date: 2022-04-25 14:37:21 Subject: [PATCH] Fix bug related to checking for closed port --- diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index e47f44029bfdc991f9b827a18734ad3971c3a7d9..09528f8b6221097aba72eccbf3283e326cdf00cc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -285,7 +285,6 @@ pub(crate) fn default_handle_incoming_data_message( ) -> IncomingData { let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); - sched_ctx.log("DEBUG: Setting received_message_for_sync"); comp_ctx.get_port_mut(port_handle).received_message_for_sync = true; let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; @@ -345,6 +344,18 @@ pub(crate) fn default_attempt_get( let port_handle = comp_ctx.get_port_handle(target_port); let port_index = comp_ctx.get_port_index(port_handle); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = target_port_instruction; + + let port_is_closed = port_info.state == PortState::Closed; + if port_is_closed { + let peer_id = port_info.peer_comp_id; + return GetResult::Error(( + target_port_instruction, + format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0) + )); + } + if let Some(message) = &inbox_main[port_index] { if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { // We're allowed to receive this message @@ -369,19 +380,8 @@ pub(crate) fn default_attempt_get( ))); } } else { - // We don't have a message waiting for us. - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.last_instruction = target_port_instruction; - let port_is_closed = port_info.state == PortState::Closed; - if port_is_closed { - let peer_id = port_info.peer_comp_id; - return GetResult::Error(( - target_port_instruction, - format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) - )); - } - - // No error ocurred, so enter the BlockedGet state + // We don't have a message waiting for us and the port is not blocked. + // So enter the BlockedGet state exec_state.set_as_blocked_get(target_port); return GetResult::NoMessage; } @@ -403,14 +403,8 @@ pub(crate) fn default_handle_received_data_message( // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port_mut(port_handle); - port_info.last_instruction = port_instruction; - - if port_info.state == PortState::Closed { - return Err(( - port_info.last_instruction, - format!("Cannot 'get' because the channel is closed")) - ); - } + debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller + debug_assert_ne!(port_info.state, PortState::Closed); // checked by caller // Check if there are any more messages in the backup buffer let port_info = comp_ctx.get_port(port_handle); @@ -502,8 +496,6 @@ pub(crate) fn default_handle_control_message( // now it is in the `BlockedGet` state. let port_was_used = last_instruction != PortInstruction::None; - sched_ctx.log(&format!("DEBUG: last_instruction = {:?}, mode = {:?}, was_used = {}, has_had_message = {}", last_instruction, exec_state.mode, port_was_used, port_has_had_message)); - if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; @@ -609,7 +601,7 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); - sched_ctx.log("Component starting exit"); + sched_ctx.log(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index f38875e2ea0a113a9f4d5dcda5cc35f60ca5ae43..123fb16ec9f429712a6b766f9f0c5364fb7a09a4 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -481,45 +481,6 @@ impl CompPDL { return Ok(step_result) } - fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log("Component starting sync mode"); - self.consensus.notify_sync_start(comp_ctx); - for message in self.inbox_main.iter() { - if let Some(message) = message { - self.consensus.handle_incoming_data_message(comp_ctx, message); - } - } - debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); - self.exec_state.mode = CompMode::Sync; - } - - fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason)); - debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); - self.exec_state.mode = CompMode::BusyExit; - let exit_inside_sync = self.exec_state.exit_reason.is_in_sync(); - - // Doing this by index, then retrieving the handle is a bit rediculous, - // but Rust is being Rust with its borrowing rules. - for port_index in 0..comp_ctx.num_ports() { - let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state == PortState::Closed { - // Already closed, or in the process of being closed - continue; - } - - // Mark as closed - let port_id = port.self_id; - port.state = PortState::Closed; - - // Notify peer of closing - let port_handle = comp_ctx.get_port_handle(port_id); - let (peer, message) = self.control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx); - let peer_info = comp_ctx.get_peer(peer); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); - } - } - // ------------------------------------------------------------------------- // Handling messages // -------------------------------------------------------------------------