diff --git a/src/runtime/scheduler.rs b/src/runtime/scheduler.rs new file mode 100644 index 0000000000000000000000000000000000000000..59ebb2549f631608ed771282d51e6cf1ed83952f --- /dev/null +++ b/src/runtime/scheduler.rs @@ -0,0 +1,935 @@ +use std::collections::VecDeque; +use std::sync::Arc; +use std::sync::atomic::Ordering; + +use crate::protocol::eval::EvalError; +use crate::runtime::port::ChannelId; + +use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; +use super::port::{Port, PortState, PortIdLocal}; +use super::native::Connector; +use super::branch::{BranchId}; +use super::connector::{ConnectorPDL, ConnectorScheduling}; +use super::inbox::{ + Message, DataMessage, + ControlMessage, ControlContent, + SyncControlMessage, SyncControlContent, +}; + +// Because it contains pointers we're going to do a copy by value on this one +#[derive(Clone, Copy)] +pub(crate) struct SchedulerCtx<'a> { + pub(crate) runtime: &'a RuntimeInner +} + +pub(crate) struct Scheduler { + runtime: Arc, + scheduler_id: u32, +} + +impl Scheduler { + pub fn new(runtime: Arc, scheduler_id: u32) -> Self { + return Self{ runtime, scheduler_id }; + } + + pub fn run(&mut self) { + // Setup global storage and workspaces that are reused for every + // connector that we run + 'thread_loop: loop { + // Retrieve a unit of work + self.debug("Waiting for work"); + let connector_key = self.runtime.wait_for_work(); + if connector_key.is_none() { + // We should exit + self.debug(" ... No more work, quitting"); + break 'thread_loop; + } + + // We have something to do + let connector_key = connector_key.unwrap(); + let connector_id = connector_key.downcast(); + self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index)); + + let scheduled = self.runtime.get_component_private(&connector_key); + + // Keep running until we should no longer immediately schedule the + // connector. + let mut cur_schedule = ConnectorScheduling::Immediate; + while let ConnectorScheduling::Immediate = cur_schedule { + self.handle_inbox_messages(scheduled); + + // Run the main behaviour of the connector, depending on its + // current state. + if scheduled.shutting_down { + // Nothing to do. But we're stil waiting for all our pending + // control messages to be answered. + self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks())); + if scheduled.router.num_pending_acks() == 0 { + // We're actually done, we can safely destroy the + // currently running connector + self.runtime.initiate_component_destruction(connector_key); + continue 'thread_loop; + } else { + cur_schedule = ConnectorScheduling::NotNow; + } + } else { + self.debug_conn(connector_id, "Running ..."); + let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; + let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx); + self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule)); + + // Handle all of the output from the current run: messages to + // send and connectors to instantiate. + self.handle_changes_in_context(scheduled); + + cur_schedule = new_schedule; + } + } + + // If here then the connector does not require immediate execution. + // So enqueue it if requested, and otherwise put it in a sleeping + // state. + match cur_schedule { + ConnectorScheduling::Immediate => unreachable!(), + ConnectorScheduling::Later => { + // Simply queue it again later + self.runtime.push_work(connector_key); + }, + ConnectorScheduling::NotNow => { + // Need to sleep, note that we are the only ones which are + // allows to set the sleeping state to `true`, and since + // we're running it must currently be `false`. + self.try_go_to_sleep(connector_key, scheduled); + }, + ConnectorScheduling::Exit => { + // Prepare for exit. Set the shutdown flag and broadcast + // messages to notify peers of closing channels + scheduled.shutting_down = true; + for port in &scheduled.ctx.ports { + if port.state != PortState::Closed { + let message = scheduled.router.prepare_closing_channel( + port.self_id, port.peer_id, + connector_id + ); + self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(message)); + } + } + + // Any messages still in the public inbox should be handled + scheduled.ctx.inbox.clear_read_messages(); + while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() { + let message = scheduled.ctx.take_message_using_ticket(ticket); + self.handle_message_while_shutting_down(message, scheduled); + } + + if scheduled.router.num_pending_acks() == 0 { + // All ports (if any) already closed + self.runtime.initiate_component_destruction(connector_key); + continue 'thread_loop; + } + + self.try_go_to_sleep(connector_key, scheduled); + }, + } + } + } + + /// Receiving messages from the public inbox and handling them or storing + /// them in the component's private inbox + fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) { + let connector_id = scheduled.ctx.id; + + while let Some(message) = scheduled.public.inbox.take_message() { + // Check if the message has to be rerouted because we have moved the + // target port to another component. + self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message)); + if let Some(target_port) = message.target_port() { + if let Some(other_component_id) = scheduled.router.should_reroute(target_port) { + self.debug_conn(connector_id, " ... Rerouting the message"); + + // We insert directly into the private inbox. Since we have + // a reroute entry the component can not yet be running. + if let Message::Control(_) = &message { + self.runtime.send_message_assumed_alive(other_component_id, message); + } else { + let key = unsafe { ConnectorKey::from_id(other_component_id) }; + let component = self.runtime.get_component_private(&key); + component.ctx.inbox.insert_new(message); + } + + continue; + } + + match scheduled.ctx.get_port_by_id(target_port) { + Some(port_info) => { + if port_info.state == PortState::Closed { + // We're no longer supposed to receive messages + // (rerouted message arrived much later!) + continue + } + }, + None => { + // Apparently we no longer have a handle to the port + continue; + } + } + } + + // If here, then we should handle the message + self.debug_conn(connector_id, " ... Handling the message"); + if let Message::Control(message) = &message { + match message.content { + ControlContent::PortPeerChanged(port_id, new_target_connector_id) => { + // Need to change port target + let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap(); + port.peer_connector = new_target_connector_id; + + // Note: for simplicity we program the scheduler to always finish + // running a connector with an empty outbox. If this ever changes + // then accepting the "port peer changed" message implies we need + // to change the recipient of the message in the outbox. + debug_assert!(scheduled.ctx.outbox.is_empty()); + + // And respond with an Ack + let ack_message = Message::Control(ControlMessage { + id: message.id, + sending_component_id: connector_id, + content: ControlContent::Ack, + }); + self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message)); + self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message); + }, + ControlContent::CloseChannel(port_id) => { + // Mark the port as being closed + let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap(); + port.state = PortState::Closed; + + // Send an Ack + let ack_message = Message::Control(ControlMessage { + id: message.id, + sending_component_id: connector_id, + content: ControlContent::Ack, + }); + self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message)); + self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message); + }, + ControlContent::Ack => { + if let Some(component_key) = scheduled.router.handle_ack(message.id) { + self.runtime.push_work(component_key); + }; + }, + ControlContent::Ping => {}, + } + } else { + // Not a control message + if scheduled.shutting_down { + // Since we're shutting down, we just want to respond with a + // message saying the message did not arrive. + debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared + self.handle_message_while_shutting_down(message, scheduled); + } else { + scheduled.ctx.inbox.insert_new(message); + } + } + } + } + + fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) { + let target_port_and_round_number = match message { + Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)), + Message::SyncComp(_) => None, + Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)), + Message::SyncControl(_) => None, + Message::Control(_) => None, + }; + + if let Some((target_port, sync_round)) = target_port_and_round_number { + // This message is aimed at a port, but we're shutting down, so + // notify the peer that its was not received properly. + // (also: since we're shutting down, we're not in sync mode and + // the context contains the definitive set of owned ports) + let port = scheduled.ctx.get_port_by_id(target_port).unwrap(); + if port.state == PortState::Open { + let message = SyncControlMessage { + in_response_to_sync_round: sync_round, + target_component_id: port.peer_connector, + content: SyncControlContent::ChannelIsClosed(port.peer_id), + }; + self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message)); + } + } + } + + /// Handles changes to the context that were made by the component. This is + /// the way (due to Rust's borrowing rules) that we bubble up changes in the + /// component's state that the scheduler needs to know about (e.g. a message + /// that the component wants to send, a port that has been added). + fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) { + let connector_id = scheduled.ctx.id; + + // Handling any messages that were sent + while let Some(message) = scheduled.ctx.outbox.pop_front() { + let (target_component_id, over_port) = match &message { + Message::Data(content) => { + // Data messages are always sent to a particular port, and + // may end up being rerouted. + let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap(); + debug_assert_eq!(port_desc.peer_id, content.data_header.target_port); + debug_assert_eq!(port_desc.state, PortState::Open); // checked when adding to context + + (port_desc.peer_connector, true) + }, + Message::SyncComp(content) => { + // Sync messages are always sent to a particular component, + // the sender must make sure it actually wants to send to + // the specified component (and is not using an inconsistent + // component ID associated with a port). + (content.target_component_id, false) + }, + Message::SyncPort(content) => { + let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap(); + debug_assert_eq!(port_desc.peer_id, content.target_port); + debug_assert_eq!(port_desc.state, PortState::Open); // checked when adding to context + + (port_desc.peer_connector, true) + }, + Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"), + Message::Control(_) => unreachable!("component sending 'Control' messages directly"), + }; + + self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message)); + if over_port { + self.runtime.send_message_assumed_alive(target_component_id, message); + } else { + self.runtime.send_message_maybe_destroyed(target_component_id, message); + } + } + + while let Some(state_change) = scheduled.ctx.state_changes.pop_front() { + match state_change { + ComponentStateChange::CreatedComponent(component, initial_ports) => { + // Creating a new component. Need to relinquish control of + // the ports. + let new_component_key = self.runtime.create_pdl_component(component, false); + let new_connector = self.runtime.get_component_private(&new_component_key); + + // First pass: transfer ports and the associated messages, + // also count the number of ports that have peers + let mut num_peers = 0; + for port_id in initial_ports { + // Transfer messages associated with the transferred port + scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox); + + // Transfer the port itself + let port_index = scheduled.ctx.ports.iter() + .position(|v| v.self_id == port_id) + .unwrap(); + let port = scheduled.ctx.ports.remove(port_index); + new_connector.ctx.ports.push(port.clone()); + + if port.state == PortState::Open { + num_peers += 1; + } + } + + if num_peers == 0 { + // No peers to notify, so just schedule the component + self.runtime.push_work(new_component_key); + } else { + // Some peers to notify + let new_component_id = new_component_key.downcast(); + let control_id = scheduled.router.prepare_new_component(new_component_key); + for port in new_connector.ctx.ports.iter() { + if port.state == PortState::Closed { + continue; + } + + let control_message = scheduled.router.prepare_changed_port_peer( + control_id, scheduled.ctx.id, + port.peer_connector, port.peer_id, + new_component_id, port.self_id + ); + self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message)); + self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message)); + } + } + }, + ComponentStateChange::CreatedPort(port) => { + scheduled.ctx.ports.push(port); + }, + ComponentStateChange::ChangedPort(port_change) => { + if port_change.is_acquired { + scheduled.ctx.ports.push(port_change.port); + } else { + let index = scheduled.ctx.ports + .iter() + .position(|v| v.self_id == port_change.port.self_id) + .unwrap(); + scheduled.ctx.ports.remove(index); + } + } + } + } + + // Finally, check if we just entered or just left a sync region + if scheduled.ctx.changed_in_sync { + if scheduled.ctx.is_in_sync { + // Just entered sync region + } else { + // Just left sync region. So prepare inbox for the next sync + // round + scheduled.ctx.inbox.clear_read_messages(); + } + + scheduled.ctx.changed_in_sync = false; // reset flag + } + } + + fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) { + debug_assert_eq!(connector_key.index, connector.ctx.id.index); + debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false); + + // This is the running connector, and only the running connector may + // decide it wants to sleep again. + connector.public.sleeping.store(true, Ordering::Release); + + // But due to reordering we might have received messages from peers who + // did not consider us sleeping. If so, then we wake ourselves again. + if !connector.public.inbox.is_empty() { + // Try to wake ourselves up (needed because someone might be trying + // the exact same atomic compare-and-swap at this point in time) + let should_wake_up_again = connector.public.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up_again { + self.runtime.push_work(connector_key) + } + } + } + + fn debug(&self, message: &str) { + println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + } + + fn debug_conn(&self, conn: ConnectorId, message: &str) { + println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.index, message); + } +} + +// ----------------------------------------------------------------------------- +// ComponentCtx +// ----------------------------------------------------------------------------- + +enum ComponentStateChange { + CreatedComponent(ConnectorPDL, Vec), + CreatedPort(Port), + ChangedPort(ComponentPortChange), +} + +#[derive(Clone)] +pub(crate) struct ComponentPortChange { + pub is_acquired: bool, // otherwise: released + pub port: Port, +} + +/// The component context (better name may be invented). This was created +/// because part of the component's state is managed by the scheduler, and part +/// of it by the component itself. When the component starts a sync block or +/// exits a sync block the partially managed state by both component and +/// scheduler need to be exchanged. +pub(crate) struct ComponentCtx { + // Mostly managed by the scheduler + pub(crate) id: ConnectorId, + ports: Vec, + inbox: Inbox, + // Submitted by the component + is_in_sync: bool, + changed_in_sync: bool, + outbox: VecDeque, + state_changes: VecDeque, + + // Workspaces that may be used by components to (generally) prevent + // allocations. Be a good scout and leave it empty after you've used it. + // TODO: Move to scheduler ctx, this is the wrong place + pub workspace_ports: Vec, + pub workspace_branches: Vec, +} + +impl ComponentCtx { + pub(crate) fn new_empty() -> Self { + return Self{ + id: ConnectorId::new_invalid(), + ports: Vec::new(), + inbox: Inbox::new(), + is_in_sync: false, + changed_in_sync: false, + outbox: VecDeque::new(), + state_changes: VecDeque::new(), + workspace_ports: Vec::new(), + workspace_branches: Vec::new(), + }; + } + + /// Notify the runtime that the component has created a new component. May + /// only be called outside of a sync block. + pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec) { + debug_assert!(!self.is_in_sync); + self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports)); + } + + /// Notify the runtime that the component has created a new port. May only + /// be called outside of a sync block (for ports received during a sync + /// block, pass them when calling `notify_sync_end`). + pub(crate) fn push_port(&mut self, port: Port) { + debug_assert!(!self.is_in_sync); + self.state_changes.push_back(ComponentStateChange::CreatedPort(port)) + } + + /// Notify the runtime of an error. Note that this will not perform any + /// special action beyond printing the error. The component is responsible + /// for waiting until it is appropriate to shut down (i.e. being outside + /// of a sync region) and returning the `Exit` scheduling code. + pub(crate) fn push_error(&mut self, error: EvalError) { + println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.index, error); + } + + #[inline] + pub(crate) fn get_ports(&self) -> &[Port] { + return self.ports.as_slice(); + } + + pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> { + return self.ports.iter().find(|v| v.self_id == id); + } + + pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> { + return self.ports.iter().find(|v| v.channel_id == id); + } + + fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> { + return self.ports.iter_mut().find(|v| v.self_id == id); + } + + /// Notify that component will enter a sync block. Note that after calling + /// this function you must allow the scheduler to pick up the changes in the + /// context by exiting your code-executing loop, and to continue executing + /// code the next time the scheduler picks up the component. + pub(crate) fn notify_sync_start(&mut self) { + debug_assert!(!self.is_in_sync); + + self.is_in_sync = true; + self.changed_in_sync = true; + } + + #[inline] + pub(crate) fn is_in_sync(&self) -> bool { + return self.is_in_sync; + } + + /// Submit a message for the scheduler to send to the appropriate receiver. + /// May only be called inside of a sync block. + pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> { + debug_assert!(self.is_in_sync); + if let Some(port_id) = contents.source_port() { + let port_info = self.get_port_by_id(port_id); + let is_valid = match port_info { + Some(port_info) => { + port_info.state == PortState::Open + }, + None => false, + }; + if !is_valid { + // We don't own the port + println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index); + return Err(()); + } + } + + self.outbox.push_back(contents); + return Ok(()); + } + + /// Notify that component just finished a sync block. Like + /// `notify_sync_start`: drop out of the `Component::Run` function. + pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) { + debug_assert!(self.is_in_sync); + + self.is_in_sync = false; + self.changed_in_sync = true; + + self.state_changes.reserve(changed_ports.len()); + for changed_port in changed_ports { + self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone())); + } + } + + /// Retrieves messages matching a particular port and branch id. But only + /// those messages that have been previously received with + /// `read_next_message`. + pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { + return self.inbox.get_read_data_messages(match_port_id); + } + + pub(crate) fn get_next_message_ticket(&mut self) -> Option { + if !self.is_in_sync { return None; } + return self.inbox.get_next_message_ticket(); + } + + #[inline] + pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option { + return self.inbox.get_next_message_ticket(); + } + + #[inline] + pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { + return self.inbox.read_message_using_ticket(ticket); + } + + #[inline] + pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { + return self.inbox.take_message_using_ticket(ticket) + } + + /// Puts back a message back into the inbox. The reason being that the + /// message is actually part of the next sync round. This will + pub(crate) fn put_back_message(&mut self, message: Message) { + self.inbox.put_back_message(message); + } +} + +pub(crate) struct MessagesIter<'a> { + messages: &'a [Message], + next_index: usize, + max_index: usize, + match_port_id: PortIdLocal, +} + +impl<'a> Iterator for MessagesIter<'a> { + type Item = &'a DataMessage; + + fn next(&mut self) -> Option { + // Loop until match is found or at end of messages + while self.next_index < self.max_index { + let message = &self.messages[self.next_index]; + if let Message::Data(message) = &message { + if message.data_header.target_port == self.match_port_id { + // Found a match + self.next_index += 1; + return Some(message); + } + } else { + // Unreachable because: + // 1. We only iterate over messages that were previously retrieved by `read_next_message`. + // 2. Inbox does not contain control/ping messages. + // 3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox. + unreachable!(); + } + + self.next_index += 1; + } + + // No more messages + return None; + } +} + +// ----------------------------------------------------------------------------- +// Private Inbox +// ----------------------------------------------------------------------------- + +/// A structure that contains inbox messages. Some messages are left inside and +/// continuously re-read. Others are taken out, but may potentially be put back +/// for later reading. Later reading in this case implies that they are put back +/// for reading in the next sync round. +/// TODO: Again, lazy concurrency, see git history for other implementation +struct Inbox { + messages: Vec, + delayed: Vec, + next_read_idx: u32, + generation: u32, +} + +#[derive(Clone, Copy)] +pub(crate) struct MessageTicket { + index: u32, + generation: u32, +} + +impl Inbox { + fn new() -> Self { + return Inbox { + messages: Vec::new(), + delayed: Vec::new(), + next_read_idx: 0, + generation: 0, + } + } + + fn insert_new(&mut self, message: Message) { + assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size + self.messages.push(message); + } + + fn get_next_message_ticket(&mut self) -> Option { + if self.next_read_idx as usize >= self.messages.len() { return None }; + let idx = self.next_read_idx; + self.generation += 1; + self.next_read_idx += 1; + return Some(MessageTicket{ index: idx, generation: self.generation }); + } + + fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message { + debug_assert_eq!(self.generation, ticket.generation); + return &self.messages[ticket.index as usize]; + } + + fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message { + debug_assert_eq!(self.generation, ticket.generation); + debug_assert!(ticket.index < self.next_read_idx); + self.next_read_idx -= 1; + return self.messages.remove(ticket.index as usize); + } + + fn put_back_message(&mut self, message: Message) { + // We have space in front of the array because we've taken out a message + // before. + self.delayed.push(message); + } + + fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter { + return MessagesIter{ + messages: self.messages.as_slice(), + next_index: 0, + max_index: self.next_read_idx as usize, + match_port_id + }; + } + + fn clear_read_messages(&mut self) { + self.messages.drain(0..self.next_read_idx as usize); + for (idx, v) in self.delayed.drain(..).enumerate() { + self.messages.insert(idx, v); + } + self.next_read_idx = 0; + } + + fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) { + debug_assert!(self.delayed.is_empty()); + let mut idx = 0; + while idx < self.messages.len() { + let msg = &self.messages[idx]; + if let Some(target) = msg.target_port() { + if target == port { + new_inbox.messages.push(self.messages.remove(idx)); + continue; + } + } + + idx += 1; + } + } +} + +// ----------------------------------------------------------------------------- +// Control messages +// ----------------------------------------------------------------------------- + +struct ControlEntry { + id: u32, + variant: ControlVariant, +} + +enum ControlVariant { + NewComponent(ControlNewComponent), + ChangedPort(ControlChangedPort), + ClosedChannel(ControlClosedChannel), +} + +impl ControlVariant { + fn as_new_component_mut(&mut self) -> &mut ControlNewComponent { + match self { + ControlVariant::NewComponent(v) => v, + _ => unreachable!(), + } + } +} + +/// Entry for a new component waiting for execution after all of its peers have +/// confirmed the `ControlChangedPort` messages. +struct ControlNewComponent { + num_acks_pending: u32, // if it hits 0, we schedule the component + component_key: ConnectorKey, // this is the component we schedule +} + +struct ControlChangedPort { + reroute_if_sent_to_this_port: PortIdLocal, // if sent to this port, then reroute + source_connector: ConnectorId, // connector we expect messages from + target_connector: ConnectorId, // connector we need to reroute to + new_component_entry_id: u32, // if Ack'd, we reduce the counter on this `ControlNewComponent` entry +} + +struct ControlClosedChannel { + source_port: PortIdLocal, + target_port: PortIdLocal, +} + +pub(crate) struct ControlMessageHandler { + id_counter: u32, + active: Vec, +} + +impl ControlMessageHandler { + pub fn new() -> Self { + ControlMessageHandler { + id_counter: 0, + active: Vec::new(), + } + } + + /// Prepares a message indicating that a channel has closed, we keep a local + /// entry to match against the (hopefully) returned `Ack` message. + pub fn prepare_closing_channel( + &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal, + self_connector_id: ConnectorId + ) -> ControlMessage { + let id = self.take_id(); + + self.active.push(ControlEntry{ + id, + variant: ControlVariant::ClosedChannel(ControlClosedChannel{ + source_port: self_port_id, + target_port: peer_port_id, + }), + }); + + return ControlMessage { + id, + sending_component_id: self_connector_id, + content: ControlContent::CloseChannel(peer_port_id), + }; + } + + /// Prepares a control entry for a new component. This returns the id of + /// the entry for calls to `prepare_changed_port_peer`. Don't call this + /// function if the component has no peers that need to be messaged. + pub fn prepare_new_component(&mut self, component_key: ConnectorKey) -> u32 { + let id = self.take_id(); + self.active.push(ControlEntry{ + id, + variant: ControlVariant::NewComponent(ControlNewComponent{ + num_acks_pending: 0, + component_key, + }), + }); + + return id; + } + + pub fn prepare_changed_port_peer( + &mut self, new_component_entry_id: u32, creating_component_id: ConnectorId, + changed_component_id: ConnectorId, changed_port_id: PortIdLocal, + new_target_component_id: ConnectorId, new_target_port_id: PortIdLocal + ) -> ControlMessage { + // Add the peer-changed entry + let change_port_entry_id = self.take_id(); + self.active.push(ControlEntry{ + id: change_port_entry_id, + variant: ControlVariant::ChangedPort(ControlChangedPort{ + reroute_if_sent_to_this_port: new_target_port_id, + source_connector: changed_component_id, + target_connector: new_target_component_id, + new_component_entry_id, + }) + }); + + // Increment counter on "new component" entry + let position = self.position(new_component_entry_id).unwrap(); + let new_component_entry = &mut self.active[position]; + let new_component_entry = new_component_entry.variant.as_new_component_mut(); + new_component_entry.num_acks_pending += 1; + + return ControlMessage{ + id: change_port_entry_id, + sending_component_id: creating_component_id, + content: ControlContent::PortPeerChanged(changed_port_id, new_target_component_id), + }; + } + + /// Returns true if the supplied message should be rerouted. If so then this + /// function returns the connector that should retrieve this message. + pub fn should_reroute(&self, target_port: PortIdLocal) -> Option { + for entry in &self.active { + if let ControlVariant::ChangedPort(entry) = &entry.variant { + if entry.reroute_if_sent_to_this_port == target_port { + // Need to reroute this message + return Some(entry.target_connector); + } + } + } + + return None; + } + + /// Handles an Ack as an answer to a previously sent control message. + /// Handling an Ack might spawn a new message that needs to be sent. + pub fn handle_ack(&mut self, id: u32) -> Option { + let index = self.position(id); + + match index { + Some(index) => { + // Remove the entry. If `ChangedPort`, then retrieve associated + // `NewComponent`. Otherwise: early exits + let removed_entry = self.active.remove(index); + let new_component_idx = match removed_entry.variant { + ControlVariant::ChangedPort(message) => { + self.position(message.new_component_entry_id).unwrap() + }, + _ => return None, + }; + + // Decrement counter, if 0, then schedule component + let new_component_entry = self.active[new_component_idx].variant.as_new_component_mut(); + new_component_entry.num_acks_pending -= 1; + if new_component_entry.num_acks_pending != 0 { + return None; + } + + // Return component key for scheduling + let new_component_entry = self.active.remove(new_component_idx); + let new_component_entry = match new_component_entry.variant { + ControlVariant::NewComponent(entry) => entry, + _ => unreachable!(), + }; + + return Some(new_component_entry.component_key); + }, + None => { + todo!("handling of nefarious ACKs"); + return None; + }, + } + } + + /// Retrieves the number of responses we still expect to receive from our + /// peers + #[inline] + pub fn num_pending_acks(&self) -> usize { + return self.active.len(); + } + + fn take_id(&mut self) -> u32 { + let generated_id = self.id_counter; + let (new_id, _) = self.id_counter.overflowing_add(1); + self.id_counter = new_id; + + return generated_id; + } + + #[inline] + fn position(&self, id: u32) -> Option { + return self.active.iter().position(|v| v.id == id); + } +} \ No newline at end of file