diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 67dd569364fdf0803e7cbd6e13012cdac1b22975..d62e12376dc8a3aad288ca6dd4688da39fd99750 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -278,6 +278,7 @@ pub(crate) fn default_send_data_message( let port_handle = comp_ctx.get_port_handle(transmitting_port_id); let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = port_instruction; + port_info.last_registered_round = Some(consensus.round_number()); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); @@ -511,9 +512,10 @@ pub(crate) fn default_handle_received_data_message( } // Modify last-known location where port instruction was retrieved - let port_info = comp_ctx.get_port(port_handle); + let port_info = comp_ctx.get_port_mut(port_handle); debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller debug_assert!(port_info.state.is_open()); // checked by caller + port_info.last_registered_round = Some(message.sync_header.sync_round); // Check if there are any more messages in the backup buffer for message_index in 0..inbox_backup.len() { @@ -591,6 +593,7 @@ pub(crate) fn default_handle_control_message( } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); + let last_registered_round = port_info.last_registered_round; let last_instruction = port_info.last_instruction; let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); @@ -607,10 +610,11 @@ pub(crate) fn default_handle_control_message( let port_was_used = last_instruction != PortInstruction::None; if exec_state.mode.is_in_sync_block() { - let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used; + let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round; + let closed_during_sync_round = content.closed_in_sync_round; + let closed_before_sync_round = ! closed_during_sync_round && !round_has_succeeded; - if closed_during_sync_round || closed_before_sync_round { + if (closed_during_sync_round || closed_before_sync_round) && port_was_used { return Err(( last_instruction, format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)