use std::sync::Arc; use std::sync::Condvar; use std::sync::atomic::Ordering; use std::time::Duration; use std::thread; use crate::ProtocolDescription; use crate::runtime2::global_store::ConnectorVariant; use super::RuntimeInner; use super::port::{PortIdLocal}; use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; pub(crate) struct Scheduler { runtime: Arc, } impl Scheduler { pub fn new(runtime: Arc) -> Self { return Self{ runtime }; } pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run let mut delta_state = RunDeltaState::new(); 'thread_loop: loop { // Retrieve a unit of work let connector_key = self.runtime.global_store.connector_queue.pop_front(); if connector_key.is_none() { // TODO: @Performance, needs condition or something, and most // def' not sleeping thread::sleep(Duration::new(1, 0)); if self.runtime.global_store.should_exit.load(Ordering::Acquire) { // Thread exits! break 'thread_loop; } continue 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); // Keep running until we should no longer immediately schedule the // connector. let mut cur_schedule = ConnectorScheduling::Immediate; while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { match message { Message::Data(message) => { // Check if we need to reroute, or can just put it // in the private inbox of the connector if let Some(other_connector_id) = scheduled.router.should_reroute(&message) { self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message)); } else { scheduled.connector.inbox.insert_message(message); } }, Message::Control(message) => { match message.content { ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { // Need to change port target let port = self.runtime.global_store.ports.get(&connector_key, port_id); port.peer_connector = new_target_connector_id; debug_assert!(delta_state.outbox.is_empty()); // And respond with an Ack self.send_message_and_wake_up_if_sleeping( message.sender, Message::Control(ControlMessage{ id: message.id, sender: connector_key.downcast(), content: ControlMessageVariant::Ack, }) ); }, ControlMessageVariant::Ack => { scheduled.router.handle_ack(message.id); } } }, Message::Ping => {}, } } // Actually run the connector // TODO: Revise let new_schedule; match &mut scheduled.connector { ConnectorVariant::UserDefined(connector) => { if connector.is_in_sync_mode() { // In synchronous mode, so we can expect messages being sent, // but we never expect the creation of connectors new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state); debug_assert!(delta_state.new_connectors.is_empty()); } else { // In regular running mode (not in a sync block) we cannot send // messages but we can create new connectors new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state); debug_assert!(delta_state.outbox.is_empty()); } }, ConnectorVariant::Native(connector) => { new_schedule = connector.run(&self.runtime.protocol_description); }, } // Handle all of the output from the current run: messages to // send and connectors to instantiate. self.handle_delta_state(&connector_key, &mut delta_state); cur_schedule = new_schedule; } // If here then the connector does not require immediate execution. // So enqueue it if requested, and otherwise put it in a sleeping // state. match cur_schedule { ConnectorScheduling::Immediate => unreachable!(), ConnectorScheduling::Later => { // Simply queue it again later self.runtime.global_store.connector_queue.push_back(connector_key); }, ConnectorScheduling::NotNow => { // Need to sleep, note that we are the only ones which are // allows to set the sleeping state to `true`, and since // we're running it must currently be `false`. debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false); scheduled.public.sleeping.store(true, Ordering::Release); // We might have received a message in the meantime from a // thread that did not see the sleeping flag set to `true`, // so: if !scheduled.public.inbox.is_empty() { let should_reschedule_self = scheduled.public.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_reschedule_self { self.runtime.global_store.connector_queue.push_back(connector_key); } } } } } } fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) { // Handling any messages that were sent if !delta_state.outbox.is_empty() { for message in delta_state.outbox.drain(..) { let (inbox_message, target_connector_id) = { let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port); ( DataMessage { sending_connector: connector_key.downcast(), sending_port: sending_port.self_id, receiving_port: sending_port.peer_id, sender_prev_branch_id: message.sender_prev_branch_id, sender_cur_branch_id: message.sender_cur_branch_id, message: message.message, }, sending_port.peer_connector, ) }; self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message)); } } // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key); for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector)); let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); // Each port should be lost by the connector that created the // new one. Note that the creator is the current owner. for port_id in &new_connector.ports.owned_ports { debug_assert!(!cur_connector.ports.owned_ports.contains(port_id)); // Modify ownership, retrieve peer connector let (peer_connector_id, peer_port_id) = { let mut port = self.runtime.global_store.ports.get(connector_key, *port_id); port.owning_connector = new_key.downcast(); (port.peer_connector, port.peer_id) }; // Send message that port has changed ownership let reroute_message = cur_connector.router.prepare_reroute( port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast() ); self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message); } // Schedule new connector to run self.runtime.global_store.connector_queue.push_back(new_key); } } } pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { let connector = self.runtime.global_store.connectors.get_shared(connector_id); connector.inbox.insert_message(message); let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_wake_up { let key = unsafe { ConnectorKey::from_id(connector_id) }; self.runtime.global_store.connector_queue.push_back(key); } } } /// Represents a rerouting entry due to a moved port // TODO: Optimize struct ReroutedTraffic { id: u32, // ID of control message port: PortIdLocal, // targeted port source_connector: ConnectorId, // connector we expect messages from target_connector: ConnectorId, // connector they should be rerouted to } pub(crate) struct Router { id_counter: u32, active: Vec, } impl Router { pub fn new() -> Self { Router{ id_counter: 0, active: Vec::new(), } } /// Prepares rerouting messages due to changed ownership of a port. The /// control message returned by this function must be sent to the /// transferred port's peer connector. pub fn prepare_reroute( &mut self, port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, new_owner_connector_id: ConnectorId ) -> Message { let id = self.id_counter; self.id_counter.overflowing_add(1); self.active.push(ReroutedTraffic{ id, port: port_id, source_connector: peer_connector_id, target_connector: new_owner_connector_id, }); return Message::Control(ControlMessage{ id, sender: self_connector_id, content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id) }); } /// Returns true if the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. pub fn should_reroute(&self, message: &DataMessage) -> Option { for reroute in &self.active { if reroute.source_connector == message.sending_connector && reroute.port == message.sending_port { // Need to reroute this message return Some(reroute.target_connector); } } return None; } /// Handles an Ack as an answer to a previously sent control message pub fn handle_ack(&mut self, id: u32) { let index = self.active.iter() .position(|v| v.id == id); match index { Some(index) => { self.active.remove(index); }, None => { todo!("handling of nefarious ACKs"); }, } } }