use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::Ordering;

use crate::protocol::eval::EvalError;
use crate::runtime2::port::ChannelId;

use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
use super::port::{Port, PortState, PortIdLocal};
use super::native::Connector;
use super::branch::{BranchId};
use super::connector::{ConnectorPDL, ConnectorScheduling};
use super::inbox::{Message, DataMessage, ControlMessage, ControlContent};

// Because it contains pointers we're going to do a copy by value on this one
#[derive(Clone, Copy)]
pub(crate) struct SchedulerCtx<'a> {
    pub(crate) runtime: &'a RuntimeInner
}

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}

impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Self{ runtime, scheduler_id };
    }

    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        'thread_loop: loop {
            // Retrieve a unit of work
            self.debug("Waiting for work");
            let connector_key = self.runtime.wait_for_work();
            if connector_key.is_none() {
                // We should exit
                self.debug(" ... No more work, quitting");
                break 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let connector_id = connector_key.downcast();
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));

            let scheduled = self.runtime.get_component_private(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while let ConnectorScheduling::Immediate = cur_schedule {
                self.handle_inbox_messages(scheduled);

                // Run the main behaviour of the connector, depending on its
                // current state.
                if scheduled.shutting_down {
                    // Nothing to do. But we're still waiting for all our pending
                    // control messages to be answered.
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
                    if scheduled.router.num_pending_acks() == 0 {
                        // We're actually done, we can safely destroy the
                        // currently running connector
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    } else {
                        cur_schedule = ConnectorScheduling::NotNow;
                    }
                } else {
                    self.debug_conn(connector_id, "Running ...");
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
                    self.debug_conn(connector_id, "Finished running");

                    // Handle all of the output from the current run: messages to
                    // send and connectors to instantiate.
                    self.handle_changes_in_context(scheduled);

                    cur_schedule = new_schedule;
                }
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.push_work(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep, note that we are the only ones which are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    self.try_go_to_sleep(connector_key, scheduled);
                },
                ConnectorScheduling::Exit => {
                    // Prepare for exit. Set the shutdown flag and broadcast
                    // messages to notify peers of closing channels
                    scheduled.shutting_down = true;
                    for port in &scheduled.ctx.ports {
                        if port.state != PortState::Closed {
                            let message = scheduled.router.prepare_closing_channel(
                                port.self_id, port.peer_id, connector_id
                            );
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
                        }
                    }

                    if scheduled.router.num_pending_acks() == 0 {
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    }

                    self.try_go_to_sleep(connector_key, scheduled);
                },
                ConnectorScheduling::Error(eval_error) => {
                    // Display error. Then exit
                    println!("Oh oh!\n{}", eval_error);
                    panic!("Abort!");
                }
            }
        }
    }

    /// Receiving messages from the public inbox and handling them or storing
    /// them in the component's private inbox
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
        let connector_id = scheduled.ctx.id;

        while let Some(message) = scheduled.public.inbox.take_message() {
            // Check if the message has to be rerouted because we have moved the
            // target port to another component.
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
            if let Some(target_port) = Self::get_message_target_port(&message) {
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
                    self.debug_conn(connector_id, " ... Rerouting the message");
                    self.runtime.send_message(other_component_id, message);
                    continue;
                }
            }

            // If here, then we should handle the message
            self.debug_conn(connector_id, " ... Handling the message");
            match message {
                Message::Control(message) => {
                    match message.content {
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
                            // Need to change port target
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
                            port.peer_connector = new_target_connector_id;

                            // Note: for simplicity we program the scheduler to always finish
                            // running a connector with an empty outbox. If this ever changes
                            // then accepting the "port peer changed" message implies we need
                            // to change the recipient of the message in the outbox.
                            debug_assert!(scheduled.ctx.outbox.is_empty());

                            // And respond with an Ack
                            let ack_message = Message::Control(ControlMessage {
                                id: message.id,
                                sending_component_id: connector_id,
                                content: ControlContent::Ack,
                            });
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
                            self.runtime.send_message(message.sending_component_id, ack_message);
                        },
                        ControlContent::CloseChannel(port_id) => {
                            // Mark the port as being closed
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
                            port.state = PortState::Closed;

                            // Send an Ack
                            let ack_message = Message::Control(ControlMessage {
                                id: message.id,
                                sending_component_id: connector_id,
                                content: ControlContent::Ack,
                            });
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
                            self.runtime.send_message(message.sending_component_id, ack_message);
                        },
                        ControlContent::Ack => {
                            scheduled.router.handle_ack(message.id);
                        },
                        ControlContent::Ping => {},
                    }
                },
                _ => {
                    // All other cases have to be handled by the component
                    scheduled.ctx.inbox_messages.push(message);
                }
            }
        }
    }

    /// Handles changes to the context that were made by the component. This is
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
    /// component's state that the scheduler needs to know about (e.g. a message
    /// that the component wants to send, a port that has been added).
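    ///
    /// A rough sketch of how a component's submissions end up here (the
    /// component code below is hypothetical; `submit_message`, `push_component`
    /// and `push_port` are the `ComponentCtx` methods defined further down):
    ///
    /// ```ignore
    /// // Inside a component's run step, while in a sync block:
    /// ctx.submit_message(Message::Data(data_message)).unwrap(); // queued in the outbox
    /// // After the run step returns, the scheduler calls this function and
    /// // forwards the queued message to the peer component via
    /// // runtime.send_message(). Components created with push_component() and
    /// // port changes submitted with push_port() are applied in the same pass.
    /// ```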
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
        let connector_id = scheduled.ctx.id;

        // Handling any messages that were sent
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
            let target_component_id = match &message {
                Message::Data(content) => {
                    // Data messages are always sent to a particular port, and
                    // may end up being rerouted.
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
                    if port_desc.state == PortState::Closed {
                        todo!("handle sending over a closed port")
                    }
                    port_desc.peer_connector
                },
                Message::SyncComp(content) => {
                    // Sync messages are always sent to a particular component,
                    // the sender must make sure it actually wants to send to
                    // the specified component (and is not using an inconsistent
                    // component ID associated with a port).
                    content.target_component_id
                },
                Message::SyncPort(content) => {
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
                    if port_desc.state == PortState::Closed {
                        todo!("handle sending over a closed port")
                    }
                    port_desc.peer_connector
                },
                Message::Control(_) => {
                    unreachable!("component sending control messages directly");
                }
            };

            self.runtime.send_message(target_component_id, message);
        }

        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
            match state_change {
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
                    // Creating a new component. The creator needs to relinquish
                    // ownership of the ports that are given to the new
                    // component. All data messages that were intended for those
                    // ports also need to be transferred.
                    let new_key = self.runtime.create_pdl_component(component, false);
                    let new_connector = self.runtime.get_component_private(&new_key);

                    for port_id in initial_ports {
                        // Transfer messages associated with the transferred port
                        let mut message_idx = 0;
                        while message_idx < scheduled.ctx.inbox_messages.len() {
                            let message = &scheduled.ctx.inbox_messages[message_idx];
                            if Self::get_message_target_port(message) == Some(port_id) {
                                // Need to transfer this message
                                let message = scheduled.ctx.inbox_messages.remove(message_idx);
                                new_connector.ctx.inbox_messages.push(message);
                            } else {
                                message_idx += 1;
                            }
                        }

                        // Transfer the port itself
                        let port_index = scheduled.ctx.ports.iter()
                            .position(|v| v.self_id == port_id)
                            .unwrap();
                        let port = scheduled.ctx.ports.remove(port_index);
                        new_connector.ctx.ports.push(port.clone());

                        // Notify the peer that the port has changed
                        let reroute_message = scheduled.router.prepare_reroute(
                            port.self_id, port.peer_id, scheduled.ctx.id,
                            port.peer_connector, new_connector.ctx.id
                        );
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
                        self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
                    }

                    // Schedule new connector to run
                    self.runtime.push_work(new_key);
                },
                ComponentStateChange::CreatedPort(port) => {
                    scheduled.ctx.ports.push(port);
                },
                ComponentStateChange::ChangedPort(port_change) => {
                    if port_change.is_acquired {
                        scheduled.ctx.ports.push(port_change.port);
                    } else {
                        let index = scheduled.ctx.ports
                            .iter()
                            .position(|v| v.self_id == port_change.port.self_id)
                            .unwrap();
                        scheduled.ctx.ports.remove(index);
                    }
                }
            }
        }

        // Finally, check if we just entered or just left a sync region
        if scheduled.ctx.changed_in_sync {
            if scheduled.ctx.is_in_sync {
                // Just entered sync region
            } else {
                // Just left sync region. So clear inbox up until the last
                // message that was read.
                scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read);
                scheduled.ctx.inbox_len_read = 0;
            }

            scheduled.ctx.changed_in_sync = false; // reset flag
        }
    }

    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);

        // This is the running connector, and only the running connector may
        // decide it wants to sleep again.
        connector.public.sleeping.store(true, Ordering::Release);

        // But due to reordering we might have received messages from peers who
        // did not consider us sleeping. If so, then we wake ourselves again.
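        // (Concretely: a peer may push a message into our public inbox and read
        // `sleeping == false` just before our store above, concluding that we
        // are still running and do not need to be scheduled. Without the inbox
        // re-check below such a message would never cause us to be woken up.)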
        if !connector.public.inbox.is_empty() {
            // Try to wake ourselves up (needed because someone might be trying
            // the exact same atomic compare-and-swap at this point in time)
            let should_wake_up_again = connector.public.sleeping
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

            if should_wake_up_again {
                self.runtime.push_work(connector_key)
            }
        }
    }

    #[inline]
    fn get_message_target_port(message: &Message) -> Option<PortIdLocal> {
        match message {
            Message::Data(data) => return Some(data.data_header.target_port),
            Message::SyncComp(_) => {},
            Message::SyncPort(content) => return Some(content.target_port),
            Message::Control(control) => {
                match &control.content {
                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
                    ControlContent::CloseChannel(port_id) => return Some(*port_id),
                    ControlContent::Ping | ControlContent::Ack => {},
                }
            },
        }

        return None;
    }

    // TODO: Remove, this is debugging stuff
    fn debug(&self, message: &str) {
        println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message);
    }

    fn debug_conn(&self, conn: ConnectorId, message: &str) {
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
    }
}

// -----------------------------------------------------------------------------
// ComponentCtx
// -----------------------------------------------------------------------------

enum ComponentStateChange {
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
    CreatedPort(Port),
    ChangedPort(ComponentPortChange),
}

#[derive(Clone)]
pub(crate) struct ComponentPortChange {
    pub is_acquired: bool, // otherwise: released
    pub port: Port,
}

/// The component context (better name may be invented). This was created
/// because part of the component's state is managed by the scheduler, and part
/// of it by the component itself. When the component enters or exits a sync
/// block, the state that is partially managed by both component and scheduler
/// needs to be exchanged.
pub(crate) struct ComponentCtx {
    // Mostly managed by the scheduler
    pub(crate) id: ConnectorId,
    ports: Vec<Port>,
    inbox_messages: Vec<Message>,
    inbox_len_read: usize,
    // Submitted by the component
    is_in_sync: bool,
    changed_in_sync: bool,
    outbox: VecDeque<Message>,
    state_changes: VecDeque<ComponentStateChange>,
    // Workspaces that may be used by components to (generally) prevent
    // allocations. Be a good scout and leave it empty after you've used it.
    // TODO: Move to scheduler ctx, this is the wrong place
    pub workspace_ports: Vec<PortIdLocal>,
    pub workspace_branches: Vec<BranchId>,
}

impl ComponentCtx {
    pub(crate) fn new_empty() -> Self {
        return Self{
            id: ConnectorId::new_invalid(),
            ports: Vec::new(),
            inbox_messages: Vec::new(),
            inbox_len_read: 0,
            is_in_sync: false,
            changed_in_sync: false,
            outbox: VecDeque::new(),
            state_changes: VecDeque::new(),
            workspace_ports: Vec::new(),
            workspace_branches: Vec::new(),
        };
    }

    /// Notify the runtime that the component has created a new component. May
    /// only be called outside of a sync block.
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
        debug_assert!(!self.is_in_sync);
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
    }

    /// Notify the runtime that the component has created a new port. May only
    /// be called outside of a sync block (for ports received during a sync
    /// block, pass them when calling `notify_sync_end`).
    pub(crate) fn push_port(&mut self, port: Port) {
        debug_assert!(!self.is_in_sync);
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
    }

    /// Notify the runtime of an error. Note that this will not perform any
    /// special action beyond printing the error. The component is responsible
    /// for waiting until it is appropriate to shut down (i.e. being outside
    /// of a sync region) and returning the `Exit` scheduling code.
    pub(crate) fn push_error(&mut self, error: EvalError) {
        // For now the error is only reported; the component itself must still
        // return the `Exit` scheduling code once it is outside of a sync region.
        println!("ERROR:\n{}", error);
    }

    #[inline]
    pub(crate) fn get_ports(&self) -> &[Port] {
        return self.ports.as_slice();
    }

    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
        return self.ports.iter().find(|v| v.self_id == id);
    }

    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
        return self.ports.iter().find(|v| v.channel_id == id);
    }

    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
        return self.ports.iter_mut().find(|v| v.self_id == id);
    }

    /// Notify that the component will enter a sync block. Note that after
    /// calling this function you must allow the scheduler to pick up the
    /// changes in the context by exiting your code-executing loop, and to
    /// continue executing code the next time the scheduler picks up the
    /// component.
    pub(crate) fn notify_sync_start(&mut self) {
        debug_assert!(!self.is_in_sync);

        self.is_in_sync = true;
        self.changed_in_sync = true;
    }

    #[inline]
    pub(crate) fn is_in_sync(&self) -> bool {
        return self.is_in_sync;
    }

    /// Submit a message for the scheduler to send to the appropriate receiver.
    /// May only be called inside of a sync block.
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
        debug_assert!(self.is_in_sync);
        if let Some(port_id) = contents.source_port() {
            if self.get_port_by_id(port_id).is_none() {
                // We don't own the port
                return Err(());
            }
        }

        self.outbox.push_back(contents);
        return Ok(());
    }

    /// Notify that the component just finished a sync block. Like
    /// `notify_sync_start`: drop out of the `Component::Run` function.
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
        debug_assert!(self.is_in_sync);

        self.is_in_sync = false;
        self.changed_in_sync = true;

        self.state_changes.reserve(changed_ports.len());
        for changed_port in changed_ports {
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
        }
    }

    /// Retrieves data messages matching a particular port, but only those
    /// messages that have been previously received with `read_next_message`.
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
        return MessagesIter {
            messages: &self.inbox_messages,
            next_index: 0,
            max_index: self.inbox_len_read,
            match_port_id,
        };
    }

    /// Retrieves the next unread message from the inbox. Returns `None` if
    /// there are no (new) messages to read.
    // TODO: Fix the clone of the data message, entirely unnecessary
    pub(crate) fn read_next_message(&mut self) -> Option<Message> {
        if !self.is_in_sync { return None; }
        if self.inbox_len_read == self.inbox_messages.len() { return None; }

        // We want to keep data messages in the inbox, because we need to check
        // them in the future. We don't want to keep sync messages around, we
        // should only handle them once. Control messages should never be in
        // here.
        let message = &self.inbox_messages[self.inbox_len_read];
        match message {
            Message::Data(content) => {
                self.inbox_len_read += 1;
                return Some(Message::Data(content.clone()));
            },
            Message::SyncComp(_) => {
                let message = self.inbox_messages.remove(self.inbox_len_read);
                return Some(message);
            },
            Message::SyncPort(_) => {
                let message = self.inbox_messages.remove(self.inbox_len_read);
                return Some(message);
            },
            Message::Control(_) => unreachable!("control message ended up in component inbox"),
        }
    }
}

pub(crate) struct MessagesIter<'a> {
    messages: &'a [Message],
    next_index: usize,
    max_index: usize,
    match_port_id: PortIdLocal,
}

impl<'a> Iterator for MessagesIter<'a> {
    type Item = &'a DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop until a match is found or we are at the end of the messages
        while self.next_index < self.max_index {
            let message = &self.messages[self.next_index];
            if let Message::Data(message) = &message {
                if message.data_header.target_port == self.match_port_id {
                    // Found a match
                    self.next_index += 1;
                    return Some(message);
                }
            } else {
                // Unreachable because:
                // 1. We only iterate over messages that were previously retrieved by `read_next_message`.
                // 2. The inbox does not contain control/ping messages.
                // 3. If `read_next_message` encounters anything other than a data message, it is removed from the inbox.
                unreachable!();
            }

            self.next_index += 1;
        }

        // No more messages
        return None;
    }
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

struct ControlEntry {
    id: u32,
    variant: ControlVariant,
}

enum ControlVariant {
    ChangedPort(ControlChangedPort),
    ClosedChannel(ControlClosedChannel),
}

struct ControlChangedPort {
    target_port: PortIdLocal,      // if sent to this port, then reroute
    source_connector: ConnectorId, // connector we expect messages from
    target_connector: ConnectorId, // connector we need to reroute to
}

struct ControlClosedChannel {
    source_port: PortIdLocal,
    target_port: PortIdLocal,
}

pub(crate) struct ControlMessageHandler {
    id_counter: u32,
    active: Vec<ControlEntry>,
}

impl ControlMessageHandler {
    pub fn new() -> Self {
        ControlMessageHandler {
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares a message indicating that a channel has closed. We keep a local
    /// entry to match against the (hopefully) returned `Ack` message.
    pub fn prepare_closing_channel(
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId
    ) -> ControlMessage {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
                source_port: self_port_id,
                target_port: peer_port_id,
            }),
        });

        return ControlMessage {
            id,
            sending_component_id: self_connector_id,
            content: ControlContent::CloseChannel(peer_port_id),
        };
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
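    ///
    /// A rough sketch of the intended exchange (mirroring how the scheduler
    /// uses this in `handle_changes_in_context`; variable names are
    /// illustrative):
    ///
    /// ```ignore
    /// let reroute = router.prepare_reroute(
    ///     port.self_id, port.peer_id, old_owner_id,
    ///     port.peer_connector, new_owner_id);
    /// runtime.send_message(port.peer_connector, Message::Control(reroute));
    /// // The peer updates its `peer_connector` for the port and replies with
    /// // `ControlContent::Ack`, which is cleared via `handle_ack(reroute.id)`.
    /// ```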
    pub fn prepare_reroute(
        &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> ControlMessage {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ChangedPort(ControlChangedPort{
                target_port: port_id,
                source_connector: peer_connector_id,
                target_connector: new_owner_connector_id,
            }),
        });

        return ControlMessage {
            id,
            sending_component_id: self_connector_id,
            content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),
        };
    }

    /// Checks whether a message sent to the given port should be rerouted. If
    /// so, returns the connector that should receive the message instead.
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
        for entry in &self.active {
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
                if entry.target_port == target_port {
                    // Need to reroute this message
                    return Some(entry.target_connector);
                }
            }
        }

        return None;
    }

    /// Handles an `Ack` as an answer to a previously sent control message
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }

    /// Retrieves the number of responses we still expect to receive from our
    /// peers
    #[inline]
    pub fn num_pending_acks(&self) -> usize {
        return self.active.len();
    }

    fn take_id(&mut self) -> u32 {
        let generated_id = self.id_counter;
        let (new_id, _) = self.id_counter.overflowing_add(1);
        self.id_counter = new_id;
        return generated_id;
    }
}
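
// A minimal sanity-check sketch (not part of the original module) for
// `ControlMessageHandler`. It only exercises the id counter and the
// pending-ack bookkeeping, since those need no runtime, port, or connector
// instances.
#[cfg(test)]
mod control_message_handler_tests {
    use super::ControlMessageHandler;

    #[test]
    fn ids_are_sequential_and_nothing_is_pending_initially() {
        let mut handler = ControlMessageHandler::new();
        assert_eq!(handler.num_pending_acks(), 0);

        // `take_id` hands out the current counter value and then advances it
        // (wrapping on overflow), without registering a pending entry.
        assert_eq!(handler.take_id(), 0);
        assert_eq!(handler.take_id(), 1);
        assert_eq!(handler.num_pending_acks(), 0);
    }
}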