use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::Ordering;

use crate::protocol::eval::EvalError;
use crate::runtime2::port::ChannelId;

use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
use super::port::{Port, PortState, PortIdLocal};
use super::native::Connector;
use super::branch::{BranchId};
use super::connector::{ConnectorPDL, ConnectorScheduling};
use super::inbox::{
    Message, DataMessage, ControlMessage, ControlContent,
    SyncControlMessage, SyncControlContent,
};

// Because it contains pointers we're going to do a copy by value on this one
#[derive(Clone, Copy)]
pub(crate) struct SchedulerCtx<'a> {
    pub(crate) runtime: &'a RuntimeInner,
}

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}

impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Self{ runtime, scheduler_id };
    }

    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        'thread_loop: loop {
            // Retrieve a unit of work
            self.debug("Waiting for work");
            let connector_key = self.runtime.wait_for_work();
            if connector_key.is_none() {
                // We should exit
                self.debug(" ... No more work, quitting");
                break 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let connector_id = connector_key.downcast();
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));

            let scheduled = self.runtime.get_component_private(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while let ConnectorScheduling::Immediate = cur_schedule {
                self.handle_inbox_messages(scheduled);

                // Run the main behaviour of the connector, depending on its
                // current state.
                if scheduled.shutting_down {
                    // Nothing to do. But we're still waiting for all our pending
                    // control messages to be answered.
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
                    if scheduled.router.num_pending_acks() == 0 {
                        // We're actually done, we can safely destroy the
                        // currently running connector
                        self.runtime.initiate_component_destruction(connector_key);
                        continue 'thread_loop;
                    } else {
                        cur_schedule = ConnectorScheduling::NotNow;
                    }
                } else {
                    self.debug_conn(connector_id, "Running ...");
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));

                    // Handle all of the output from the current run: messages to
                    // send and connectors to instantiate.
                    self.handle_changes_in_context(scheduled);
                    cur_schedule = new_schedule;
                }
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.push_work(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep, note that we are the only ones who are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    self.try_go_to_sleep(connector_key, scheduled);
                },
                ConnectorScheduling::Exit => {
                    // Prepare for exit. Set the shutdown flag and broadcast
                    // messages to notify peers of closing channels
                    scheduled.shutting_down = true;
                    for port in &scheduled.ctx.ports {
                        if port.state != PortState::Closed {
                            let message = scheduled.router.prepare_closing_channel(
                                port.self_id, port.peer_id, connector_id
                            );
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message));
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(message));
                        }
                    }

                    // Any messages still in the public inbox should be handled
                    scheduled.ctx.inbox.clear_read_messages();
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
                        self.handle_message_while_shutting_down(message, scheduled);
                    }

                    if scheduled.router.num_pending_acks() == 0 {
                        // All ports (if any) already closed
                        self.runtime.initiate_component_destruction(connector_key);
                        continue 'thread_loop;
                    }

                    self.try_go_to_sleep(connector_key, scheduled);
                },
            }
        }
    }

    /// Receiving messages from the public inbox and handling them or storing
    /// them in the component's private inbox
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
        let connector_id = scheduled.ctx.id;

        while let Some(message) = scheduled.public.inbox.take_message() {
            // Check if the message has to be rerouted because we have moved the
            // target port to another component.
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));

            if let Some(target_port) = message.target_port() {
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
                    self.debug_conn(connector_id, " ... Rerouting the message");
                    // We insert directly into the private inbox. Since we have
                    // a reroute entry the component can not yet be running.
                    if let Message::Control(_) = &message {
                        self.runtime.send_message_assumed_alive(other_component_id, message);
                    } else {
                        let key = unsafe { ConnectorKey::from_id(other_component_id) };
                        let component = self.runtime.get_component_private(&key);
                        component.ctx.inbox.insert_new(message);
                    }

                    continue;
                }

                match scheduled.ctx.get_port_by_id(target_port) {
                    Some(port_info) => {
                        if port_info.state == PortState::Closed {
                            // We're no longer supposed to receive messages
                            // (rerouted message arrived much later!)
                            continue;
                        }
                    },
                    None => {
                        // Apparently we no longer have a handle to the port
                        continue;
                    }
                }
            }

            // If here, then we should handle the message
            self.debug_conn(connector_id, " ... Handling the message");
            if let Message::Control(message) = &message {
                match message.content {
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
                        // Need to change port target
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
                        port.peer_connector = new_target_connector_id;

                        // Note: for simplicity we program the scheduler to always finish
                        // running a connector with an empty outbox. If this ever changes
                        // then accepting the "port peer changed" message implies we need
                        // to change the recipient of the message in the outbox.
                        debug_assert!(scheduled.ctx.outbox.is_empty());

                        // And respond with an Ack
                        let ack_message = Message::Control(ControlMessage {
                            id: message.id,
                            sending_component_id: connector_id,
                            content: ControlContent::Ack,
                        });
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message));
                        self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message);
                    },
                    ControlContent::CloseChannel(port_id) => {
                        // Mark the port as being closed
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
                        port.state = PortState::Closed;

                        // Send an Ack
                        let ack_message = Message::Control(ControlMessage {
                            id: message.id,
                            sending_component_id: connector_id,
                            content: ControlContent::Ack,
                        });
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message));
                        self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message);
                    },
                    ControlContent::Ack => {
                        if let Some(component_key) = scheduled.router.handle_ack(message.id) {
                            self.runtime.push_work(component_key);
                        }
                    },
                    ControlContent::Ping => {},
                }
            } else {
                // Not a control message
                if scheduled.shutting_down {
                    // Since we're shutting down, we just want to respond with a
                    // message saying the message did not arrive.
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
                    self.handle_message_while_shutting_down(message, scheduled);
                } else {
                    scheduled.ctx.inbox.insert_new(message);
                }
            }
        }
    }

    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
        let target_port_and_round_number = match message {
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
            Message::SyncComp(_) => None,
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
            Message::SyncControl(_) => None,
            Message::Control(_) => None,
        };

        if let Some((target_port, sync_round)) = target_port_and_round_number {
            // This message is aimed at a port, but we're shutting down, so
            // notify the peer that it was not received properly.
            // (also: since we're shutting down, we're not in sync mode and
            // the context contains the definitive set of owned ports)
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
            if port.state == PortState::Open {
                let message = SyncControlMessage {
                    in_response_to_sync_round: sync_round,
                    target_component_id: port.peer_connector,
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
                };
                self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message));
                self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message));
            }
        }
    }

    /// Handles changes to the context that were made by the component. This is
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
    /// component's state that the scheduler needs to know about (e.g. a message
    /// that the component wants to send, a port that has been added).
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
        let connector_id = scheduled.ctx.id;

        // Handling any messages that were sent
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
            let (target_component_id, over_port) = match &message {
                Message::Data(content) => {
                    // Data messages are always sent to a particular port, and
                    // may end up being rerouted.
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);

                    if port_desc.state == PortState::Closed {
                        todo!("handle sending over a closed port")
                    }

                    (port_desc.peer_connector, true)
                },
                Message::SyncComp(content) => {
                    // Sync messages are always sent to a particular component,
                    // the sender must make sure it actually wants to send to
                    // the specified component (and is not using an inconsistent
                    // component ID associated with a port).
                    (content.target_component_id, false)
                },
                Message::SyncPort(content) => {
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
                    debug_assert_eq!(port_desc.peer_id, content.target_port);

                    if port_desc.state == PortState::Closed {
                        todo!("handle sending over a closed port")
                    }

                    (port_desc.peer_connector, true)
                },
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
            };

            self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message));
            if over_port {
                self.runtime.send_message_assumed_alive(target_component_id, message);
            } else {
                self.runtime.send_message_maybe_destroyed(target_component_id, message);
            }
        }

        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
            match state_change {
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
                    // Creating a new component. Need to relinquish control of
                    // the ports.
                    let new_component_key = self.runtime.create_pdl_component(component, false);
                    let new_connector = self.runtime.get_component_private(&new_component_key);

                    // First pass: transfer ports and the associated messages,
                    // also count the number of ports that have peers
                    let mut num_peers = 0;
                    for port_id in initial_ports {
                        // Transfer messages associated with the transferred port
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);

                        // Transfer the port itself
                        let port_index = scheduled.ctx.ports.iter()
                            .position(|v| v.self_id == port_id)
                            .unwrap();
                        let port = scheduled.ctx.ports.remove(port_index);
                        new_connector.ctx.ports.push(port.clone());

                        if port.state == PortState::Open {
                            num_peers += 1;
                        }
                    }

                    if num_peers == 0 {
                        // No peers to notify, so just schedule the component
                        self.runtime.push_work(new_component_key);
                    } else {
                        // Some peers to notify
                        let new_component_id = new_component_key.downcast();
                        let control_id = scheduled.router.prepare_new_component(new_component_key);

                        for port in new_connector.ctx.ports.iter() {
                            if port.state == PortState::Closed {
                                continue;
                            }

                            let control_message = scheduled.router.prepare_changed_port_peer(
                                control_id, scheduled.ctx.id,
                                port.peer_connector, port.peer_id,
                                new_component_id, port.self_id
                            );
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message));
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message));
                        }
                    }
                },
                ComponentStateChange::CreatedPort(port) => {
                    scheduled.ctx.ports.push(port);
                },
                ComponentStateChange::ChangedPort(port_change) => {
                    if port_change.is_acquired {
                        scheduled.ctx.ports.push(port_change.port);
                    } else {
                        let index = scheduled.ctx.ports
                            .iter()
                            .position(|v| v.self_id == port_change.port.self_id)
                            .unwrap();
                        scheduled.ctx.ports.remove(index);
                    }
                }
            }
        }

        // Finally, check if we just entered or just left a sync region
        if scheduled.ctx.changed_in_sync {
            if scheduled.ctx.is_in_sync {
                // Just entered sync region
            } else {
                // Just left sync region. So prepare inbox for the next sync
                // round
                scheduled.ctx.inbox.clear_read_messages();
            }

            scheduled.ctx.changed_in_sync = false; // reset flag
        }
    }

    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
        debug_assert_eq!(connector_key.index, connector.ctx.id.index);
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);

        // This is the running connector, and only the running connector may
        // decide it wants to sleep again.
        connector.public.sleeping.store(true, Ordering::Release);

        // But due to reordering we might have received messages from peers who
        // did not consider us sleeping. If so, then we wake ourselves again.
        if !connector.public.inbox.is_empty() {
            // Try to wake ourselves up (needed because someone might be trying
            // the exact same atomic compare-and-swap at this point in time)
            let should_wake_up_again = connector.public.sleeping
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();
            if should_wake_up_again {
                self.runtime.push_work(connector_key)
            }
        }
    }

    // TODO: Remove, this is debugging stuff
    fn debug(&self, message: &str) {
        println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message);
    }

    fn debug_conn(&self, conn: ConnectorId, message: &str) {
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.index, message);
    }
}

// -----------------------------------------------------------------------------
// ComponentCtx
// -----------------------------------------------------------------------------

enum ComponentStateChange {
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
    CreatedPort(Port),
    ChangedPort(ComponentPortChange),
}

#[derive(Clone)]
pub(crate) struct ComponentPortChange {
    pub is_acquired: bool, // otherwise: released
    pub port: Port,
}

/// The component context (better name may be invented). This was created
/// because part of the component's state is managed by the scheduler, and part
/// of it by the component itself. When the component starts a sync block or
/// exits a sync block the partially managed state by both component and
/// scheduler needs to be exchanged.
pub(crate) struct ComponentCtx {
    // Mostly managed by the scheduler
    pub(crate) id: ConnectorId,
    ports: Vec<Port>,
    inbox: Inbox,
    // Submitted by the component
    is_in_sync: bool,
    changed_in_sync: bool,
    outbox: VecDeque<Message>,
    state_changes: VecDeque<ComponentStateChange>,
    // Workspaces that may be used by components to (generally) prevent
    // allocations. Be a good scout and leave it empty after you've used it.
    // TODO: Move to scheduler ctx, this is the wrong place
    pub workspace_ports: Vec<PortIdLocal>,
    pub workspace_branches: Vec<BranchId>,
}

impl ComponentCtx {
    pub(crate) fn new_empty() -> Self {
        return Self{
            id: ConnectorId::new_invalid(),
            ports: Vec::new(),
            inbox: Inbox::new(),
            is_in_sync: false,
            changed_in_sync: false,
            outbox: VecDeque::new(),
            state_changes: VecDeque::new(),
            workspace_ports: Vec::new(),
            workspace_branches: Vec::new(),
        };
    }

    /// Notify the runtime that the component has created a new component. May
    /// only be called outside of a sync block.
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
        debug_assert!(!self.is_in_sync);
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
    }
    /// Notify the runtime that the component has created a new port. May only
    /// be called outside of a sync block (for ports received during a sync
    /// block, pass them when calling `notify_sync_end`).
    pub(crate) fn push_port(&mut self, port: Port) {
        debug_assert!(!self.is_in_sync);
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
    }

    /// Notify the runtime of an error. Note that this will not perform any
    /// special action beyond printing the error. The component is responsible
    /// for waiting until it is appropriate to shut down (i.e. being outside
    /// of a sync region) and returning the `Exit` scheduling code.
    pub(crate) fn push_error(&mut self, error: EvalError) {
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.index, error);
    }

    #[inline]
    pub(crate) fn get_ports(&self) -> &[Port] {
        return self.ports.as_slice();
    }

    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
        return self.ports.iter().find(|v| v.self_id == id);
    }

    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
        return self.ports.iter().find(|v| v.channel_id == id);
    }

    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
        return self.ports.iter_mut().find(|v| v.self_id == id);
    }

    /// Notify that the component will enter a sync block. Note that after
    /// calling this function you must allow the scheduler to pick up the
    /// changes in the context by exiting your code-executing loop, and to
    /// continue executing code the next time the scheduler picks up the
    /// component.
    pub(crate) fn notify_sync_start(&mut self) {
        debug_assert!(!self.is_in_sync);
        self.is_in_sync = true;
        self.changed_in_sync = true;
    }

    #[inline]
    pub(crate) fn is_in_sync(&self) -> bool {
        return self.is_in_sync;
    }

    /// Submit a message for the scheduler to send to the appropriate receiver.
    /// May only be called inside of a sync block.
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
        debug_assert!(self.is_in_sync);
        if let Some(port_id) = contents.source_port() {
            let port_info = self.get_port_by_id(port_id);
            let is_valid = match port_info {
                Some(port_info) => port_info.state == PortState::Open,
                None => false,
            };
            if !is_valid {
                // We don't own the port
                println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index);
                return Err(());
            }
        }

        self.outbox.push_back(contents);
        return Ok(());
    }

    /// Notify that the component just finished a sync block. Like
    /// `notify_sync_start`: drop out of the `Component::Run` function.
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
        debug_assert!(self.is_in_sync);
        self.is_in_sync = false;
        self.changed_in_sync = true;

        self.state_changes.reserve(changed_ports.len());
        for changed_port in changed_ports {
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
        }
    }

    /// Retrieves messages matching a particular port, but only those messages
    /// that have been previously received with `read_next_message`.
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
        return self.inbox.get_read_data_messages(match_port_id);
    }

    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
        if !self.is_in_sync {
            return None;
        }
        return self.inbox.get_next_message_ticket();
    }

    #[inline]
    pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option<MessageTicket> {
        return self.inbox.get_next_message_ticket();
    }

    #[inline]
    pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
        return self.inbox.read_message_using_ticket(ticket);
    }

    #[inline]
    pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
        return self.inbox.take_message_using_ticket(ticket)
    }

    /// Puts a message back into the inbox. The reason being that the message
    /// is actually part of the next sync round. It will be offered for reading
    /// again in that next round.
    pub(crate) fn put_back_message(&mut self, message: Message) {
        self.inbox.put_back_message(message);
    }
}

pub(crate) struct MessagesIter<'a> {
    messages: &'a [Message],
    next_index: usize,
    max_index: usize,
    match_port_id: PortIdLocal,
}

impl<'a> Iterator for MessagesIter<'a> {
    type Item = &'a DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop until match is found or at end of messages
        while self.next_index < self.max_index {
            let message = &self.messages[self.next_index];
            if let Message::Data(message) = &message {
                if message.data_header.target_port == self.match_port_id {
                    // Found a match
                    self.next_index += 1;
                    return Some(message);
                }
            } else {
                // Unreachable because:
                // 1. We only iterate over messages that were previously retrieved by `read_next_message`.
                // 2. Inbox does not contain control/ping messages.
                // 3. If `read_next_message` encounters anything other than a data message, it is removed from the inbox.
                unreachable!();
            }

            self.next_index += 1;
        }

        // No more messages
        return None;
    }
}

// -----------------------------------------------------------------------------
// Private Inbox
// -----------------------------------------------------------------------------
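
// A minimal, self-contained sketch (not part of the runtime, names are
// illustrative only) of the ticket-plus-generation scheme the `Inbox` below
// uses: every handed-out ticket carries the inbox's current generation, and
// handing out a new ticket bumps the generation, so a stale ticket is rejected
// instead of silently indexing the wrong message.
#[cfg(test)]
mod ticket_sketch {
    #[derive(Clone, Copy)]
    struct Ticket {
        index: usize,
        generation: u32,
    }

    struct SketchInbox {
        messages: Vec<String>,
        next_read_idx: usize,
        generation: u32,
    }

    impl SketchInbox {
        fn new(messages: Vec<String>) -> Self {
            return Self{ messages, next_read_idx: 0, generation: 0 };
        }

        // Hand out a ticket for the next unread message, invalidating any
        // previously handed-out ticket by bumping the generation.
        fn next_ticket(&mut self) -> Option<Ticket> {
            if self.next_read_idx >= self.messages.len() {
                return None;
            }
            self.generation += 1;
            let ticket = Ticket{ index: self.next_read_idx, generation: self.generation };
            self.next_read_idx += 1;
            return Some(ticket);
        }

        // Take the message out of the inbox; only the most recent ticket is valid.
        fn take(&mut self, ticket: Ticket) -> Option<String> {
            if ticket.generation != self.generation {
                return None; // stale ticket
            }
            self.next_read_idx -= 1;
            return Some(self.messages.remove(ticket.index));
        }
    }

    #[test]
    fn stale_tickets_are_rejected() {
        let mut inbox = SketchInbox::new(vec!["a".to_string(), "b".to_string()]);
        let first = inbox.next_ticket().unwrap();
        let second = inbox.next_ticket().unwrap();
        assert!(inbox.take(first).is_none()); // invalidated by the second ticket
        assert_eq!(inbox.take(second).as_deref(), Some("b"));
    }
}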
/// A structure that contains inbox messages. Some messages are left inside and
/// continuously re-read. Others are taken out, but may potentially be put back
/// for later reading. Later reading in this case implies that they are put back
/// for reading in the next sync round.
/// TODO: Again, lazy concurrency, see git history for other implementation
struct Inbox {
    messages: Vec<Message>,
    delayed: Vec<Message>,
    next_read_idx: u32,
    generation: u32,
}

#[derive(Clone, Copy)]
pub(crate) struct MessageTicket {
    index: u32,
    generation: u32,
}

impl Inbox {
    fn new() -> Self {
        return Inbox {
            messages: Vec::new(),
            delayed: Vec::new(),
            next_read_idx: 0,
            generation: 0,
        }
    }

    fn insert_new(&mut self, message: Message) {
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
        self.messages.push(message);
    }

    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
        if self.next_read_idx as usize >= self.messages.len() {
            return None;
        }

        let idx = self.next_read_idx;
        self.generation += 1;
        self.next_read_idx += 1;
        return Some(MessageTicket{ index: idx, generation: self.generation });
    }

    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
        debug_assert_eq!(self.generation, ticket.generation);
        return &self.messages[ticket.index as usize];
    }

    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
        debug_assert_eq!(self.generation, ticket.generation);
        debug_assert!(ticket.index < self.next_read_idx);
        self.next_read_idx -= 1;
        return self.messages.remove(ticket.index as usize);
    }

    fn put_back_message(&mut self, message: Message) {
        // Delayed messages are stored separately and moved back to the front
        // of the message queue once the read messages are cleared.
        self.delayed.push(message);
    }

    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
        return MessagesIter{
            messages: self.messages.as_slice(),
            next_index: 0,
            max_index: self.next_read_idx as usize,
            match_port_id,
        };
    }

    fn clear_read_messages(&mut self) {
        self.messages.drain(0..self.next_read_idx as usize);
        for (idx, v) in self.delayed.drain(..).enumerate() {
            self.messages.insert(idx, v);
        }
        self.next_read_idx = 0;
    }

    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
        debug_assert!(self.delayed.is_empty());
        let mut idx = 0;
        while idx < self.messages.len() {
            let msg = &self.messages[idx];
            if let Some(target) = msg.target_port() {
                if target == port {
                    new_inbox.messages.push(self.messages.remove(idx));
                    continue;
                }
            }

            idx += 1;
        }
    }
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

struct ControlEntry {
    id: u32,
    variant: ControlVariant,
}

enum ControlVariant {
    NewComponent(ControlNewComponent),
    ChangedPort(ControlChangedPort),
    ClosedChannel(ControlClosedChannel),
}

impl ControlVariant {
    fn as_new_component_mut(&mut self) -> &mut ControlNewComponent {
        match self {
            ControlVariant::NewComponent(v) => v,
            _ => unreachable!(),
        }
    }
}
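
// A small, self-contained sketch (hypothetical names, not the runtime API) of
// the rerouting idea behind the `ControlChangedPort` entries defined below:
// while a port is being handed over to a new component, the old owner keeps an
// entry mapping that port to the new owner, and a `should_reroute`-style
// lookup forwards any message that still arrives for it until the peer Acks.
#[cfg(test)]
mod reroute_sketch {
    use std::collections::HashMap;

    // Port and component ids are plain integers in this sketch.
    struct RerouteTable {
        entries: HashMap<u32, u32>, // moved port id -> new owner component id
    }

    impl RerouteTable {
        fn new() -> Self {
            return Self{ entries: HashMap::new() };
        }

        // Called when a port is transferred: messages for `port` must now be
        // forwarded to `new_owner` until the peer acknowledges the change.
        fn port_moved(&mut self, port: u32, new_owner: u32) {
            self.entries.insert(port, new_owner);
        }

        // Called when the peer Acks the change: the entry is no longer needed,
        // because the peer now sends directly to the new owner.
        fn peer_acked(&mut self, port: u32) {
            self.entries.remove(&port);
        }

        // Mirrors `should_reroute`: returns the component that should receive
        // a message addressed to `port`, if a handover is still in progress.
        fn should_reroute(&self, port: u32) -> Option<u32> {
            return self.entries.get(&port).copied();
        }
    }

    #[test]
    fn messages_are_rerouted_until_ack() {
        let mut table = RerouteTable::new();
        table.port_moved(7, 42);
        assert_eq!(table.should_reroute(7), Some(42));
        table.peer_acked(7);
        assert_eq!(table.should_reroute(7), None);
    }
}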
/// Entry for a new component waiting for execution after all of its peers have
/// confirmed the `ControlChangedPort` messages.
struct ControlNewComponent {
    num_acks_pending: u32, // if it hits 0, we schedule the component
    component_key: ConnectorKey, // this is the component we schedule
}

struct ControlChangedPort {
    reroute_if_sent_to_this_port: PortIdLocal, // if sent to this port, then reroute
    source_connector: ConnectorId, // connector we expect messages from
    target_connector: ConnectorId, // connector we need to reroute to
    new_component_entry_id: u32, // if Ack'd, we reduce the counter on this `ControlNewComponent` entry
}

struct ControlClosedChannel {
    source_port: PortIdLocal,
    target_port: PortIdLocal,
}

pub(crate) struct ControlMessageHandler {
    id_counter: u32,
    active: Vec<ControlEntry>,
}

impl ControlMessageHandler {
    pub fn new() -> Self {
        ControlMessageHandler {
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares a message indicating that a channel has closed. We keep a local
    /// entry to match against the (hopefully) returned `Ack` message.
    pub fn prepare_closing_channel(
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId
    ) -> ControlMessage {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
                source_port: self_port_id,
                target_port: peer_port_id,
            }),
        });

        return ControlMessage {
            id,
            sending_component_id: self_connector_id,
            content: ControlContent::CloseChannel(peer_port_id),
        };
    }

    /// Prepares a control entry for a new component. This returns the id of
    /// the entry for calls to `prepare_changed_port_peer`. Don't call this
    /// function if the component has no peers that need to be messaged.
    pub fn prepare_new_component(&mut self, component_key: ConnectorKey) -> u32 {
        let id = self.take_id();
        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::NewComponent(ControlNewComponent{
                num_acks_pending: 0,
                component_key,
            }),
        });

        return id;
    }

    pub fn prepare_changed_port_peer(
        &mut self, new_component_entry_id: u32,
        creating_component_id: ConnectorId,
        changed_component_id: ConnectorId, changed_port_id: PortIdLocal,
        new_target_component_id: ConnectorId, new_target_port_id: PortIdLocal
    ) -> ControlMessage {
        // Add the peer-changed entry
        let change_port_entry_id = self.take_id();
        self.active.push(ControlEntry{
            id: change_port_entry_id,
            variant: ControlVariant::ChangedPort(ControlChangedPort{
                reroute_if_sent_to_this_port: new_target_port_id,
                source_connector: changed_component_id,
                target_connector: new_target_component_id,
                new_component_entry_id,
            })
        });

        // Increment counter on "new component" entry
        let position = self.position(new_component_entry_id).unwrap();
        let new_component_entry = &mut self.active[position];
        let new_component_entry = new_component_entry.variant.as_new_component_mut();
        new_component_entry.num_acks_pending += 1;

        return ControlMessage{
            id: change_port_entry_id,
            sending_component_id: creating_component_id,
            content: ControlContent::PortPeerChanged(changed_port_id, new_target_component_id),
        };
    }

    /// Checks whether the supplied message should be rerouted. If so, then this
    /// function returns the connector that should retrieve this message.
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
        for entry in &self.active {
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
                if entry.reroute_if_sent_to_this_port == target_port {
                    // Need to reroute this message
                    return Some(entry.target_connector);
                }
            }
        }

        return None;
    }

    /// Handles an Ack as an answer to a previously sent control message.
    /// Handling an Ack might spawn a new message that needs to be sent.
    pub fn handle_ack(&mut self, id: u32) -> Option<ConnectorKey> {
        let index = self.position(id);
        match index {
            Some(index) => {
                // Remove the entry. If `ChangedPort`, then retrieve the
                // associated `NewComponent`. Otherwise: early exit
                let removed_entry = self.active.remove(index);
                let new_component_idx = match removed_entry.variant {
                    ControlVariant::ChangedPort(message) => {
                        self.position(message.new_component_entry_id).unwrap()
                    },
                    _ => return None,
                };

                // Decrement counter, if 0, then schedule component
                let new_component_entry = self.active[new_component_idx].variant.as_new_component_mut();
                new_component_entry.num_acks_pending -= 1;
                if new_component_entry.num_acks_pending != 0 {
                    return None;
                }

                // Return component key for scheduling
                let new_component_entry = self.active.remove(new_component_idx);
                let new_component_entry = match new_component_entry.variant {
                    ControlVariant::NewComponent(entry) => entry,
                    _ => unreachable!(),
                };

                return Some(new_component_entry.component_key);
            },
            None => {
                todo!("handling of nefarious ACKs");
            },
        }
    }

    /// Retrieves the number of responses we still expect to receive from our
    /// peers
    #[inline]
    pub fn num_pending_acks(&self) -> usize {
        return self.active.len();
    }

    fn take_id(&mut self) -> u32 {
        let generated_id = self.id_counter;
        let (new_id, _) = self.id_counter.overflowing_add(1);
        self.id_counter = new_id;
        return generated_id;
    }

    #[inline]
    fn position(&self, id: u32) -> Option<usize> {
        return self.active.iter().position(|v| v.id == id);
    }
}
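
// A condensed, illustrative sketch (not the real `ControlMessageHandler`) of
// the ack-counting handshake above: a newly created component is only handed
// to the scheduler once every peer has acknowledged its `PortPeerChanged`
// message. All names and ids here are hypothetical.
#[cfg(test)]
mod ack_counting_sketch {
    struct PendingComponent {
        component: &'static str, // stands in for `ConnectorKey`
        acks_remaining: u32,
    }

    impl PendingComponent {
        fn new(component: &'static str, num_peers: u32) -> Self {
            return Self{ component, acks_remaining: num_peers };
        }

        // Called for every `Ack` that answers a `PortPeerChanged` message.
        // Returns the component to schedule once the last Ack arrives.
        fn handle_ack(&mut self) -> Option<&'static str> {
            self.acks_remaining -= 1;
            if self.acks_remaining == 0 {
                return Some(self.component);
            }
            return None;
        }
    }

    #[test]
    fn component_is_scheduled_after_last_ack() {
        let mut pending = PendingComponent::new("new_component", 2);
        assert!(pending.handle_ack().is_none()); // first peer acked
        assert_eq!(pending.handle_ack(), Some("new_component")); // last peer acked
    }
}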