diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 12899dc5c6bb601ef88d144ca083df520971ada3..dd5288e2399b338563d4c1130aa3fbe5f9c05ee5 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -1,13 +1,17 @@
 use std::sync::Arc;
+use std::sync::Condvar;
+use std::sync::atomic::Ordering;
 use std::time::Duration;
 use std::thread;
+
 use crate::ProtocolDescription;
-use super::inbox::InboxMessage;
-use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
-use super::global_store::GlobalStore;
+use super::port::{PortIdLocal};
+use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
+use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState};
+use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 
-struct Scheduler {
+pub(crate) struct Scheduler {
     global: Arc<GlobalStore>,
     code: Arc<ProtocolDescription>,
 }
@@ -23,82 +27,268 @@ impl Scheduler {
     pub fn run(&mut self) {
         // Setup global storage and workspaces that are reused for every
         // connector that we run
-        // TODO: @Memory, scheme for reducing allocations if excessive.
         let mut delta_state = RunDeltaState::new();
 
-        loop {
-            // TODO: Check if we're supposed to exit
-
+        'thread_loop: loop {
            // Retrieve a unit of work
             let connector_key = self.global.connector_queue.pop_front();
             if connector_key.is_none() {
-                // TODO: @Performance, needs condition variable for waking up
+                // TODO: @Performance, needs condition or something, and most
+                // def' not sleeping
                 thread::sleep(Duration::new(1, 0));
-                continue
+                if self.global.should_exit.load(Ordering::Acquire) {
+                    // Thread exits!
+                    break 'thread_loop;
+                }
+
+                continue 'thread_loop;
             }
 
             // We have something to do
             let connector_key = connector_key.unwrap();
-            let connector = self.global.connectors.get_mut(&connector_key);
+            let scheduled = self.global.connectors.get_mut(&connector_key);
 
+            // Keep running until we should no longer immediately schedule the
+            // connector.
             let mut cur_schedule = ConnectorScheduling::Immediate;
-            while cur_schedule == ConnectorScheduling::Immediate {
-                let new_schedule;
-
-                // TODO: Check inbox for new message
-
+            while cur_schedule == ConnectorScheduling::Immediate {
+                // Check all the messages that are in the shared inbox
+                while let Some(message) = scheduled.public.inbox.take_message() {
+                    match message {
+                        Message::Data(message) => {
+                            // Check if we need to reroute, or can just put it
+                            // in the private inbox of the connector
+                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
+                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
+                            } else {
+                                scheduled.connector.inbox.insert_message(message);
+                            }
+                        },
+                        Message::Control(message) => {
+                            match message.content {
+                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
+                                    // Need to change port target
+                                    let port = self.global.ports.get(&connector_key, port_id);
+                                    port.peer_connector = new_target_connector_id;
+                                    debug_assert!(delta_state.outbox.is_empty());
+
+                                    // And respond with an Ack
+                                    self.send_message_and_wake_up_if_sleeping(
+                                        message.sender,
+                                        Message::Control(ControlMessage{
+                                            id: message.id,
+                                            sender: connector_key.downcast(),
+                                            content: ControlMessageVariant::Ack,
+                                        })
+                                    );
+                                },
+                                ControlMessageVariant::Ack => {
+                                    scheduled.router.handle_ack(message.id);
+                                }
+                            }
+                        }
+                    }
+                }
+
-                if connector.is_in_sync_mode() {
+                // Actually run the connector
+                let new_schedule;
+                if scheduled.connector.is_in_sync_mode() {
                     // In synchronous mode, so we can expect messages being sent,
                     // but we never expect the creation of connectors
-                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
+                    new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
                     debug_assert!(delta_state.new_connectors.is_empty());
-
-                    if !delta_state.outbox.is_empty() {
-                        // There are message to send
-                        for message in delta_state.outbox.drain(..) {
-                            let (inbox_message, target_connector_id) = {
-                                // Note: retrieving a port incurs a read lock
-                                let sending_port = self.global.ports.get(&connector_key, message.sending_port);
-                                (
-                                    InboxMessage {
-                                        sending_port: sending_port.self_id,
-                                        receiving_port: sending_port.peer_id,
-                                        sender_prev_branch_id: message.sender_prev_branch_id,
-                                        sender_cur_branch_id: message.sender_cur_branch_id,
-                                        message: message.message,
-                                    },
-                                    sending_port.peer_connector,
-                                )
-                            };
-
-                            let target_connector = self.global.connectors.get_shared(target_connector_id);
-                            target_connector.inbox.insert_message(inbox_message);
-
-                            // TODO: Check silent state. Queue connector if it was silent
-                        }
-                    }
                 } else {
                     // In regular running mode (not in a sync block) we cannot send
                     // messages but we can create new connectors
-                    new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
+                    new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
                     debug_assert!(delta_state.outbox.is_empty());
+                }
 
-                    if !delta_state.new_connectors.is_empty() {
-                        // Push all connectors into the global state and queue them
-                        // for execution
-                        for connector in delta_state.new_connectors.drain(..) {
-                            // Create connector, modify all of the ports that
-                            // it now owns, then queue it for execution
-                            let connector_key = self.global.connectors.create(connector);
-
+                // Handle all of the output from the current run: messages to
+                // send and connectors to instantiate.
+                self.handle_delta_state(&connector_key, &mut delta_state);
+
+                cur_schedule = new_schedule;
+            }
+
+            // If here then the connector does not require immediate execution.
+            // So enqueue it if requested, and otherwise put it in a sleeping
+            // state.
+            match cur_schedule {
+                ConnectorScheduling::Immediate => unreachable!(),
+                ConnectorScheduling::Later => {
+                    // Simply queue it again later
+                    self.global.connector_queue.push_back(connector_key);
+                },
+                ConnectorScheduling::NotNow => {
+                    // Need to sleep. Note that we are the only ones who are
+                    // allowed to set the sleeping state to `true`, and since
+                    // we're running it must currently be `false`.
+                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
+                    scheduled.public.sleeping.store(true, Ordering::Release);
+
+                    // We might have received a message in the meantime from a
+                    // thread that did not see the sleeping flag set to `true`,
+                    // so:
+                    if !scheduled.public.inbox.is_empty() {
+                        let should_reschedule_self = scheduled.public.sleeping
+                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
+                            .is_ok();
+
+                        if should_reschedule_self {
+                            self.global.connector_queue.push_back(connector_key);
+                        }
+                    }
+                }
+            }
-
-                        }
-                    }
-                }
-
-                cur_schedule = new_schedule;
-            }
         }
     }
 
+    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
+        // Handle any messages that were sent
+        if !delta_state.outbox.is_empty() {
+            for message in delta_state.outbox.drain(..) {
+                let (inbox_message, target_connector_id) = {
+                    let sending_port = self.global.ports.get(connector_key, message.sending_port);
+                    (
+                        DataMessage {
+                            sending_connector: connector_key.downcast(),
+                            sending_port: sending_port.self_id,
+                            receiving_port: sending_port.peer_id,
+                            sender_prev_branch_id: message.sender_prev_branch_id,
+                            sender_cur_branch_id: message.sender_cur_branch_id,
+                            message: message.message,
+                        },
+                        sending_port.peer_connector,
+                    )
+                };
+
+                self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
+            }
+        }
+
+        // Handle any new connectors that were created
+        // TODO: Pool outgoing messages to reduce atomic access
+        if !delta_state.new_connectors.is_empty() {
+            let cur_connector = self.global.connectors.get_mut(connector_key);
+
+            for new_connector in delta_state.new_connectors.drain(..) {
+                // Add to global registry to obtain a key
+                let new_key = self.global.connectors.create(new_connector);
+                let new_connector = self.global.connectors.get_mut(&new_key);
+
+                // Each port owned by the new connector must be handed over by
+                // the connector that created it. Note that the creator is the
+                // current owner.
+                for port_id in &new_connector.ports.owned_ports {
+                    debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));
+
+                    // Modify ownership, retrieve the peer connector
+                    let (peer_connector_id, peer_port_id) = {
+                        let mut port = self.global.ports.get(connector_key, *port_id);
+                        port.owning_connector = new_key.downcast();
+
+                        (port.peer_connector, port.peer_id)
+                    };
+
+                    // Send a message that the port has changed ownership
+                    let reroute_message = cur_connector.router.prepare_reroute(
+                        *port_id, peer_port_id, connector_key.downcast(),
+                        peer_connector_id, new_key.downcast()
+                    );
+
+                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
+                }
+
+                // Schedule the new connector to run
+                self.global.connector_queue.push_back(new_key);
+            }
+        }
+    }
+
+    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
+        let connector = self.global.connectors.get_shared(connector_id);
+
+        connector.inbox.insert_message(message);
+        let should_wake_up = connector.sleeping
+            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
+            .is_ok();
+
+        if should_wake_up {
+            let key = unsafe { ConnectorKey::from_id(connector_id) };
+            self.global.connector_queue.push_back(key);
+        }
+    }
 }
 
+/// Represents a rerouting entry due to a moved port
+// TODO: Optimize
+struct ReroutedTraffic {
+    id: u32,                        // ID of the control message
+    port: PortIdLocal,              // targeted port
+    source_connector: ConnectorId,  // connector we expect messages from
+    target_connector: ConnectorId,  // connector they should be rerouted to
+}
+
+pub(crate) struct Router {
+    id_counter: u32,
+    active: Vec<ReroutedTraffic>,
+}
+
+impl Router {
+    pub fn new() -> Self {
+        Router{
+            id_counter: 0,
+            active: Vec::new(),
+        }
+    }
+
+    /// Prepares rerouting messages due to changed ownership of a port. The
+    /// control message returned by this function must be sent to the
+    /// transferred port's peer connector.
+    pub fn prepare_reroute(
+        &mut self,
+        port_id: PortIdLocal, peer_port_id: PortIdLocal,
+        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
+        new_owner_connector_id: ConnectorId
+    ) -> Message {
+        let id = self.id_counter;
+        self.id_counter = self.id_counter.wrapping_add(1);
+
+        self.active.push(ReroutedTraffic{
+            id,
+            port: port_id,
+            source_connector: peer_connector_id,
+            target_connector: new_owner_connector_id,
+        });
+
+        return Message::Control(ControlMessage{
+            id,
+            sender: self_connector_id,
+            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
+        });
+    }
+
+    /// Checks whether the supplied message should be rerouted. If so, this
+    /// function returns the connector that should receive the message.
+    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
+        for reroute in &self.active {
+            if reroute.source_connector == message.sending_connector &&
+                    reroute.port == message.receiving_port {
+                // Need to reroute this message
+                return Some(reroute.target_connector);
+            }
+        }
+
+        return None;
+    }
+
+    /// Handles an Ack as an answer to a previously sent control message
+    pub fn handle_ack(&mut self, id: u32) {
+        let index = self.active.iter()
+            .position(|v| v.id == id);
+
+        match index {
+            Some(index) => { self.active.remove(index); },
+            None => { todo!("handling of nefarious ACKs"); },
+        }
+    }
+}
\ No newline at end of file
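
Note on the idle path: the new `use std::sync::Condvar;` import and the `// TODO: @Performance` comment both point at replacing the one-second `thread::sleep` poll with a blocking queue. Below is a minimal sketch of what such a queue could look like; the `WorkQueue` name and the `should_exit` flag folded into the mutex are illustrative choices, not the runtime2 API.

use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// Hypothetical blocking work queue: threads sleep on a condition variable
// instead of polling with a fixed timeout.
struct WorkQueue<T> {
    state: Mutex<(VecDeque<T>, bool)>, // (pending work, should_exit)
    signal: Condvar,
}

impl<T> WorkQueue<T> {
    fn new() -> Self {
        WorkQueue{ state: Mutex::new((VecDeque::new(), false)), signal: Condvar::new() }
    }

    fn push_back(&self, item: T) {
        self.state.lock().unwrap().0.push_back(item);
        self.signal.notify_one();
    }

    // Blocks until work arrives; returns None only on shutdown.
    fn pop_front(&self) -> Option<T> {
        let mut guard = self.state.lock().unwrap();
        loop {
            if let Some(item) = guard.0.pop_front() {
                return Some(item);
            }
            if guard.1 {
                return None;
            }
            guard = self.signal.wait(guard).unwrap();
        }
    }

    fn shutdown(&self) {
        self.state.lock().unwrap().1 = true;
        self.signal.notify_all();
    }
}

With something along these lines, the `thread::sleep` plus the separate `should_exit` check at the top of `run` would collapse into a single blocking `pop_front` call.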
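
Note on the wake-up handshake: the `NotNow` arm and `send_message_and_wake_up_if_sleeping` cooperate through the same `compare_exchange` on the `sleeping` flag. The scheduler publishes `sleeping = true` and then re-checks the inbox; every sender inserts its message first and then tries to flip the flag back. Exactly one side can win the exchange, so a message that races with going to sleep is never lost and the connector is never enqueued twice. A self-contained sketch of the pattern, using stand-in `u64` messages and a plain `Mutex<VecDeque>` instead of the runtime2 inbox and queue types:

use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct Connector {
    sleeping: AtomicBool,
    inbox: Mutex<VecDeque<u64>>,
}

// Sender side: deliver the message first, then try to win the wake-up.
fn send(target: &Connector, run_queue: &Mutex<VecDeque<usize>>, target_index: usize, msg: u64) {
    target.inbox.lock().unwrap().push_back(msg);
    let won_wake_up = target.sleeping
        .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
        .is_ok();
    if won_wake_up {
        // We saw `sleeping == true`, so no scheduler thread owns the
        // connector and nobody else won the exchange: enqueue it once.
        run_queue.lock().unwrap().push_back(target_index);
    }
}

// Scheduler side: publish the flag first, then re-check the inbox to catch
// a message that was delivered before the flag became visible.
fn park(connector: &Connector, run_queue: &Mutex<VecDeque<usize>>, index: usize) {
    connector.sleeping.store(true, Ordering::Release);
    if !connector.inbox.lock().unwrap().is_empty() {
        let reclaimed = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();
        if reclaimed {
            // The sender lost (or never entered) the race on the flag, so
            // rescheduling the connector is on us.
            run_queue.lock().unwrap().push_back(index);
        }
    }
}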
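
Note on `Router`: it covers the window between handing a port to a freshly created connector and the peer processing `ChangePortPeer`. Data the peer sends to the old owner during that window must be forwarded to the new owner until the `Ack` arrives. A simplified, runnable model of that lifecycle follows; the struct shapes, id values, and field names are illustrative stand-ins, not the runtime2 definitions.

// Simplified stand-ins for the runtime2 types.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ConnectorId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct PortIdLocal(u32);

struct DataMessage {
    sending_connector: ConnectorId,
    receiving_port: PortIdLocal,
}

struct Reroute {
    id: u32,
    port: PortIdLocal,   // the moved port
    source: ConnectorId, // peer we expect in-flight traffic from
    target: ConnectorId, // new owner of the port
}

#[derive(Default)]
struct Router {
    id_counter: u32,
    active: Vec<Reroute>,
}

impl Router {
    // Opens the reroute window; in the real code the returned id travels
    // inside the ChangePortPeer control message.
    fn prepare_reroute(&mut self, moved_port: PortIdLocal, peer: ConnectorId, new_owner: ConnectorId) -> u32 {
        let id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1);
        self.active.push(Reroute{ id, port: moved_port, source: peer, target: new_owner });
        id
    }

    fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
        self.active.iter()
            .find(|r| r.source == message.sending_connector && r.port == message.receiving_port)
            .map(|r| r.target)
    }

    fn handle_ack(&mut self, id: u32) {
        self.active.retain(|r| r.id != id);
    }
}

fn main() {
    let (peer, child) = (ConnectorId(2), ConnectorId(3));
    let moved_port = PortIdLocal(7);

    let mut router = Router::default();
    let ctrl_id = router.prepare_reroute(moved_port, peer, child);

    // Data the peer sent before processing ChangePortPeer is forwarded to
    // the new owner...
    let in_flight = DataMessage{ sending_connector: peer, receiving_port: moved_port };
    assert_eq!(router.should_reroute(&in_flight), Some(child));

    // ...and once the peer's Ack arrives, normal delivery resumes.
    router.handle_ack(ctrl_id);
    assert_eq!(router.should_reroute(&in_flight), None);
}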