diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index f5e74559dca3ec8e1c6e26f10e21079df43d5b7c..5e67b69b010abe717230e2aaa1abafa39c2a9a48 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -278,6 +278,7 @@ pub(crate) fn default_send_data_message( let port_handle = comp_ctx.get_port_handle(transmitting_port_id); let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = port_instruction; + port_info.last_registered_round = Some(consensus.round_number()); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); @@ -334,7 +335,6 @@ pub(crate) fn default_handle_incoming_data_message( ) -> IncomingData { let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); - comp_ctx.get_port_mut(port_handle).received_message_for_sync = true; let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; @@ -452,15 +452,6 @@ pub(crate) fn default_handle_received_data_message( // Then notify the peers that they can continue sending to this port, but // now at a new address. for received_port in &mut message.ports { - // Transfer messages to main/backup inbox - let _new_inbox_index = inbox_main.len(); - if !received_port.messages.is_empty() { - inbox_main.push(Some(received_port.messages.remove(0))); - inbox_backup.extend(received_port.messages.drain(..)); - } else { - inbox_main.push(None); - } - // Create a new port locally let mut new_port_state = received_port.state; new_port_state.set(PortStateFlag::Received); @@ -468,10 +459,25 @@ pub(crate) fn default_handle_received_data_message( received_port.peer_comp, received_port.peer_port, received_port.kind, new_port_state ); - debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle)); comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); let new_port = comp_ctx.get_port(new_port_handle); + // Transfer messages to main/backup inbox. Make sure to modify the + // target port to our own + for message in received_port.messages.iter_mut() { + message.data_header.target_port = new_port.self_id; + } + + let _new_inbox_index = inbox_main.len(); + if !received_port.messages.is_empty() { + inbox_main.push(Some(received_port.messages.remove(0))); + inbox_backup.extend(received_port.messages.drain(..)); + } else { + inbox_main.push(None); + } + + debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle)); + // Add the port tho the consensus consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx); @@ -505,9 +511,10 @@ pub(crate) fn default_handle_received_data_message( } // Modify last-known location where port instruction was retrieved - let port_info = comp_ctx.get_port(port_handle); + let port_info = comp_ctx.get_port_mut(port_handle); debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller debug_assert!(port_info.state.is_open()); // checked by caller + port_info.last_registered_round = Some(message.sync_header.sync_round); // Check if there are any more messages in the backup buffer for message_index in 0..inbox_backup.len() { @@ -585,8 +592,8 @@ pub(crate) fn default_handle_control_message( } else { // Respond to the message let port_info = comp_ctx.get_port(port_handle); + let last_registered_round = port_info.last_registered_round; let last_instruction = port_info.last_instruction; - let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); comp_ctx.change_port_peer(sched_ctx, port_handle, None); @@ -601,10 +608,11 @@ pub(crate) fn default_handle_control_message( let port_was_used = last_instruction != PortInstruction::None; if exec_state.mode.is_in_sync_block() { - let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used; + let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round; + let closed_during_sync_round = content.closed_in_sync_round; + let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded; - if closed_during_sync_round || closed_before_sync_round { + if (closed_during_sync_round || closed_before_sync_round) && port_was_used { return Err(( last_instruction, format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0) @@ -682,7 +690,6 @@ pub(crate) fn default_handle_sync_start( if let Some(message) = message { consensus.handle_incoming_data_message(comp_ctx, message); let port_info = comp_ctx.get_port_by_index_mut(port_index); - port_info.received_message_for_sync = true; } }