use std::collections::VecDeque;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::Ordering;

use crate::collections::RawVec;
use crate::protocol::eval::EvalError;
use crate::runtime2::port::ChannelId;

use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
use super::port::{Port, PortState, PortIdLocal};
use super::native::Connector;
use super::branch::{BranchId};
use super::connector::{ConnectorPDL, ConnectorScheduling};
use super::inbox::{
    Message, DataMessage, SyncHeader, ControlMessage, ControlContent,
    SyncControlMessage, SyncControlContent,
};

// Because it contains pointers we're going to do a copy by value on this one
#[derive(Clone, Copy)]
pub(crate) struct SchedulerCtx<'a> {
    pub(crate) runtime: &'a RuntimeInner,
}

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}

impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Self{ runtime, scheduler_id };
    }

    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        'thread_loop: loop {
            // Retrieve a unit of work
            self.debug("Waiting for work");
            let connector_key = self.runtime.wait_for_work();
            if connector_key.is_none() {
                // We should exit
                self.debug(" ... No more work, quitting");
                break 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let connector_id = connector_key.downcast();
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));

            let scheduled = self.runtime.get_component_private(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while let ConnectorScheduling::Immediate = cur_schedule {
                self.handle_inbox_messages(scheduled);

                // Run the main behaviour of the connector, depending on its
                // current state.
                if scheduled.shutting_down {
                    // Nothing to do. But we're still waiting for all our
                    // pending control messages to be answered.
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
                    self.handle_inbox_while_shutting_down(scheduled);
                    if scheduled.router.num_pending_acks() == 0 {
                        // We're actually done, we can safely destroy the
                        // currently running connector
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    } else {
                        cur_schedule = ConnectorScheduling::NotNow;
                    }
                } else {
                    self.debug_conn(connector_id, "Running ...");
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));

                    // Handle all of the output from the current run: messages
                    // to send and connectors to instantiate.
                    self.handle_changes_in_context(scheduled);
                    cur_schedule = new_schedule;
                }
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.push_work(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep. Note that we are the only ones who are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    self.try_go_to_sleep(connector_key, scheduled);
                },
                ConnectorScheduling::Exit => {
                    // Prepare for exit: set the shutdown flag and broadcast
                    // messages to notify peers of closing channels.
                    scheduled.shutting_down = true;
                    for port in &scheduled.ctx.ports {
                        if port.state != PortState::Closed {
                            let message = scheduled.router.prepare_closing_channel(
                                port.self_id, port.peer_id, connector_id
                            );
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
                        }
                    }

                    if scheduled.router.num_pending_acks() == 0 {
                        // All ports (if any) already closed
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    }

                    self.try_go_to_sleep(connector_key, scheduled);
                },
            }
        }
    }

    /// Receiving messages from the public inbox and handling them, or storing
    /// them in the component's private inbox
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
        let connector_id = scheduled.ctx.id;

        while let Some(message) = scheduled.public.inbox.take_message() {
            // Check if the message has to be rerouted because we have moved the
            // target port to another component.
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
            if let Some(target_port) = message.target_port() {
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
                    self.debug_conn(connector_id, " ... Rerouting the message");
                    self.runtime.send_message(other_component_id, message);
                    continue;
                }
            }

            // If here, then we should handle the message
            self.debug_conn(connector_id, " ... Handling the message");
            match message {
                Message::Control(message) => {
                    match message.content {
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
                            // Need to change port target
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
                            port.peer_connector = new_target_connector_id;

                            // Note: for simplicity we program the scheduler to always finish
                            // running a connector with an empty outbox. If this ever changes
                            // then accepting the "port peer changed" message implies we need
                            // to change the recipient of the message in the outbox.
                            debug_assert!(scheduled.ctx.outbox.is_empty());

                            // And respond with an Ack
                            let ack_message = Message::Control(ControlMessage {
                                id: message.id,
                                sending_component_id: connector_id,
                                content: ControlContent::Ack,
                            });
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
                            self.runtime.send_message(message.sending_component_id, ack_message);
                        },
                        ControlContent::CloseChannel(port_id) => {
                            // Mark the port as being closed
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
                            port.state = PortState::Closed;

                            // Send an Ack
                            let ack_message = Message::Control(ControlMessage {
                                id: message.id,
                                sending_component_id: connector_id,
                                content: ControlContent::Ack,
                            });
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
                            self.runtime.send_message(message.sending_component_id, ack_message);
                        },
                        ControlContent::Ack => {
                            if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
                                self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
                                self.runtime.send_message(target_component, new_control_message);
                            };
                        },
                        ControlContent::Ping => {},
                    }
                },
                _ => {
                    // All other cases have to be handled by the component
                    scheduled.ctx.inbox.insert_new(message);
                }
            }
        }
    }
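
    // For reference, the exit handshake that the control handling above takes
    // part in (illustrative summary of the code in this file):
    //
    //   1. `run` (on `Exit`) sends `CloseChannel` for every open port and sets
    //      `shutting_down`.
    //   2. Each peer, in `handle_inbox_messages` above, marks its side of the
    //      channel as closed and replies with `Ack`.
    //   3. Once the exiting component's `num_pending_acks()` drops to zero, the
    //      scheduler destroys it.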
    /// Handles inbox messages while shutting down. This is intended to handle
    /// the case where a component cleanly exited outside of a sync region, but
    /// a peer, before receiving the `CloseChannel` message, sent a message
    /// inside a sync region. That peer should be notified that its message was
    /// not received by a component in a sync region.
    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
        // Note: we're not handling the public inbox, we're dealing with the
        // private one!
        debug_assert!(scheduled.shutting_down);
        while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
            let message = scheduled.ctx.read_message_using_ticket(ticket);
            let target_port_and_round_number = match message {
                Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
                Message::SyncComp(_) => None,
                Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
                Message::SyncControl(_) => None,
                Message::Control(_) => None,
            };

            if let Some((target_port, sync_round)) = target_port_and_round_number {
                // This message is aimed at a port, but we're shutting down, so
                // notify the peer that its message was not received properly.
                // (also: since we're shutting down, we're not in sync mode and
                // the context contains the definitive set of owned ports)
                let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
                let message = SyncControlMessage{
                    in_response_to_sync_round: sync_round,
                    target_component_id: port.peer_connector,
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
                };
                self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
                self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
            }
        }
    }
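
    // A sketch of how a component feeds the function below during a run (the
    // component-side methods are defined on `ComponentCtx` further down; `msg`,
    // `new_component` and `port_id` are placeholders):
    //
    //     ctx.submit_message(msg)?;                          // inside sync: ends up in ctx.outbox
    //     ctx.push_component(new_component, vec![port_id]);  // outside sync: ends up in ctx.state_changes
    //
    // After the component yields, the scheduler drains both queues here.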
    /// Handles changes to the context that were made by the component. This is
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
    /// component's state that the scheduler needs to know about (e.g. a message
    /// that the component wants to send, a port that has been added).
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
        let connector_id = scheduled.ctx.id;

        // Handling any messages that were sent
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
            let target_component_id = match &message {
                Message::Data(content) => {
                    // Data messages are always sent to a particular port, and
                    // may end up being rerouted.
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
                    if port_desc.state == PortState::Closed {
                        todo!("handle sending over a closed port")
                    }
                    port_desc.peer_connector
                },
                Message::SyncComp(content) => {
                    // Sync messages are always sent to a particular component;
                    // the sender must make sure it actually wants to send to
                    // the specified component (and is not using an inconsistent
                    // component ID associated with a port).
                    content.target_component_id
                },
                Message::SyncPort(content) => {
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
                    if port_desc.state == PortState::Closed {
                        todo!("handle sending over a closed port")
                    }
                    port_desc.peer_connector
                },
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
            };

            self.runtime.send_message(target_component_id, message);
        }

        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
            match state_change {
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
                    // Creating a new component. The creator needs to relinquish
                    // ownership of the ports that are given to the new
                    // component. All data messages that were intended for those
                    // ports also need to be transferred.
                    let new_key = self.runtime.create_pdl_component(component, false);
                    let new_connector = self.runtime.get_component_private(&new_key);

                    for port_id in initial_ports {
                        // Transfer messages associated with the transferred port
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);

                        // Transfer the port itself
                        let port_index = scheduled.ctx.ports.iter()
                            .position(|v| v.self_id == port_id)
                            .unwrap();
                        let port = scheduled.ctx.ports.remove(port_index);
                        new_connector.ctx.ports.push(port.clone());

                        // Notify the peer that the port has changed, but only
                        // if the port wasn't already closed (otherwise the peer
                        // is gone).
                        if port.state == PortState::Open {
                            let reroute_message = scheduled.router.prepare_reroute(
                                port.self_id, port.peer_id, scheduled.ctx.id,
                                port.peer_connector, new_connector.ctx.id,
                                &mut new_connector.router
                            );

                            self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
                            self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
                        }
                    }

                    // Schedule new connector to run
                    self.runtime.push_work(new_key);
                },
                ComponentStateChange::CreatedPort(port) => {
                    scheduled.ctx.ports.push(port);
                },
                ComponentStateChange::ChangedPort(port_change) => {
                    if port_change.is_acquired {
                        scheduled.ctx.ports.push(port_change.port);
                    } else {
                        let index = scheduled.ctx.ports
                            .iter()
                            .position(|v| v.self_id == port_change.port.self_id)
                            .unwrap();
                        scheduled.ctx.ports.remove(index);
                    }
                }
            }
        }

        // Finally, check if we just entered or just left a sync region
        if scheduled.ctx.changed_in_sync {
            if scheduled.ctx.is_in_sync {
                // Just entered sync region
            } else {
                // Just left sync region. So prepare inbox for the next sync
                // round
                scheduled.ctx.inbox.prepare_for_next_round();
            }
            scheduled.ctx.changed_in_sync = false; // reset flag
        }
    }

    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);

        // This is the running connector, and only the running connector may
        // decide it wants to sleep again.
        connector.public.sleeping.store(true, Ordering::Release);

        // But due to reordering we might have received messages from peers who
        // did not consider us sleeping. If so, then we wake ourselves again.
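        //
        // (For reference, a sketch of the other half of this race; the actual
        // sender-side code lives elsewhere in the runtime, and `insert_message`
        // / `target_key` are placeholder names. A peer delivering a message
        // performs the mirrored compare-and-swap, so exactly one party ends up
        // scheduling the component:
        //
        //     target.public.inbox.insert_message(message);
        //     if target.public.sleeping
        //         .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
        //         .is_ok()
        //     {
        //         runtime.push_work(target_key);
        //     }
        // )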
        if !connector.public.inbox.is_empty() {
            // Try to wake ourselves up (needed because someone might be trying
            // the exact same atomic compare-and-swap at this point in time)
            let should_wake_up_again = connector.public.sleeping
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

            if should_wake_up_again {
                self.runtime.push_work(connector_key)
            }
        }
    }

    // TODO: Remove, this is debugging stuff
    fn debug(&self, message: &str) {
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
    }

    fn debug_conn(&self, conn: ConnectorId, message: &str) {
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
    }
}

// -----------------------------------------------------------------------------
// ComponentCtx
// -----------------------------------------------------------------------------

enum ComponentStateChange {
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
    CreatedPort(Port),
    ChangedPort(ComponentPortChange),
}

#[derive(Clone)]
pub(crate) struct ComponentPortChange {
    pub is_acquired: bool, // otherwise: released
    pub port: Port,
}

/// The component context (better name may be invented). This was created
/// because part of the component's state is managed by the scheduler, and part
/// of it by the component itself. When the component enters or exits a sync
/// block, the state that is partially managed by both component and scheduler
/// needs to be exchanged.
pub(crate) struct ComponentCtx {
    // Mostly managed by the scheduler
    pub(crate) id: ConnectorId,
    ports: Vec<Port>,
    inbox: Inbox,
    // Submitted by the component
    is_in_sync: bool,
    changed_in_sync: bool,
    outbox: VecDeque<Message>,
    state_changes: VecDeque<ComponentStateChange>,
    // Workspaces that may be used by components to (generally) prevent
    // allocations. Be a good scout and leave it empty after you've used it.
    // TODO: Move to scheduler ctx, this is the wrong place
    pub workspace_ports: Vec<PortIdLocal>,
    pub workspace_branches: Vec<BranchId>,
}

impl ComponentCtx {
    pub(crate) fn new_empty() -> Self {
        return Self{
            id: ConnectorId::new_invalid(),
            ports: Vec::new(),
            inbox: Inbox::new(),
            is_in_sync: false,
            changed_in_sync: false,
            outbox: VecDeque::new(),
            state_changes: VecDeque::new(),
            workspace_ports: Vec::new(),
            workspace_branches: Vec::new(),
        };
    }

    /// Notify the runtime that the component has created a new component. May
    /// only be called outside of a sync block.
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
        debug_assert!(!self.is_in_sync);
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
    }

    /// Notify the runtime that the component has created a new port. May only
    /// be called outside of a sync block (for ports received during a sync
    /// block, pass them when calling `notify_sync_end`).
    pub(crate) fn push_port(&mut self, port: Port) {
        debug_assert!(!self.is_in_sync);
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
    }
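
    // Rough order in which a component uses this context over one iteration
    // (sketch; `msg` and `changed_ports` are placeholders, error handling
    // omitted):
    //
    //     ctx.push_port(port);                 // setup, outside a sync block
    //     ctx.notify_sync_start();             // entering a sync block
    //     ctx.submit_message(msg)?;            // queued in the outbox
    //     ctx.notify_sync_end(&changed_ports); // leaving the sync block
    //
    // After `notify_sync_start`/`notify_sync_end` the component is expected to
    // yield, so the scheduler can pick the changes up in
    // `handle_changes_in_context`.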
    /// Notify the runtime of an error. Note that this will not perform any
    /// special action beyond printing the error. The component is responsible
    /// for waiting until it is appropriate to shut down (i.e. being outside
    /// of a sync region) and returning the `Exit` scheduling code.
    pub(crate) fn push_error(&mut self, error: EvalError) {
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.0, error);
    }

    #[inline]
    pub(crate) fn get_ports(&self) -> &[Port] {
        return self.ports.as_slice();
    }

    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
        return self.ports.iter().find(|v| v.self_id == id);
    }

    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
        return self.ports.iter().find(|v| v.channel_id == id);
    }

    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
        return self.ports.iter_mut().find(|v| v.self_id == id);
    }

    /// Notify that the component will enter a sync block. Note that after
    /// calling this function you must allow the scheduler to pick up the
    /// changes in the context by exiting your code-executing loop, and to
    /// continue executing code the next time the scheduler picks up the
    /// component.
    pub(crate) fn notify_sync_start(&mut self) {
        debug_assert!(!self.is_in_sync);
        self.is_in_sync = true;
        self.changed_in_sync = true;
    }

    #[inline]
    pub(crate) fn is_in_sync(&self) -> bool {
        return self.is_in_sync;
    }

    /// Submit a message for the scheduler to send to the appropriate receiver.
    /// May only be called inside of a sync block.
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
        debug_assert!(self.is_in_sync);
        if let Some(port_id) = contents.source_port() {
            let port_info = self.get_port_by_id(port_id);
            let is_valid = match port_info {
                Some(port_info) => port_info.state == PortState::Open,
                None => false,
            };

            if !is_valid {
                // We don't own the port, or it has been closed
                println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index);
                return Err(());
            }
        }

        self.outbox.push_back(contents);
        return Ok(());
    }

    /// Notify that the component just finished a sync block. Like
    /// `notify_sync_start`: drop out of the `Component::Run` function.
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
        debug_assert!(self.is_in_sync);
        self.is_in_sync = false;
        self.changed_in_sync = true;

        self.state_changes.reserve(changed_ports.len());
        for changed_port in changed_ports {
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
        }
    }

    /// Retrieves messages matching a particular port and branch id, but only
    /// those messages that have previously been retrieved through the
    /// message-ticket API (`get_next_message_ticket`).
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
        return self.inbox.get_read_data_messages(match_port_id);
    }

    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
        if !self.is_in_sync {
            return None;
        }
        return self.inbox.get_next_message_ticket();
    }

    #[inline]
    pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option<MessageTicket> {
        return self.inbox.get_next_message_ticket();
    }

    #[inline]
    pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
        return self.inbox.read_message_using_ticket(ticket);
    }

    #[inline]
    pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
        return self.inbox.take_message_using_ticket(ticket)
    }
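
    // Typical use of the ticket API by a component during a sync round (sketch;
    // `belongs_to_future_round` stands in for whatever check the component
    // actually performs):
    //
    //     while let Some(ticket) = ctx.get_next_message_ticket() {
    //         let message = ctx.read_message_using_ticket(ticket);
    //         if belongs_to_future_round(message) {
    //             // take it out now, and re-insert it for the next sync round
    //             let message = ctx.take_message_using_ticket(ticket);
    //             ctx.put_back_message(message);
    //         } else {
    //             // consider the message in this round
    //         }
    //     }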
    /// Puts a message back into the inbox. The reason being that the message is
    /// actually part of the next sync round. This will make the message
    /// readable again in the next sync round.
    pub(crate) fn put_back_message(&mut self, message: Message) {
        self.inbox.put_back_message(message);
    }
}

pub(crate) struct MessagesIter<'a> {
    messages: &'a [Message],
    next_index: usize,
    max_index: usize,
    match_port_id: PortIdLocal,
}

impl<'a> Iterator for MessagesIter<'a> {
    type Item = &'a DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop until match is found or at end of messages
        while self.next_index < self.max_index {
            let message = &self.messages[self.next_index];
            if let Message::Data(message) = &message {
                if message.data_header.target_port == self.match_port_id {
                    // Found a match
                    self.next_index += 1;
                    return Some(message);
                }
            } else {
                // Unreachable because:
                // 1. We only iterate over messages that were previously retrieved through the ticket API.
                // 2. The inbox does not contain control/ping messages.
                // 3. If the reader encounters anything other than a data message, it is removed from the inbox.
                unreachable!();
            }

            self.next_index += 1;
        }

        // No more messages
        return None;
    }
}

// -----------------------------------------------------------------------------
// Private Inbox
// -----------------------------------------------------------------------------

/// A structure that contains inbox messages. Some messages are left inside and
/// continuously re-read. Others are taken out, but may potentially be put back
/// for later reading. Later reading in this case implies that they are put back
/// for reading in the next sync round.
struct Inbox {
    messages: RawVec<Message>,
    next_delay_idx: u32,
    start_read_idx: u32,
    next_read_idx: u32,
    generation: u32,
}

#[derive(Clone, Copy)]
pub(crate) struct MessageTicket {
    index: u32,
    generation: u32,
}

impl Inbox {
    fn new() -> Self {
        return Inbox {
            messages: RawVec::new(),
            next_delay_idx: 0,
            start_read_idx: 0,
            next_read_idx: 0,
            generation: 0,
        }
    }

    fn insert_new(&mut self, message: Message) {
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
        self.messages.push(message);
    }

    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
        let cur_read_idx = self.next_read_idx as usize;
        if cur_read_idx >= self.messages.len() {
            return None;
        }

        self.generation += 1;
        self.next_read_idx += 1;
        return Some(MessageTicket{ index: cur_read_idx as u32, generation: self.generation });
    }

    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
        debug_assert_eq!(self.generation, ticket.generation);
        return unsafe{ &*self.messages.get(ticket.index as usize) }
    }

    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
        debug_assert_eq!(self.generation, ticket.generation);
        unsafe {
            let take_idx = ticket.index as usize;
            let val = std::ptr::read(self.messages.get(take_idx));

            // Move messages to the right, clearing up space in the
            // front.
            let num_move_right = take_idx - self.start_read_idx as usize;
            self.messages.move_range(
                self.start_read_idx as usize,
                self.start_read_idx as usize + 1,
                num_move_right
            );
            self.start_read_idx += 1;

            return val;
        }
    }
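
    // Index layout maintained by the methods above and below (illustrative):
    //
    //   messages: [ put-back (0..next_delay_idx) | gap left by taken messages |
    //               readable (start_read_idx..next_read_idx) | unread (..len) ]
    //
    // `take_message_using_ticket` grows the gap by shifting the readable prefix
    // one slot to the right; `put_back_message` fills the gap from the front;
    // and `prepare_for_next_round` drops everything that was read and joins the
    // put-back and unread ranges back into one contiguous range.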
    fn put_back_message(&mut self, message: Message) {
        // We have space in front of the array because we've taken out a message
        // before.
        debug_assert!(self.next_delay_idx < self.start_read_idx);
        unsafe {
            // Write to front of the array
            std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message);
            self.next_delay_idx += 1;
        }
    }

    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
        return MessagesIter{
            messages: self.messages.as_slice(),
            next_index: self.start_read_idx as usize,
            max_index: self.next_read_idx as usize,
            match_port_id
        };
    }

    fn prepare_for_next_round(&mut self) {
        // Deallocate everything that was read
        self.destroy_range(self.start_read_idx, self.next_read_idx);
        self.generation += 1;

        // Join up all remaining values with the delayed ones in the front
        let num_to_move = self.messages.len() - self.next_read_idx as usize;
        self.messages.move_range(
            self.next_read_idx as usize,
            self.next_delay_idx as usize,
            num_to_move
        );

        // Set all indices (and the RawVec len) to make sense in this new state
        let new_len = self.next_delay_idx as usize + num_to_move;
        self.next_delay_idx = 0;
        self.start_read_idx = 0;
        self.next_read_idx = 0;
        self.messages.len = new_len;
    }

    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
        // Note: this is only called while in non-sync mode (no delayed or
        // partially read messages), which makes our lives easier.
        let mut idx = 0;
        while idx < self.messages.len() {
            let message = unsafe{ &*self.messages.get(idx) };
            if let Some(target_port) = message.target_port() {
                if target_port == port {
                    // Transfer the message to the new inbox
                    unsafe {
                        let message = std::ptr::read(message as *const _);
                        let remaining = self.messages.len() - idx;
                        if remaining > 1 {
                            self.messages.move_range(idx + 1, idx, remaining - 1);
                        }
                        self.messages.len -= 1;
                        new_inbox.insert_new(message);
                    }
                    // Note: do not advance `idx`, the next message now lives at
                    // this index.
                } else {
                    // Message is intended for another port, leave it here
                    idx += 1;
                }
            } else {
                // Message is not addressed to a port, leave it here
                idx += 1;
            }
        }
    }

    #[inline]
    fn destroy_range(&mut self, start_idx: u32, end_idx: u32) {
        for idx in (start_idx as usize)..(end_idx as usize) {
            unsafe {
                let msg = self.messages.get_mut(idx);
                std::ptr::drop_in_place(msg);
            }
        }
    }
}

impl Drop for Inbox {
    fn drop(&mut self) {
        // Whether in sync or not in sync, we have two ranges of allocated
        // messages:
        // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
        // - readable messages: from `start_read_idx` to `messages.len`
        self.destroy_range(0, self.next_delay_idx);
        self.destroy_range(self.start_read_idx, self.messages.len as u32);
    }
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

struct ControlEntry {
    id: u32,
    variant: ControlVariant,
}

enum ControlVariant {
    ChangedPort(ControlChangedPort),
    ClosedChannel(ControlClosedChannel),
    ReroutePending,
}

struct ControlChangedPort {
    target_port: PortIdLocal,          // if sent to this port, then reroute
    source_connector: ConnectorId,     // connector we expect messages from
    target_connector: ConnectorId,     // connector we need to reroute to
    id_of_ack_after_confirmation: u32, // control message ID we need to send to the target upon receiving an ack
}

struct ControlClosedChannel {
    source_port: PortIdLocal,
    target_port: PortIdLocal,
}

pub(crate) struct ControlMessageHandler {
    id_counter: u32,
    active: Vec<ControlEntry>,
}

impl ControlMessageHandler {
    pub fn new() -> Self {
        ControlMessageHandler {
            id_counter: 0,
            active: Vec::new(),
        }
    }
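
    // The two handshakes driven by this handler, as the scheduler uses them
    // above (illustrative summary, not code):
    //
    //   Closing a channel:
    //     exiting component --- CloseChannel(port) ---> peer
    //     The peer marks its port as closed and replies with `Ack`;
    //     `handle_ack` removes the entry, and once `num_pending_acks()` hits
    //     zero the exiting component is destroyed.
    //
    //   Moving a port to a newly created component:
    //     creator --- PortPeerChanged(port, new_owner) ---> peer
    //     Until the `Ack` arrives, `should_reroute` forwards messages for the
    //     moved port to the new owner. On `Ack`, `handle_ack` emits a second
    //     `Ack` (using `id_of_ack_after_confirmation`) addressed to the new
    //     owner, which clears its `ReroutePending` entry.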
    /// Prepares a message indicating that a channel has been closed. We keep a
    /// local entry to match against the (hopefully) returned `Ack` message.
    pub fn prepare_closing_channel(
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId
    ) -> ControlMessage {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
                source_port: self_port_id,
                target_port: peer_port_id,
            }),
        });

        return ControlMessage {
            id,
            sending_component_id: self_connector_id,
            content: ControlContent::CloseChannel(peer_port_id),
        };
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
    pub fn prepare_reroute(
        &mut self,
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId,
        new_owner_ctrl_handler: &mut ControlMessageHandler,
    ) -> ControlMessage {
        let id = self.take_id();
        let new_owner_id = new_owner_ctrl_handler.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ChangedPort(ControlChangedPort{
                target_port: port_id,
                source_connector: peer_connector_id,
                target_connector: new_owner_connector_id,
                id_of_ack_after_confirmation: new_owner_id,
            }),
        });
        new_owner_ctrl_handler.active.push(ControlEntry{
            id: new_owner_id,
            variant: ControlVariant::ReroutePending,
        });

        return ControlMessage {
            id,
            sending_component_id: self_connector_id,
            content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),
        };
    }

    /// Checks whether a message aimed at the supplied port should be rerouted.
    /// If so, this function returns the connector that should receive the
    /// message instead.
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
        for entry in &self.active {
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
                if entry.target_port == target_port {
                    // Need to reroute this message
                    return Some(entry.target_connector);
                }
            }
        }

        return None;
    }

    /// Handles an Ack as an answer to a previously sent control message.
    /// Handling an Ack might spawn a new message that needs to be sent.
    pub fn handle_ack(&mut self, handler_component_id: ConnectorId, id: u32) -> Option<(ConnectorId, Message)> {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => {
                let removed = self.active.remove(index);
                match removed.variant {
                    ControlVariant::ChangedPort(message) => {
                        return Some((
                            message.target_connector,
                            Message::Control(ControlMessage{
                                id: message.id_of_ack_after_confirmation,
                                sending_component_id: handler_component_id,
                                content: ControlContent::Ack
                            })
                        ));
                    },
                    _ => return None,
                }
            },
            None => {
                todo!("handling of nefarious ACKs");
            },
        }
    }

    /// Retrieves the number of responses we still expect to receive from our
    /// peers
    #[inline]
    pub fn num_pending_acks(&self) -> usize {
        return self.active.len();
    }

    fn take_id(&mut self) -> u32 {
        let generated_id = self.id_counter;
        let (new_id, _) = self.id_counter.overflowing_add(1);
        self.id_counter = new_id;
        return generated_id;
    }
}