use crate::runtime2::runtime::*; use crate::runtime2::communication::*; use crate::runtime2::component::*; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) struct ControlId(u32); impl ControlId { /// Like other invalid IDs, this one doesn't care any significance, but is /// just set at u32::MAX to hopefully bring out bugs sooner. fn new_invalid() -> Self { return ControlId(u32::MAX); } } struct ControlEntry { id: ControlId, ack_countdown: u32, content: ControlContent, } enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), BlockedPort(PortId), ClosedPort(PortId), } struct ContentPeerChange { source_port: PortId, source_comp: CompId, target_port: PortId, new_target_comp: CompId, schedule_entry_id: ControlId, } pub(crate) enum AckAction { None, SendMessageAndAck(CompId, ControlMessage, ControlId), ScheduleComponent(CompId), } /// Handling/sending control messages. pub(crate) struct ControlLayer { id_counter: ControlId, entries: Vec, } impl ControlLayer { pub(crate) fn should_reroute(&self, message: &Message) -> Option { // Safety note: rerouting should occur during the time when we're // notifying a peer of a new component. During this period that // component hasn't been executed yet, so cannot have died yet. // FIX @NoDirectHandle let target_port = message.target_port(); if target_port.is_none() { return None; } let target_port = target_port.unwrap(); for entry in &self.entries { if let ControlContent::PeerChange(entry) = &entry.content { if entry.target_port == target_port { return Some(entry.new_target_comp); } } } return None; } pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction { let entry_index = self.get_entry_index(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); entry.ack_countdown -= 1; if entry.ack_countdown != 0 { return AckAction::None; } // All `Ack`s received, take action based on the kind of entry match &entry.content { ControlContent::PeerChange(content) => { // If change of peer is ack'd. Then we are certain we have // rerouted all of the messages, and the sender's port can now // be unblocked again. let target_comp_id = content.source_comp; let message_to_send = ControlMessage{ id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(content.source_port), content: ControlMessageContent::PortPeerChangedUnblock( content.source_port, content.new_target_comp ) }; let to_ack = content.schedule_entry_id; self.entries.remove(entry_index); self.handle_ack(to_ack, sched_ctx, comp_ctx); return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); }, ControlContent::ScheduleComponent(to_schedule) => { // If all change-of-peers are `Ack`d, then we're ready to // schedule the component! return AckAction::ScheduleComponent(*to_schedule); }, ControlContent::BlockedPort(_) => unreachable!(), ControlContent::ClosedPort(port_id) => { // If a closed port is Ack'd, then we remove the reference to // that component. let port_index = comp_ctx.get_port_index(*port_id).unwrap(); debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked); let peer_id = comp_ctx.ports[port_index].peer_comp_id; let peer_index = comp_ctx.get_peer_index(peer_id).unwrap(); let peer_info = &mut comp_ctx.peers[peer_index]; peer_info.num_associated_ports -= 1; if peer_info.num_associated_ports == 0 { let should_remove = peer_info.handle.decrement_users(); if should_remove { let comp_key = unsafe{ peer_info.id.upgrade() }; sched_ctx.runtime.destroy_component(comp_key); } comp_ctx.peers.remove(peer_index); } return AckAction::None; } } } // ------------------------------------------------------------------------- // Port transfer (due to component creation) // ------------------------------------------------------------------------- /// Adds an entry that, when completely ack'd, will schedule a component. pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId { let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 0, // incremented by calls to `add_reroute_entry` content: ControlContent::ScheduleComponent(to_schedule_id), }); return entry_id; } /// Removes a schedule entry. Only used if the caller preemptively called /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, /// hence the `ack_countdown` in the scheduling entry is at 0. pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { let index = self.get_entry_index(schedule_entry_id).unwrap(); debug_assert_eq!(self.entries[index].ack_countdown, 0); self.entries.remove(index); } pub(crate) fn add_reroute_entry( &mut self, creator_comp_id: CompId, source_port_id: PortId, source_comp_id: CompId, target_port_id: PortId, new_comp_id: CompId, schedule_entry_id: ControlId, ) -> Message { let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, content: ControlContent::PeerChange(ContentPeerChange{ source_port: source_port_id, source_comp: source_comp_id, target_port: target_port_id, new_target_comp: new_comp_id, schedule_entry_id, }), }); // increment counter on schedule entry for entry in &mut self.entries { if entry.id == schedule_entry_id { entry.ack_countdown += 1; break; } } return Message::Control(ControlMessage{ id: entry_id, sender_comp_id: creator_comp_id, target_port_id: Some(source_port_id), content: ControlMessageContent::PortPeerChangedBlock(source_port_id) }) } // ------------------------------------------------------------------------- // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { let port = comp_ctx.get_port_mut(port_id); let peer_port_id = port.peer_id; let peer_comp_id = port.peer_comp_id; debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); port.state = PortState::Closed; if peer_comp_id == comp_ctx.id { // We own the other end of the channel as well. return None; } let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, content: ControlContent::ClosedPort(port_id), }); return Some(( peer_comp_id, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::ClosePort(peer_port_id), } )); } pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { // TODO: Feels like this shouldn't be an entry. Hence this class should // be renamed. Lets see where the code ends up being let entry_id = self.take_id(); let port_info = comp_ctx.get_port_mut(port_id); let peer_port_id = port_info.peer_id; let peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues port_info.state = PortState::Blocked; self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 0, content: ControlContent::BlockedPort(port_id), }); return ( peer_comp_id, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::BlockPort(peer_port_id), } ); } pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { // Find the entry that contains the blocking entry for the port let mut entry_index = usize::MAX; let mut entry_id = ControlId::new_invalid(); for (index, entry) in self.entries.iter().enumerate() { if let ControlContent::BlockedPort(blocked_port) = &entry.content { if *blocked_port == port_id { entry_index = index; entry_id = entry.id; break; } } } let port_info = comp_ctx.get_port_mut(port_id); let peer_port_id = port_info.peer_id; let peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Blocked); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages port_info.state = PortState::Open; return ( peer_comp_id, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::UnblockPort(peer_port_id), } ) } // ------------------------------------------------------------------------- // Internal utilities // ------------------------------------------------------------------------- fn take_id(&mut self) -> ControlId { let id = self.id_counter; self.id_counter.0 = self.id_counter.0.wrapping_add(1); return id; } fn get_entry_index(&self, entry_id: ControlId) -> Option { for (index, entry) in self.entries.iter().enumerate() { if entry.id == entry_id { return Some(index); } } return None; } } impl Default for ControlLayer { fn default() -> Self { return ControlLayer{ id_counter: ControlId(0), entries: Vec::new(), } } }