use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::thread;

use crate::runtime2::global_store::ConnectorVariant;
use crate::runtime2::inbox::MessageContents;
use crate::runtime2::native::Connector;
use crate::runtime2::port::{Channel, PortKind};

use super::RuntimeInner;
use super::port::{Port, PortIdLocal};
use super::inbox::{Message, ControlMessage, ControlMessageVariant};
use super::connector::{ConnectorScheduling, RunDeltaState};
use super::global_store::{ConnectorKey, ConnectorId};

/// Contains fields that are mostly managed by the scheduler, but may be
/// accessed by the connector
pub(crate) struct ConnectorCtx {
    pub(crate) id: ConnectorId,
    port_counter: Arc<AtomicU32>,
    pub(crate) ports: Vec<Port>,
}

impl ConnectorCtx {
    pub(crate) fn new(port_counter: Arc<AtomicU32>) -> ConnectorCtx {
        Self{
            id: ConnectorId::new_invalid(),
            port_counter,
            ports: Vec::new(),
        }
    }

    /// Creates a (putter, getter) port pair belonging to the same channel. The
    /// ports will be implicitly owned by the connector.
    pub(crate) fn create_channel(&mut self) -> Channel {
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
        let putter_id = PortIdLocal::new(getter_id + 1);
        let getter_id = PortIdLocal::new(getter_id);
        self.ports.push(Port{
            self_id: getter_id,
            peer_id: putter_id,
            kind: PortKind::Getter,
            peer_connector: self.id,
        });
        self.ports.push(Port{
            self_id: putter_id,
            peer_id: getter_id,
            kind: PortKind::Putter,
            peer_connector: self.id,
        });

        return Channel{ getter_id, putter_id };
    }

    pub(crate) fn add_port(&mut self, port: Port) {
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
        self.ports.push(port);
    }

    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
        let index = self.port_id_to_index(id);
        return self.ports.remove(index);
    }

    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
        let index = self.port_id_to_index(id);
        return &self.ports[index];
    }

    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
        let index = self.port_id_to_index(id);
        return &mut self.ports[index];
    }

    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
        for (idx, port) in self.ports.iter().enumerate() {
            if port.self_id == id {
                return idx;
            }
        }

        panic!("port {:?} not owned by connector", id);
    }
}

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
}

// Thinking aloud: actual ports should be accessible by the connector, but
// managed by the scheduler (to handle rerouting messages). We could just give
// a read-only context, instead of an extra call on the "Connector" trait.
impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
        return Self{ runtime };
    }

    pub fn run(&mut self) {
        // Set up global storage and workspaces that are reused for every
        // connector that we run
        let mut delta_state = RunDeltaState::new();

        'thread_loop: loop {
            // Retrieve a unit of work
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
            if connector_key.is_none() {
                // TODO: @Performance, needs a condition variable or something,
                //  and most definitely not sleeping
                thread::sleep(Duration::new(1, 0));
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
                    // Thread exits!
                    break 'thread_loop;
                }

                continue 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
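            // Each run reports one of three outcomes, handled below:
            // `Immediate` keeps the connector executing in this inner loop,
            // `Later` requeues it behind other work, and `NotNow` parks it
            // in a sleeping state until a message wakes it up.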
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Check all the messages that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    // Check for rerouting
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
                        continue;
                    }

                    // Check for messages that require special action from the
                    // scheduler.
                    if let MessageContents::Control(content) = message.contents {
                        match content.content {
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                // Need to change the port's target
                                let port = scheduled.context.get_port_mut(port_id);
                                port.peer_connector = new_target_connector_id;
                                debug_assert!(delta_state.outbox.is_empty());

                                // And respond with an Ack.
                                // Note: once this code has been reached, we must not have any
                                // messages in the outbox that send to the port whose owning
                                // connector we just changed. This is because the `ack` will
                                // clear the rerouting entry of the `ack`-receiver.
                                self.send_message_and_wake_up_if_sleeping(
                                    message.sending_connector,
                                    Message{
                                        sending_connector: connector_key.downcast(),
                                        receiving_port: PortIdLocal::new_invalid(),
                                        contents: MessageContents::Control(ControlMessage{
                                            id: content.id,
                                            content: ControlMessageVariant::Ack,
                                        }),
                                    }
                                );
                            },
                            ControlMessageVariant::Ack => {
                                scheduled.router.handle_ack(content.id);
                            }
                        }
                    } else {
                        // Let the connector handle the message
                        scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state);
                    }
                }

                // Actually run the connector
                let new_schedule = scheduled.connector.run(
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
                );

                // Handle all of the output from the current run: messages to
                // send and connectors to instantiate.
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);

                cur_schedule = new_schedule;
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.global_store.connector_queue.push_back(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep. Note that we are the only ones who are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
                    scheduled.public.sleeping.store(true, Ordering::Release);

                    // We might have received a message in the meantime from a
                    // thread that did not see the sleeping flag set to `true`,
                    // so:
                    if !scheduled.public.inbox.is_empty() {
                        let should_reschedule_self = scheduled.public.sleeping
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                            .is_ok();

                        if should_reschedule_self {
                            self.runtime.global_store.connector_queue.push_back(connector_key);
                        }
                    }
                }
            }
        }
    }

    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
        // Handle any messages that were sent
        let connector_id = connector_key.downcast();
        if !delta_state.outbox.is_empty() {
            for mut message in delta_state.outbox.drain(..) {
                // Based on the message contents, decide where the message
                // should be sent to. This might end up modifying the message.
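                // (Per variant: data messages follow the sending port's peer,
                // sync and commit-request messages pop the next entry off
                // their `to_visit` list, and a commit confirmation is fanned
                // out directly to every connector it still has to visit.)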
                let (peer_connector, peer_port) = match &mut message {
                    MessageContents::Data(contents) => {
                        let port = context.get_port(contents.sending_port);
                        (port.peer_connector, port.peer_id)
                    },
                    MessageContents::Sync(contents) => {
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid())
                    },
                    MessageContents::RequestCommit(contents) => {
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid())
                    },
                    MessageContents::ConfirmCommit(contents) => {
                        for to_visit in &contents.to_visit {
                            let message = Message{
                                sending_connector: connector_id,
                                receiving_port: PortIdLocal::new_invalid(),
                                contents: MessageContents::ConfirmCommit(contents.clone()),
                            };
                            self.send_message_and_wake_up_if_sleeping(*to_visit, message);
                        }
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::Control(_) | MessageContents::Ping => {
                        // Never generated by the user's code
                        unreachable!();
                    }
                };

                // TODO: Maybe clean this up, perhaps the special case for
                //  ConfirmCommit can be handled differently.
                if peer_connector.is_valid() {
                    let message = Message{
                        sending_connector: connector_id,
                        receiving_port: peer_port,
                        contents: message,
                    };
                    self.send_message_and_wake_up_if_sleeping(peer_connector, message);
                }
            }
        }

        if !delta_state.new_ports.is_empty() {
            for port in delta_state.new_ports.drain(..) {
                context.ports.push(port);
            }
        }

        // Handle any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);
            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to the global registry to obtain a key
                let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector);
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);

                // The call above changed the ownership of the ports, but we
                // still have to let the other end of each channel know that
                // the port has changed location.
                for port in &new_connector.context.ports {
                    let reroute_message = cur_connector.router.prepare_reroute(
                        port.self_id, port.peer_id, cur_connector.context.id,
                        port.peer_connector, new_connector.context.id
                    );

                    self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message);
                }

                // Schedule the new connector to run
                self.runtime.global_store.connector_queue.push_back(new_key);
            }
        }
    }

    fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
        let connector = self.runtime.global_store.connectors.get_shared(connector_id);
        connector.inbox.insert_message(message);

        let should_wake_up = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();

        if should_wake_up {
            let key = unsafe { ConnectorKey::from_id(connector_id) };
            self.runtime.global_store.connector_queue.push_back(key);
        }
    }
}

/// Represents a rerouting entry due to a moved port
// TODO: Optimize
struct ReroutedTraffic {
    id: u32,                        // ID of the control message
    target_port: PortIdLocal,       // targeted port
    source_connector: ConnectorId,  // connector we expect messages from
    target_connector: ConnectorId,  // connector they should be rerouted to
}

pub(crate) struct Router {
    id_counter: u32,
    active: Vec<ReroutedTraffic>,
}

impl Router {
    pub fn new() -> Self {
        Router{
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
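    /// The registered entry remains active, and messages keep being
    /// rerouted, until the peer responds with an `Ack` (see `handle_ack`).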
    pub fn prepare_reroute(
        &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        let id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1); // was discarding the result of `overflowing_add`
        self.active.push(ReroutedTraffic{
            id,
            target_port: port_id,
            source_connector: peer_connector_id,
            target_connector: new_owner_connector_id,
        });

        return Message{
            sending_connector: self_connector_id,
            receiving_port: PortIdLocal::new_invalid(),
            contents: MessageContents::Control(ControlMessage{
                id,
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
            })
        };
    }

    /// Checks whether the supplied message should be rerouted. If so, returns
    /// the connector that should receive this message instead.
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
        for reroute in &self.active {
            if reroute.source_connector == sending_connector &&
                reroute.target_port == target_port {
                // Need to reroute this message
                return Some(reroute.target_connector);
            }
        }

        return None;
    }

    /// Handles an `Ack` sent as an answer to a previously sent control message
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }
}
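
// A minimal sketch of the reroute lifecycle, added purely for illustration:
// it assumes `ConnectorId` and `PortIdLocal` are `Copy` and compare by value
// (as their use above suggests), and uses invalid connector ids as stand-ins
// for real ones, since only `ConnectorId::new_invalid()` is available here.
#[cfg(test)]
mod reroute_lifecycle_sketch {
    use super::*;

    #[test]
    fn reroute_entry_is_cleared_by_ack() {
        let mut router = Router::new();
        let moved_port = PortIdLocal::new(1);
        let peer_port = PortIdLocal::new(2);
        let some_connector = ConnectorId::new_invalid();

        // Moving `moved_port` to another connector: register the reroute and
        // obtain the `ChangePortPeer` control message destined for the peer.
        let control = router.prepare_reroute(
            moved_port, peer_port, some_connector, some_connector, some_connector
        );

        // While the entry is active, traffic from the peer for the moved port
        // is redirected to the new owner.
        assert!(router.should_reroute(some_connector, moved_port).is_some());

        // Once the peer answers with an `Ack` carrying the control message's
        // id, the entry is removed and rerouting stops.
        if let MessageContents::Control(message) = control.contents {
            router.handle_ack(message.id);
        }
        assert!(router.should_reroute(some_connector, moved_port).is_none());
    }
}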