diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 86d9e8f1f040229c5a56a9336370a13ee9327aa6..c026ef88f9d3836dd87fc8795230c68bbc27d343 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -2,6 +2,8 @@ use crate::runtime2::runtime::*; use crate::runtime2::communication::*; use crate::runtime2::component::*; +use super::component_context::*; + #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) struct ControlId(u32); @@ -72,7 +74,7 @@ impl ControlLayer { } pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction { - let entry_index = self.get_entry_index(entry_id).unwrap(); + let entry_index = self.get_entry_index_by_id(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); @@ -111,22 +113,10 @@ impl ControlLayer { ControlContent::ClosedPort(port_id) => { // If a closed port is Ack'd, then we remove the reference to // that component. - let port_index = comp_ctx.get_port_index(*port_id).unwrap(); - debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked); - let peer_id = comp_ctx.ports[port_index].peer_comp_id; - let peer_index = comp_ctx.get_peer_index(peer_id).unwrap(); - let peer_info = &mut comp_ctx.peers[peer_index]; - peer_info.num_associated_ports -= 1; - - if peer_info.num_associated_ports == 0 { - let should_remove = peer_info.handle.decrement_users(); - if should_remove { - let comp_key = unsafe{ peer_info.id.upgrade() }; - sched_ctx.runtime.destroy_component(comp_key); - } - - comp_ctx.peers.remove(peer_index); - } + let port_handle = comp_ctx.get_port_handle(*port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.state, PortState::Closed); + comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id); return AckAction::None; } @@ -153,7 +143,7 @@ impl ControlLayer { /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, /// hence the `ack_countdown` in the scheduling entry is at 0. pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { - let index = self.get_entry_index(schedule_entry_id).unwrap(); + let index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); debug_assert_eq!(self.entries[index].ack_countdown, 0); self.entries.remove(index); } @@ -198,9 +188,10 @@ impl ControlLayer { // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- - pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { - let port = comp_ctx.get_port_mut(port_id); - let peer_port_id = port.peer_id; + pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { + let port = comp_ctx.get_port_mut(port_handle); + let port_id = port.self_id; + let peer_port_id = port.peer_port_id; let peer_comp_id = port.peer_comp_id; debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); @@ -229,63 +220,67 @@ impl ControlLayer { )); } - pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { - // TODO: Feels like this shouldn't be an entry. Hence this class should - // be renamed. Lets see where the code ends up being - let entry_id = self.take_id(); - let port_info = comp_ctx.get_port_mut(port_id); - let peer_port_id = port_info.peer_id; + /// Adds a control entry to track that a port is blocked. Expects the caller + /// to have set the port's state to blocking already. The returned tuple + /// contains a message and the peer to send it to. + pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block + debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller + + let peer_port_id = port_info.peer_port_id; let peer_comp_id = port_info.peer_comp_id; - debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues - port_info.state = PortState::Blocked; + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 0, - content: ControlContent::BlockedPort(port_id), + content: ControlContent::BlockedPort(port_info.self_id), }); return ( - peer_comp_id, + peer_handle, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, - target_port_id: Some(peer_port_id), + target_port_id: Some(port_info.peer_port_id), content: ControlMessageContent::BlockPort(peer_port_id), } ); } - pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { - // Find the entry that contains the blocking entry for the port - let mut entry_index = usize::MAX; - let mut entry_id = ControlId::new_invalid(); - for (index, entry) in self.entries.iter().enumerate() { - if let ControlContent::BlockedPort(blocked_port) = &entry.content { - if *blocked_port == port_id { - entry_index = index; - entry_id = entry.id; - break; + /// Removes the control entry that tracks that a port is blocked. Expects + /// the caller to have already marked the port as unblocked. Again the + /// returned tuple contains a message and the target it is intended for + pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking + debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before + + let position = self.entries.iter() + .position(|v| { + if let ControlContent::BlockedPort(blocked_port_id) = &v.content { + if *blocked_port_id == port_info.self_id { + return true; + } } - } - } + return false; + }) + .unwrap(); - let port_info = comp_ctx.get_port_mut(port_id); - let peer_port_id = port_info.peer_id; - let peer_comp_id = port_info.peer_comp_id; - debug_assert_eq!(port_info.state, PortState::Blocked); - debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages - port_info.state = PortState::Open; + let entry = self.entries.remove(position); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); return ( - peer_comp_id, + peer_handle, ControlMessage{ - id: entry_id, + id: entry.id, sender_comp_id: comp_ctx.id, - target_port_id: Some(peer_port_id), - content: ControlMessageContent::UnblockPort(peer_port_id), + target_port_id: Some(port_info.peer_port_id), + content: ControlMessageContent::UnblockPort(port_info.peer_port_id) } - ) + ); } // ------------------------------------------------------------------------- @@ -298,7 +293,7 @@ impl ControlLayer { return id; } - fn get_entry_index(&self, entry_id: ControlId) -> Option { + fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option { for (index, entry) in self.entries.iter().enumerate() { if entry.id == entry_id { return Some(index);