use std::sync::Arc;
use std::sync::atomic::Ordering;

use crate::runtime2::ScheduledConnector;

use super::{RuntimeInner, ConnectorId, ConnectorKey};
use super::port::{Port, PortState, PortIdLocal};
use super::native::Connector;
use super::connector::{ConnectorScheduling, RunDeltaState};
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};

/// Contains fields that are mostly managed by the scheduler, but may be
/// accessed by the connector
pub(crate) struct ConnectorCtx {
    pub(crate) id: ConnectorId,
    pub(crate) ports: Vec<Port>,
}

impl ConnectorCtx {
    pub(crate) fn new() -> ConnectorCtx {
        Self{
            id: ConnectorId::new_invalid(),
            ports: Vec::new(),
        }
    }

    pub(crate) fn add_port(&mut self, port: Port) {
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
        self.ports.push(port);
    }

    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
        let index = self.port_id_to_index(id);
        return self.ports.remove(index);
    }

    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
        let index = self.port_id_to_index(id);
        return &self.ports[index];
    }

    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
        let index = self.port_id_to_index(id);
        return &mut self.ports[index];
    }

    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
        for (idx, port) in self.ports.iter().enumerate() {
            if port.self_id == id {
                return idx;
            }
        }

        panic!("port {:?} not owned by connector", id);
    }
}

// Because it contains pointers we're going to do a copy by value on this one
#[derive(Clone, Copy)]
pub(crate) struct SchedulerCtx<'a> {
    pub(crate) runtime: &'a RuntimeInner,
}

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}

impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Self{ runtime, scheduler_id };
    }

    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        let scheduler_id = self.scheduler_id;
        let mut delta_state = RunDeltaState::new();

        'thread_loop: loop {
            // Retrieve a unit of work
            self.debug("Waiting for work");
            let connector_key = self.runtime.wait_for_work();
            if connector_key.is_none() {
                // We should exit
                self.debug(" ... No more work, quitting");
                break 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let connector_id = connector_key.downcast();
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
            let scheduled = self.runtime.get_component_private(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Check all the messages that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    // Check for rerouting
                    self.debug_conn(connector_id, &format!("Handling message from {}:{}\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
                        self.runtime.send_message(other_connector_id, message);
                        continue;
                    }

                    // Check for messages that require special action from the
                    // scheduler.
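                    // The arms below mirror the control protocol implemented by
                    // `ControlMessageHandler` at the bottom of this file:
                    // - `ChangePortPeer`: a peer moved one of its ports to another
                    //   connector, so update our local port's `peer_connector` and
                    //   reply with an `Ack`.
                    // - `CloseChannel`: a peer is shutting down its end of a channel,
                    //   so mark our port as closed and reply with an `Ack`.
                    // - `Ack`: a peer answered one of our own control messages, so
                    //   resolve the matching entry in our router.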
                    if let MessageContents::Control(content) = message.contents {
                        match content.content {
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                // Need to change the port's target
                                let port = scheduled.context.get_port_mut(port_id);
                                port.peer_connector = new_target_connector_id;

                                // Note: for simplicity we program the scheduler to always finish
                                // running a connector with an empty outbox. If this ever changes
                                // then accepting the "port peer changed" message implies we need
                                // to change the recipient of the message in the outbox.
                                debug_assert!(delta_state.outbox.is_empty());

                                // And respond with an Ack
                                self.runtime.send_message(
                                    message.sending_connector,
                                    Message{
                                        sending_connector: connector_id,
                                        receiving_port: PortIdLocal::new_invalid(),
                                        contents: MessageContents::Control(ControlMessage{
                                            id: content.id,
                                            content: ControlMessageVariant::Ack,
                                        }),
                                    }
                                );
                            },
                            ControlMessageVariant::CloseChannel(port_id) => {
                                // Mark the port as being closed
                                let port = scheduled.context.get_port_mut(port_id);
                                port.state = PortState::Closed;

                                // Send an Ack
                                self.runtime.send_message(
                                    message.sending_connector,
                                    Message{
                                        sending_connector: connector_id,
                                        receiving_port: PortIdLocal::new_invalid(),
                                        contents: MessageContents::Control(ControlMessage{
                                            id: content.id,
                                            content: ControlMessageVariant::Ack,
                                        }),
                                    }
                                );
                            },
                            ControlMessageVariant::Ack => {
                                scheduled.router.handle_ack(content.id);
                            }
                        }
                    } else {
                        // Let the connector handle the message
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
                    }
                }

                // Run the main behaviour of the connector, depending on its
                // current state.
                if scheduled.shutting_down {
                    // Nothing to do. But we're still waiting for all our pending
                    // control messages to be answered.
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
                    if scheduled.router.num_pending_acks() == 0 {
                        // We're actually done, we can safely destroy the
                        // currently running connector
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    } else {
                        cur_schedule = ConnectorScheduling::NotNow;
                    }
                } else {
                    self.debug_conn(connector_id, "Running ...");
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                    let new_schedule = scheduled.connector.run(
                        scheduler_ctx, &scheduled.context, &mut delta_state
                    );
                    self.debug_conn(connector_id, "Finished running");

                    // Handle all of the output from the current run: messages to
                    // send and connectors to instantiate.
                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);

                    cur_schedule = new_schedule;
                }
            }

            // If we get here, the connector does not require immediate
            // execution. So enqueue it if requested, and otherwise put it in a
            // sleeping state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.push_work(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep. Note that we are the only ones who are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    self.try_go_to_sleep(connector_key, scheduled);
                },
                ConnectorScheduling::Exit => {
                    // Prepare for exit: set the shutdown flag and broadcast
                    // messages to notify peers of the closing channels.
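                    // The component is not destroyed right away: it stays in the
                    // `shutting_down` state until every `CloseChannel` message sent
                    // below has been acknowledged. Only then is it destroyed, either
                    // immediately below or in the `shutting_down` branch at the top
                    // of the scheduling loop.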
                    scheduled.shutting_down = true;

                    for port in &scheduled.context.ports {
                        if port.state != PortState::Closed {
                            let message = scheduled.router.prepare_closing_channel(
                                port.self_id, port.peer_id, connector_id
                            );
                            self.runtime.send_message(port.peer_connector, message);
                        }
                    }

                    if scheduled.router.num_pending_acks() == 0 {
                        self.runtime.destroy_component(connector_key);
                        continue 'thread_loop;
                    }

                    self.try_go_to_sleep(connector_key, scheduled);
                }
            }
        }
    }

    fn handle_delta_state(
        &mut self, cur_connector: &mut ScheduledConnector, connector_id: ConnectorId,
        delta_state: &mut RunDeltaState
    ) {
        // Handle any messages that were sent
        if !delta_state.outbox.is_empty() {
            for mut message in delta_state.outbox.drain(..) {
                // Based on the message contents, decide where the message
                // should be sent. This might end up modifying the message.
                self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message));
                let (peer_connector, self_port, peer_port) = match &mut message {
                    MessageContents::Data(contents) => {
                        let port = cur_connector.context.get_port(contents.sending_port);
                        (port.peer_connector, contents.sending_port, port.peer_id)
                    },
                    MessageContents::Sync(contents) => {
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::RequestCommit(contents) => {
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::ConfirmCommit(contents) => {
                        for to_visit in &contents.to_visit {
                            let message = Message{
                                sending_connector: connector_id,
                                receiving_port: PortIdLocal::new_invalid(),
                                contents: MessageContents::ConfirmCommit(contents.clone()),
                            };
                            self.runtime.send_message(*to_visit, message);
                        }
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::Control(_) | MessageContents::Ping => {
                        // Never generated by the user's code
                        unreachable!();
                    }
                };

                // TODO: Maybe clean this up, perhaps the special case for
                // ConfirmCommit can be handled differently.
                if peer_connector.is_valid() {
                    if peer_port.is_valid() {
                        // Sending a message to a port, so the port may not be
                        // closed.
                        let port = cur_connector.context.get_port(self_port);
                        match port.state {
                            PortState::Open => {},
                            PortState::Closed => {
                                todo!("Handling sending over a closed port");
                            }
                        }
                    }

                    let message = Message{
                        sending_connector: connector_id,
                        receiving_port: peer_port,
                        contents: message,
                    };
                    self.runtime.send_message(peer_connector, message);
                }
            }
        }

        if !delta_state.new_ports.is_empty() {
            for port in delta_state.new_ports.drain(..) {
                cur_connector.context.ports.push(port);
            }
        }

        // Handle any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to the global registry to obtain a key
                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
                let new_connector = self.runtime.get_component_private(&new_key);

                // The call above changed ownership of the ports, but we still
                // have to let the other end of each channel know that the port
                // has changed location.
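                // Each `prepare_reroute` call registers an entry in our own
                // router: until the peer has processed the `ChangePortPeer`
                // message and replied with an `Ack`, any message it sends to
                // the transferred port still arrives here and is forwarded to
                // the new owner by `should_reroute` at the top of `run`.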
                for port in &new_connector.context.ports {
                    let reroute_message = cur_connector.router.prepare_reroute(
                        port.self_id, port.peer_id, cur_connector.context.id,
                        port.peer_connector, new_connector.context.id
                    );
                    self.runtime.send_message(port.peer_connector, reroute_message);
                }

                // Schedule the new connector to run
                self.runtime.push_work(new_key);
            }
        }

        debug_assert!(delta_state.outbox.is_empty());
        debug_assert!(delta_state.new_ports.is_empty());
        debug_assert!(delta_state.new_connectors.is_empty());
    }

    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
        debug_assert_eq!(connector_key.index, connector.context.id.0);
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);

        // This is the running connector, and only the running connector may
        // decide it wants to sleep again.
        connector.public.sleeping.store(true, Ordering::Release);

        // But due to reordering we might have received messages from peers who
        // did not consider us sleeping. If so, then we wake ourselves again.
        if !connector.public.inbox.is_empty() {
            // Try to wake ourselves up
            let should_wake_up_again = connector.public.sleeping
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                .is_ok();

            if should_wake_up_again {
                self.runtime.push_work(connector_key)
            }
        }
    }
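    // Illustrative interleaving (a sketch; the sending side is not shown in
    // this file) of why `try_go_to_sleep` re-checks the inbox:
    //  1. A peer pushes a message into our public inbox while it still observes
    //     `sleeping == false`, so it assumes we will pick the message up
    //     ourselves.
    //  2. We finish running, having already drained the inbox, and then store
    //     `sleeping = true`.
    // Without the re-check the message from step 1 would never wake us again.
    // The compare-exchange ensures that at most one party flips `sleeping` back
    // to `false` and enqueues the connector (assuming the sender performs the
    // symmetric exchange when it does observe `sleeping == true`).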
    // TODO: Remove, this is debugging stuff
    fn debug(&self, message: &str) {
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
    }

    fn debug_conn(&self, conn: ConnectorId, message: &str) {
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
    }
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

struct ControlEntry {
    id: u32,
    variant: ControlVariant,
}

enum ControlVariant {
    ChangedPort(ControlChangedPort),
    ClosedChannel(ControlClosedChannel),
}

struct ControlChangedPort {
    target_port: PortIdLocal,      // if sent to this port, then reroute
    source_connector: ConnectorId, // connector we expect messages from
    target_connector: ConnectorId, // connector we need to reroute to
}

struct ControlClosedChannel {
    source_port: PortIdLocal,
    target_port: PortIdLocal,
}

pub(crate) struct ControlMessageHandler {
    id_counter: u32,
    active: Vec<ControlEntry>,
}

impl ControlMessageHandler {
    pub fn new() -> Self {
        ControlMessageHandler{
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares a message indicating that a channel has closed. We keep a
    /// local entry to match against the (hopefully) returned `Ack` message.
    pub fn prepare_closing_channel(
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId
    ) -> Message {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
                source_port: self_port_id,
                target_port: peer_port_id,
            }),
        });

        return Message{
            sending_connector: self_connector_id,
            receiving_port: peer_port_id,
            contents: MessageContents::Control(ControlMessage{
                id,
                content: ControlMessageVariant::CloseChannel(peer_port_id),
            }),
        };
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
    pub fn prepare_reroute(
        &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        let id = self.take_id();

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ChangedPort(ControlChangedPort{
                target_port: port_id,
                source_connector: peer_connector_id,
                target_connector: new_owner_connector_id,
            }),
        });

        return Message{
            sending_connector: self_connector_id,
            receiving_port: peer_port_id,
            contents: MessageContents::Control(ControlMessage{
                id,
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
            }),
        };
    }

    /// Checks whether the supplied message should be rerouted. If so, returns
    /// the connector that should receive this message instead.
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
        for entry in &self.active {
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
                if entry.source_connector == sending_connector && entry.target_port == target_port {
                    // Need to reroute this message
                    return Some(entry.target_connector);
                }
            }
        }

        return None;
    }

    /// Handles an Ack as an answer to a previously sent control message
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }

    /// Retrieves the number of responses we still expect to receive from our
    /// peers
    #[inline]
    pub fn num_pending_acks(&self) -> usize {
        return self.active.len();
    }

    fn take_id(&mut self) -> u32 {
        let generated_id = self.id_counter;
        let (new_id, _) = self.id_counter.overflowing_add(1);
        self.id_counter = new_id;

        return generated_id;
    }
}
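// Sketch of a typical `ControlMessageHandler` round trip as driven by the
// scheduler above. This is a condensed illustration, not additional runtime
// code; `router`, `my_id` and `ack_id` stand in for the scheduler's locals.
//
//     let msg = router.prepare_closing_channel(port.self_id, port.peer_id, my_id);
//     runtime.send_message(port.peer_connector, msg); // peer marks its port closed
//     // ... at some later point the peer's `Ack` arrives in our inbox ...
//     router.handle_ack(ack_id);                      // pending entry is removed
//     if router.num_pending_acks() == 0 { /* safe to destroy the component */ }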