diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 75bd7c1f096fa10c0b355d0c4ea5577bab7eedcd..67b481c073e8c16a09533a7711d9864bab934326 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -2,51 +2,270 @@ use crate::runtime2::runtime::*; use crate::runtime2::communication::*; use crate::runtime2::component::*; +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub(crate) struct ControlId(u32); + +impl ControlId { + /// Like other invalid IDs, this one doesn't care any significance, but is + /// just set at u32::MAX to hopefully bring out bugs sooner. + fn new_invalid() -> Self { + return ControlId(u32::MAX); + } +} + struct ControlEntry { - id: u32, + id: ControlId, ack_countdown: u32, content: ControlContent, - ack_action: ControlAction, } enum ControlContent { - PeerChange(ControlPeerChange), + PeerChange(ContentPeerChange), + ScheduleComponent(ContentScheduleComponent), + BlockedPort(ContentBlockedPort), } -struct ControlPeerChange { +struct ContentPeerChange { source_port: PortId, - target_port: PortId, // if sent to this port - new_target_comp: CompId, // redirect to this component + source_comp: CompId, + target_port: PortId, + new_target_comp: CompId, + schedule_entry_id: ControlId, +} + +struct ContentScheduleComponent { + to_schedule: CompId, } -/// Action to be taken when the `Ack`s for a control entry has come in. -enum ControlAction { - Nothing, - AckOwnEntry(u32), // ack an entry we own ourselves - ScheduleComponent(CompId), // schedule a particular component for execution +struct ContentBlockedPort { + blocked_port: PortId, +} + +pub(crate) enum AckAction { + None, + SendMessageAndAck(CompId, ControlMessage, ControlId), + ScheduleComponent(CompId), } /// Handling/sending control messages. pub(crate) struct ControlLayer { - id_counter: u32, + id_counter: ControlId, entries: Vec, } impl ControlLayer { - fn handle_created_component(&mut self, creator_ctx: &CompCtx, created_ctx: &CompCtx) { - for peer in &created_ctx.peers { - // TODO: Optimize when we ourselves are the peer. + pub(crate) fn should_reroute(&self, message: &Message) -> Option { + // Safety note: rerouting should occur during the time when we're + // notifying a peer of a new component. During this period that + // component hasn't been executed yet, so cannot have died yet. + // FIX @NoDirectHandle + let target_port = message.target_port(); + if target_port.is_none() { + return None; + } + + let target_port = target_port.unwrap(); + for entry in &self.entries { + if let ControlContent::PeerChange(entry) = &entry.content { + if entry.target_port == target_port { + return Some(entry.new_target_comp); + } + } + } + + return None; + } + + pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction { + let entry_index = self.get_entry_index(entry_id).unwrap(); + let entry = &mut self.entries[entry_index]; + debug_assert!(entry.ack_countdown > 0); + + entry.ack_countdown -= 1; + if entry.ack_countdown != 0 { + return AckAction::None; + } + + // All `Ack`s received, take action based on the kind of entry + match &entry.content { + ControlContent::PeerChange(content) => { + // If change of peer is ack'd. Then we are certain we have + // rerouted all of the messages, and the sender's port can now + // be unblocked again. + let target_comp_id = content.source_comp; + let message_to_send = ControlMessage{ + id: ControlId::new_invalid(), + sender_comp_id: comp_ctx.id, + target_port_id: Some(content.source_port), + content: ControlMessageContent::PortPeerChangedUnblock( + content.source_port, + content.new_target_comp + ) + }; + let to_ack = content.schedule_entry_id; - // Create entry that will unblock the peer if it confirms that all - // of its ports have been blocked + self.entries.remove(entry_index); + self.handle_ack(to_ack, comp_ctx); - peer.handle.inbox.push(Message::) + return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); + }, + ControlContent::ScheduleComponent(content) => { + // If all change-of-peers are `Ack`d, then we're ready to + // schedule the component! + return AckAction::ScheduleComponent(content.to_schedule); + }, + ControlContent::BlockedPort(_) => unreachable!(), } } - fn take_id(&mut self) -> u32 { + // ------------------------------------------------------------------------- + // Port transfer (due to component creation) + // ------------------------------------------------------------------------- + + /// Adds an entry that, when completely ack'd, will schedule a component. + pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId { + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 0, // incremented by calls to `add_reroute_entry` + content: ControlContent::ScheduleComponent(ContentScheduleComponent{ + to_schedule: to_schedule_id + }), + }); + + return entry_id; + } + + /// Removes a schedule entry. Only used if the caller preemptively called + /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, + /// hence the `ack_countdown` in the scheduling entry is at 0. + pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { + let index = self.get_entry_index(schedule_entry_id).unwrap(); + debug_assert_eq!(self.entries[index].ack_countdown, 0); + self.entries.remove(index); + } + + pub(crate) fn add_reroute_entry( + &mut self, creator_comp_id: CompId, + source_port_id: PortId, source_comp_id: CompId, + target_port_id: PortId, new_comp_id: CompId, + schedule_entry_id: ControlId, + ) -> Message { + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 1, + content: ControlContent::PeerChange(ContentPeerChange{ + source_port: source_port_id, + source_comp: source_comp_id, + target_port: target_port_id, + new_target_comp: new_comp_id, + schedule_entry_id, + }), + }); + + // increment counter on schedule entry + for entry in &mut self.entries { + if entry.id == schedule_entry_id { + entry.ack_countdown += 1; + break; + } + } + + return Message::Control(ControlMessage{ + id: entry_id, + sender_comp_id: creator_comp_id, + target_port_id: Some(source_port_id), + content: ControlMessageContent::PortPeerChangedBlock(source_port_id) + }) + } + + // ------------------------------------------------------------------------- + // Blocking and unblocking ports + // ------------------------------------------------------------------------- + + pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { + // TODO: Feels like this shouldn't be an entry. Hence this class should + // be renamed. Lets see where the code ends up being + let entry_id = self.take_id(); + let port_info = comp_ctx.get_port_mut(port_id); + debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues + port_info.state = PortState::Blocked; + + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 0, + content: ControlContent::BlockedPort(ContentBlockedPort{ + blocked_port: port_id, + }), + }); + + return ( + port_info.peer_comp_id, + ControlMessage{ + id: entry_id, + sender_comp_id: comp_ctx.id, + target_port_id: Some(port_info.peer_id), + content: ControlMessageContent::BlockPort(port_info.peer_id), + } + ); + } + + pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { + // Find the entry that contains the blocking entry for the port + let mut entry_index = usize::MAX; + let mut entry_id = ControlId::MAX; + for (index, entry) in self.entries.iter().enumerate() { + if let ControlContent::BlockedPort(block_entry) = &entry.content { + if block_entry.blocked_port == port_id { + entry_index = index; + entry_id = entry.id; + break; + } + } + } + + let port_info = comp_ctx.get_port_mut(port_id); + debug_assert_eq!(port_info.state, PortState::Blocked); + port_info.state = PortState::Open; + + return ( + port_info.peer_comp_id, + ControlMessage{ + id: entry_id, + sender_comp_id: comp_ctx.id, + target_port_id: Some(port_info.peer_id), + content: ControlMessageContent::UnblockPort(port_info.peer_id), + } + ) + } + + // ------------------------------------------------------------------------- + // Internal utilities + // ------------------------------------------------------------------------- + + fn take_id(&mut self) -> ControlId { let id = self.id_counter; - self.id_counter += 1; + self.id_counter.0 = self.id_counter.0.wrapping_add(1); return id; } + + fn get_entry_index(&self, entry_id: ControlId) -> Option { + for (index, entry) in self.entries.iter().enumerate() { + if entry.id == entry_id { + return Some(index); + } + } + + return None; + } +} + +impl Default for ControlLayer { + fn default() -> Self { + return ControlLayer{ + id_counter: ControlId(0), + entries: Vec::new(), + } + } } \ No newline at end of file