diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 15f4cac2faa53c69514a2da1fc15064197ad6c27..b7ad2c40b907deac5b614da1e0cba99181993434 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -124,10 +124,8 @@ impl ControlLayer { // If a closed port is Ack'd, then we remove the reference to // that component. let port_handle = comp_ctx.get_port_handle(closed_port); - let port_info = comp_ctx.get_port(port_handle); - let port_peer_comp_id = port_info.peer_comp_id; - debug_assert_eq!(port_info.state, PortState::Closed); - comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id, true); // remove if closed + debug_assert!(comp_ctx.get_port(port_handle).state.is_closed()); + comp_ctx.change_port_peer(sched_ctx, port_handle, None); return (AckAction::None, None); } @@ -191,7 +189,7 @@ impl ControlLayer { id: entry_id, sender_comp_id: creator_comp_id, target_port_id: Some(source_port_id), - content: ControlMessageContent::PortPeerChangedBlock(source_port_id) + content: ControlMessageContent::PortPeerChangedBlock }) } @@ -215,10 +213,10 @@ impl ControlLayer { /// Initiates the control message procedures for closing a port. Caller must /// make sure that the port state has already been set to `Closed`. - pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) { + pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) { let port = comp_ctx.get_port(port_handle); let peer_port_id = port.peer_port_id; - debug_assert!(port.state == PortState::Closed); + debug_assert!(port.state.is_closed()); // Construct the port-closing entry let entry_id = self.take_id(); @@ -236,7 +234,9 @@ impl ControlLayer { id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), - content: ControlMessageContent::ClosePort(peer_port_id), + content: ControlMessageContent::ClosePort(ControlMessageClosePort{ + closed_in_sync_round: exit_inside_sync, + }), } ); } @@ -247,7 +247,7 @@ impl ControlLayer { pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block - debug_assert_eq!(port_info.state, PortState::BlockedDueToFullBuffers); // contract with caller + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller let peer_port_id = port_info.peer_port_id; let peer_comp_id = port_info.peer_comp_id; @@ -258,8 +258,8 @@ impl ControlLayer { ControlMessage{ id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, - target_port_id: Some(port_info.peer_port_id), - content: ControlMessageContent::BlockPort(peer_port_id), + target_port_id: Some(peer_port_id), + content: ControlMessageContent::BlockPort, } ); } @@ -269,7 +269,6 @@ impl ControlLayer { pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking - debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); @@ -279,7 +278,7 @@ impl ControlLayer { id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), - content: ControlMessageContent::UnblockPort(port_info.peer_port_id) + content: ControlMessageContent::UnblockPort, } ); }