use std::sync::Arc; use std::sync::atomic::Ordering; use super::{RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortIdLocal}; use super::native::Connector; use super::connector::{ConnectorScheduling, RunDeltaState}; use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage}; /// Contains fields that are mostly managed by the scheduler, but may be /// accessed by the connector pub(crate) struct ConnectorCtx { pub(crate) id: ConnectorId, pub(crate) ports: Vec, } impl ConnectorCtx { pub(crate) fn new() -> ConnectorCtx { Self{ id: ConnectorId::new_invalid(), ports: Vec::new(), } } pub(crate) fn add_port(&mut self, port: Port) { debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id)); self.ports.push(port); } pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port { let index = self.port_id_to_index(id); return self.ports.remove(index); } pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port { let index = self.port_id_to_index(id); return &self.ports[index]; } pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port { let index = self.port_id_to_index(id); return &mut self.ports[index]; } fn port_id_to_index(&self, id: PortIdLocal) -> usize { for (idx, port) in self.ports.iter().enumerate() { if port.self_id == id { return idx; } } panic!("port {:?}, not owned by connector", id); } } // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] pub(crate) struct SchedulerCtx<'a> { pub(crate) runtime: &'a RuntimeInner } pub(crate) struct Scheduler { runtime: Arc, scheduler_id: u32, } impl Scheduler { pub fn new(runtime: Arc, scheduler_id: u32) -> Self { return Self{ runtime, scheduler_id }; } pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run let scheduler_id = self.scheduler_id; let mut delta_state = RunDeltaState::new(); 'thread_loop: loop { // Retrieve a unit of work println!("DEBUG [{}]: Waiting for work", scheduler_id); let connector_key = self.runtime.wait_for_work(); if connector_key.is_none() { // We should exit println!("DEBUG [{}]: ... No more work, quitting", scheduler_id); break 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index); let scheduled = self.runtime.get_component_private(&connector_key); // Keep running until we should no longer immediately schedule the // connector. let mut cur_schedule = ConnectorScheduling::Immediate; while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { // Check for rerouting println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message); if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { self.send_message_and_wake_up_if_sleeping(other_connector_id, message); continue; } // Check for messages that requires special action from the // scheduler. if let MessageContents::Control(content) = message.contents { match content.content { ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { // Need to change port target let port = scheduled.context.get_port_mut(port_id); port.peer_connector = new_target_connector_id; debug_assert!(delta_state.outbox.is_empty()); // And respond with an Ack // Note: after this code has been reached, we may not have any // messages in the outbox that send to the port whose owning // connector we just changed. This is because the `ack` will // clear the rerouting entry of the `ack`-receiver. self.send_message_and_wake_up_if_sleeping( message.sending_connector, Message{ sending_connector: connector_key.downcast(), receiving_port: PortIdLocal::new_invalid(), contents: MessageContents::Control(ControlMessage{ id: content.id, content: ControlMessageVariant::Ack, }), } ); }, ControlMessageVariant::Ack => { scheduled.router.handle_ack(content.id); } } } else { // Let connector handle message scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state); } } // Actually run the connector println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index); let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run( scheduler_ctx, &scheduled.context, &mut delta_state ); println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index); // Handle all of the output from the current run: messages to // send and connectors to instantiate. self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state); cur_schedule = new_schedule; } // If here then the connector does not require immediate execution. // So enqueue it if requested, and otherwise put it in a sleeping // state. match cur_schedule { ConnectorScheduling::Immediate => unreachable!(), ConnectorScheduling::Later => { // Simply queue it again later self.runtime.push_work(connector_key); }, ConnectorScheduling::NotNow => { // Need to sleep, note that we are the only ones which are // allows to set the sleeping state to `true`, and since // we're running it must currently be `false`. debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false); scheduled.public.sleeping.store(true, Ordering::Release); // We might have received a message in the meantime from a // thread that did not see the sleeping flag set to `true`, // so: if !scheduled.public.inbox.is_empty() { let should_reschedule_self = scheduled.public.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_reschedule_self { self.runtime.push_work(connector_key); } } }, ConnectorScheduling::Exit => { // Prepare for exit. Set the shutdown flag and broadcast // messages to notify peers of closing channels scheduled.shutting_down = true; for port in &scheduled.context.ports { self.runtime.send_message(port.peer_connector, Message{ sending_connector: connector_key.downcast(), receiving_port: port.peer_id, contents: MessageContents::Control(ControlMessage{ id: 0, content: ControlMessageVariant::Ack }) }) } } } } } fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) { // Handling any messages that were sent let connector_id = connector_key.downcast(); if !delta_state.outbox.is_empty() { for mut message in delta_state.outbox.drain(..) { // Based on the message contents, decide where the message // should be sent to. This might end up modifying the message. let (peer_connector, peer_port) = match &mut message { MessageContents::Data(contents) => { let port = context.get_port(contents.sending_port); (port.peer_connector, port.peer_id) }, MessageContents::Sync(contents) => { let connector = contents.to_visit.pop().unwrap(); (connector, PortIdLocal::new_invalid()) }, MessageContents::RequestCommit(contents)=> { let connector = contents.to_visit.pop().unwrap(); (connector, PortIdLocal::new_invalid()) }, MessageContents::ConfirmCommit(contents) => { for to_visit in &contents.to_visit { let message = Message{ sending_connector: connector_id, receiving_port: PortIdLocal::new_invalid(), contents: MessageContents::ConfirmCommit(contents.clone()), }; self.runtime.send_message(*to_visit, message); } (ConnectorId::new_invalid(), PortIdLocal::new_invalid()) }, MessageContents::Control(_) | MessageContents::Ping => { // Never generated by the user's code unreachable!(); } }; // TODO: Maybe clean this up, perhaps special case for // ConfirmCommit can be handled differently. if peer_connector.is_valid() { let message = Message { sending_connector: connector_id, receiving_port: peer_port, contents: message, }; self.runtime.send_message(peer_connector, message); } } } if !delta_state.new_ports.is_empty() { for port in delta_state.new_ports.drain(..) { context.ports.push(port); } } // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { let cur_connector = self.runtime.get_component_private(connector_key); for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key let new_key = self.runtime.create_pdl_component(cur_connector, new_connector); let new_connector = self.runtime.get_component_private(&new_key); // Call above changed ownership of ports, but we still have to // let the other end of the channel know that the port has // changed location. for port in &new_connector.context.ports { cur_connector.pending_acks += 1; let reroute_message = cur_connector.router.prepare_reroute( port.self_id, port.peer_id, cur_connector.context.id, port.peer_connector, new_connector.context.id ); self.runtime.send_message(port.peer_connector, reroute_message); } // Schedule new connector to run self.runtime.push_work(new_key); } } debug_assert!(delta_state.outbox.is_empty()); debug_assert!(delta_state.new_ports.is_empty()); debug_assert!(delta_state.new_connectors.is_empty()); } } // ----------------------------------------------------------------------------- // Control messages // ----------------------------------------------------------------------------- struct ControlEntry { id: u32, variant: ControlVariant, } enum ControlVariant { ChangedPort(ControlChangedPort), ClosedChannel(ControlClosedChannel), } struct ControlChangedPort { target_port: PortIdLocal, // if send to this port, then reroute source_connector: ConnectorId, // connector we expect messages from target_connector: ConnectorId, // connector we need to reroute to } struct ControlClosedChannel { } pub(crate) struct ControlMessageHandler { id_counter: u32, active: Vec, } impl ControlMessageHandler { pub fn new() -> Self { ControlMessageHandler { id_counter: 0, active: Vec::new(), } } /// Prepares rerouting messages due to changed ownership of a port. The /// control message returned by this function must be sent to the /// transferred port's peer connector. pub fn prepare_reroute( &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, new_owner_connector_id: ConnectorId ) -> Message { let id = self.id_counter; let (new_id_counter, _) = self.id_counter.overflowing_add(1); self.id_counter = new_id_counter; self.active.push(ReroutedTraffic{ id, target_port: port_id, source_connector: peer_connector_id, target_connector: new_owner_connector_id, }); return Message{ sending_connector: self_connector_id, receiving_port: peer_port_id, contents: MessageContents::Control(ControlMessage{ id, content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id), }) }; } /// Returns true if the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option { for reroute in &self.active { if reroute.source_connector == sending_connector && reroute.target_port == target_port { // Need to reroute this message return Some(reroute.target_connector); } } return None; } /// Handles an Ack as an answer to a previously sent control message pub fn handle_ack(&mut self, id: u32) { let index = self.active.iter() .position(|v| v.id == id); match index { Some(index) => { self.active.remove(index); }, None => { todo!("handling of nefarious ACKs"); }, } } }