diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index c21c9e81c9cc1bc355fc8af2b0a3687a92d5cc0f..15f4cac2faa53c69514a2da1fc15064197ad6c27 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -24,7 +24,6 @@ struct ControlEntry { enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), - BlockedPort(PortId), ClosedPort(PortId), } @@ -121,7 +120,6 @@ impl ControlLayer { // schedule the component! return (AckAction::ScheduleComponent(to_schedule), None); }, - ControlContent::BlockedPort(_) => unreachable!(), ControlContent::ClosedPort(closed_port) => { // If a closed port is Ack'd, then we remove the reference to // that component. @@ -243,29 +241,22 @@ impl ControlLayer { ); } - /// Adds a control entry to track that a port is blocked. Expects the caller - /// to have set the port's state to blocking already. The returned tuple - /// contains a message and the peer to send it to. + /// Generates the control message used to indicate to a peer that a port + /// should be blocked (expects the caller to have set the port's state to + /// blocked). pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block - debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller + debug_assert_eq!(port_info.state, PortState::BlockedDueToFullBuffers); // contract with caller let peer_port_id = port_info.peer_port_id; let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - let entry_id = self.take_id(); - self.entries.push(ControlEntry{ - id: entry_id, - ack_countdown: 0, - content: ControlContent::BlockedPort(port_info.self_id), - }); - return ( peer_handle, ControlMessage{ - id: entry_id, + id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), content: ControlMessageContent::BlockPort(peer_port_id), @@ -273,32 +264,19 @@ impl ControlLayer { ); } - /// Removes the control entry that tracks that a port is blocked. Expects - /// the caller to have already marked the port as unblocked. Again the - /// returned tuple contains a message and the target it is intended for + /// Generates a messages used to indicate to a peer that a port should be + /// unblocked again. pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before - let position = self.entries.iter() - .position(|v| { - if let ControlContent::BlockedPort(blocked_port_id) = &v.content { - if *blocked_port_id == port_info.self_id { - return true; - } - } - return false; - }) - .unwrap(); - - let entry = self.entries.remove(position); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); return ( peer_handle, ControlMessage{ - id: entry.id, + id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), content: ControlMessageContent::UnblockPort(port_info.peer_port_id)