diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index 803db6e48ef4c9ef920d8ffc26b4a86a2d13ad86..6040e234106bbb37dc09f30569661b9890803fcc 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -177,6 +177,7 @@ pub enum ControlMessageContent {
 #[derive(Copy, Clone, Debug)]
 pub struct ControlMessageClosePort {
     pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
+    pub registered_round: Option<u32>, // sender's `last_registered_round` for the port being closed
 }
 
 // -----------------------------------------------------------------------------
diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
index 67dd569364fdf0803e7cbd6e13012cdac1b22975..d62e12376dc8a3aad288ca6dd4688da39fd99750 100644
--- a/src/runtime2/component/component.rs
+++ b/src/runtime2/component/component.rs
@@ -278,6 +278,7 @@ pub(crate) fn default_send_data_message(
     let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
     let port_info = comp_ctx.get_port_mut(port_handle);
     port_info.last_instruction = port_instruction;
+    port_info.last_registered_round = Some(consensus.round_number());
 
     let port_info = comp_ctx.get_port(port_handle);
     debug_assert_eq!(port_info.kind, PortKind::Putter);
@@ -511,9 +512,10 @@ pub(crate) fn default_handle_received_data_message(
     }
 
     // Modify last-known location where port instruction was retrieved
-    let port_info = comp_ctx.get_port(port_handle);
+    let port_info = comp_ctx.get_port_mut(port_handle);
     debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
     debug_assert!(port_info.state.is_open()); // checked by caller
+    port_info.last_registered_round = Some(message.sync_header.sync_round);
 
     // Check if there are any more messages in the backup buffer
     for message_index in 0..inbox_backup.len() {
@@ -591,6 +593,7 @@ pub(crate) fn default_handle_control_message(
             } else {
                 // Respond to the message
                 let port_info = comp_ctx.get_port(port_handle);
+                let last_registered_round = port_info.last_registered_round;
                 let last_instruction = port_info.last_instruction;
                 let port_has_had_message = port_info.received_message_for_sync;
                 default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
@@ -607,10 +610,12 @@ pub(crate) fn default_handle_control_message(
                 let port_was_used = last_instruction != PortInstruction::None;
 
                 if exec_state.mode.is_in_sync_block() {
-                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
-                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
+                    // Compare the round the peer last registered on this port (carried by the close message) with our own
+                    let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round;
+                    let closed_during_sync_round = content.closed_in_sync_round;
+                    let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded;
 
-                    if closed_during_sync_round || closed_before_sync_round {
+                    if (closed_during_sync_round || closed_before_sync_round) && port_was_used {
                         return Err((
                             last_instruction,
                             format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs
index 1d00c2c83fa9b0ae269fe690f659ef94cf4c9e1d..8bf9189d82ed212972b3acfc4bf5aeecf3352fbe 100644
--- a/src/runtime2/component/component_context.rs
+++ b/src/runtime2/component/component_context.rs
@@ -129,6 +129,7 @@ pub struct Port {
     pub kind: PortKind,
     pub state: PortState,
     // State tracking for error detection and error handling
+    pub last_registered_round: Option<u32>, // sync round of the last data message sent/received on this port, `None` until one is registered
     pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
     pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
     pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
@@ -181,6 +182,7 @@ impl CompCtx {
             kind: PortKind::Putter,
             state: PortState::new(),
             peer_comp_id: self.id,
+            last_registered_round: None,
             last_instruction: PortInstruction::None,
             close_at_sync_end: false,
             received_message_for_sync: false,
@@ -192,6 +194,7 @@ impl CompCtx {
             kind: PortKind::Getter,
             state: PortState::new(),
             peer_comp_id: self.id,
+            last_registered_round: None,
             last_instruction: PortInstruction::None,
             close_at_sync_end: false,
             received_message_for_sync: false,
@@ -206,6 +209,7 @@ impl CompCtx {
         let self_id = PortId(self.take_port_id());
         self.ports.push(Port{
             self_id, peer_comp_id, peer_port_id, kind, state,
+            last_registered_round: None,
             last_instruction: PortInstruction::None,
             close_at_sync_end: false,
             received_message_for_sync: false,
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index e4706c27995c39138d51ef1629058bcca6d0d004..db9a1e3c366b8118a8d58e038ea8d823741002d7 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -270,6 +270,12 @@ impl Consensus {
         }
     }
 
+    /// Returns the current value of `round_index`.
+    #[inline]
+    pub(crate) fn round_number(&self) -> u32 {
+        return self.round_index;
+    }
+
     // -------------------------------------------------------------------------
     // Managing sync state
     // -------------------------------------------------------------------------
diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs
index 9eabc0cd5f0534f877c7251c46b1296ac11bfbf4..f3ec7e5150968c3d93f6d187a1ffd1f27689d466 100644
--- a/src/runtime2/component/control_layer.rs
+++ b/src/runtime2/component/control_layer.rs
@@ -264,6 +264,7 @@ impl ControlLayer {
                 target_port_id: Some(peer_port_id),
                 content: ControlMessageContent::ClosePort(ControlMessageClosePort{
                     closed_in_sync_round: exit_inside_sync,
+                    registered_round: port.last_registered_round,
                 }),
             }
         );