use crate::runtime2::runtime::*; use crate::runtime2::communication::*; use crate::runtime2::component::*; use super::component_context::*; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) struct ControlId(u32); impl ControlId { /// Like other invalid IDs, this one doesn't care any significance, but is /// just set at u32::MAX to hopefully bring out bugs sooner. fn new_invalid() -> Self { return ControlId(u32::MAX); } } struct ControlEntry { id: ControlId, ack_countdown: u32, content: ControlContent, } enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), ClosedPort(PortId), UnblockPutWithPorts } struct ContentPeerChange { source_port: PortId, source_comp: CompId, old_target_port: PortId, new_target_port: PortId, new_target_comp: CompId, schedule_entry_id: ControlId, } struct ControlClosedPort { closed_port: PortId, exit_entry_id: Option, } pub(crate) enum AckAction { None, SendMessage(CompId, ControlMessage), ScheduleComponent(CompId), UnblockPutWithPorts, } /// Handling/sending control messages. pub(crate) struct ControlLayer { id_counter: ControlId, entries: Vec, } impl ControlLayer { pub(crate) fn should_reroute(&self, message: &mut Message) -> Option { // Safety note: rerouting should occur during the time when we're // notifying a peer of a new component. During this period that // component hasn't been executed yet, so cannot have died yet. // FIX @NoDirectHandle let target_port = message.target_port(); if target_port.is_none() { return None; } let target_port = target_port.unwrap(); for entry in &self.entries { if let ControlContent::PeerChange(entry) = &entry.content { if entry.old_target_port == target_port { message.modify_target_port(entry.new_target_port); return Some(entry.new_target_comp); } } } return None; } /// Handles an acknowledgement. The returned action must be performed by the /// caller. The optionally returned `ControlId` must be used and passed to /// `handle_ack` again. pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option) { let entry_index = self.get_entry_index_by_id(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); entry.ack_countdown -= 1; if entry.ack_countdown != 0 { return (AckAction::None, None); } let entry = self.entries.remove(entry_index); // All `Ack`s received, take action based on the kind of entry match entry.content { ControlContent::PeerChange(content) => { // If change of peer is ack'd. Then we are certain we have // rerouted all of the messages, and the sender's port can now // be unblocked again. let target_comp_id = content.source_comp; let message_to_send = ControlMessage{ id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(content.source_port), content: ControlMessageContent::PortPeerChangedUnblock( content.new_target_port, content.new_target_comp ) }; let to_ack = content.schedule_entry_id; return ( AckAction::SendMessage(target_comp_id, message_to_send), Some(to_ack) ); }, ControlContent::ScheduleComponent(to_schedule) => { // If all change-of-peers are `Ack`d, then we're ready to // schedule the component! return (AckAction::ScheduleComponent(to_schedule), None); }, ControlContent::ClosedPort(closed_port) => { // If a closed port is Ack'd, then we remove the reference to // that component. let port_handle = comp_ctx.get_port_handle(closed_port); debug_assert!(comp_ctx.get_port(port_handle).state.is_closed()); comp_ctx.change_port_peer(sched_ctx, port_handle, None); return (AckAction::None, None); }, ControlContent::UnblockPutWithPorts => { return (AckAction::UnblockPutWithPorts, None); } } } pub(crate) fn has_acks_remaining(&self) -> bool { return !self.entries.is_empty(); } // ------------------------------------------------------------------------- // Port transfer (due to component creation) // ------------------------------------------------------------------------- /// Adds an entry that, when completely ack'd, will schedule a component. pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId { let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 0, // incremented by calls to `add_reroute_entry` content: ControlContent::ScheduleComponent(to_schedule_id), }); return entry_id; } /// Adds an entry that returns the similarly named Ack action pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId { let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, // incremented by calls to `add_reroute_entry` content: ControlContent::UnblockPutWithPorts, }); return entry_id; } /// Removes a schedule entry. Only used if the caller preemptively called /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, /// hence the `ack_countdown` in the scheduling entry is at 0. pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { let index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); debug_assert_eq!(self.entries[index].ack_countdown, 0); self.entries.remove(index); } pub(crate) fn add_reroute_entry( &mut self, creator_comp_id: CompId, source_port_id: PortId, source_comp_id: CompId, old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId, schedule_entry_id: ControlId, ) -> Message { let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, content: ControlContent::PeerChange(ContentPeerChange{ source_port: source_port_id, source_comp: source_comp_id, old_target_port: old_target_port_id, new_target_port: new_target_port_id, new_target_comp: new_comp_id, schedule_entry_id, }), }); // increment counter on schedule entry let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); self.entries[entry_index].ack_countdown += 1; return Message::Control(ControlMessage{ id: entry_id, sender_comp_id: creator_comp_id, target_port_id: Some(source_port_id), content: ControlMessageContent::PortPeerChangedBlock }) } // ------------------------------------------------------------------------- // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option { let port = comp_ctx.get_port(port_handle); let port_id = port.self_id; for entry in self.entries.iter() { if let ControlContent::ClosedPort(entry_port_id) = &entry.content { if *entry_port_id == port_id { return Some(entry.id); } } } return None; } /// Initiates the control message procedures for closing a port. Caller must /// make sure that the port state has already been set to `Closed`. pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) { let port = comp_ctx.get_port(port_handle); let peer_port_id = port.peer_port_id; debug_assert!(port.state.is_closed()); // Construct the port-closing entry let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, content: ControlContent::ClosedPort(port.self_id), }); // Return the messages notifying peer of the closed port let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id); return ( peer_handle, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::ClosePort(ControlMessageClosePort{ closed_in_sync_round: exit_inside_sync, }), } ); } /// Generates the control message used to indicate to a peer that a port /// should be blocked (expects the caller to have set the port's state to /// blocked). pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller let peer_port_id = port_info.peer_port_id; let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); return ( peer_handle, ControlMessage{ id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::BlockPort, } ); } /// Generates a messages used to indicate to a peer that a port should be /// unblocked again. pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); return ( peer_handle, ControlMessage{ id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), content: ControlMessageContent::UnblockPort, } ); } // ------------------------------------------------------------------------- // Internal utilities // ------------------------------------------------------------------------- fn take_id(&mut self) -> ControlId { let id = self.id_counter; self.id_counter.0 = self.id_counter.0.wrapping_add(1); return id; } fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option { for (index, entry) in self.entries.iter().enumerate() { if entry.id == entry_id { return Some(index); } } return None; } } impl Default for ControlLayer { fn default() -> Self { return ControlLayer{ id_counter: ControlId(0), entries: Vec::new(), } } }