diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 57d89d9ba93a959fd7f7c14ebe9ff3d5f0fa7f0c..299c64f158dd53f70f55f17456bcb4a1d7570d8c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -481,7 +481,7 @@ pub(crate) fn default_handle_control_message( let last_instruction = port_info.last_instruction; let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed + comp_ctx.change_port_peer(sched_ctx, port_handle, None); // Handle any possible error conditions (which boil down to: the // port has been used, but the peer has died). If not in sync @@ -500,7 +500,7 @@ pub(crate) fn default_handle_control_message( if closed_during_sync_round || closed_before_sync_round { return Err(( last_instruction, - format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) + format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0) )); } } else { @@ -543,14 +543,11 @@ pub(crate) fn default_handle_control_message( debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); let old_peer_id = port_info.peer_comp_id; - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.peer_comp_id = new_comp_id; port_info.peer_port_id = new_port_id; port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); - comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); + comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } }