diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 0b03aa14729445e04e7c8faf891b13aae1d111e8..d5712961e4599ebea7ed0b73bbdf3aa03a2d2c81 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -412,7 +412,7 @@ pub(crate) fn default_attempt_get( match default_handle_received_data_message( target_port, target_port_instruction, &mut message, inbox_main, inbox_backup, - comp_ctx, sched_ctx, control, + comp_ctx, sched_ctx, control, consensus ) { Ok(()) => return GetResult::Received(message), Err(location_and_message) => return GetResult::Error(location_and_message) @@ -440,7 +440,8 @@ pub(crate) fn default_attempt_get( pub(crate) fn default_handle_received_data_message( targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, - comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer + comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer, + consensus: &mut Consensus ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); let port_index = comp_ctx.get_port_index(port_handle); @@ -470,6 +471,9 @@ pub(crate) fn default_handle_received_data_message( comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); let new_port = comp_ctx.get_port(new_port_handle); + // Add the port tho the consensus + consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { let value = message.content.get_value_mut(message_location);