diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index dd5288e2399b338563d4c1130aa3fbe5f9c05ee5..00a1ba5f112aba960e8d99d6dd7015ce6977ed26 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -5,23 +5,21 @@ use std::time::Duration; use std::thread; use crate::ProtocolDescription; +use crate::runtime2::global_store::ConnectorVariant; +use super::RuntimeInner; use super::port::{PortIdLocal}; use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; -use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState}; +use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; pub(crate) struct Scheduler { - global: Arc, - code: Arc, + runtime: Arc, } impl Scheduler { - pub fn new(global: Arc, code: Arc) -> Self { - Self{ - global, - code, - } + pub fn new(runtime: Arc) -> Self { + return Self{ runtime }; } pub fn run(&mut self) { @@ -31,12 +29,12 @@ impl Scheduler { 'thread_loop: loop { // Retrieve a unit of work - let connector_key = self.global.connector_queue.pop_front(); + let connector_key = self.runtime.global_store.connector_queue.pop_front(); if connector_key.is_none() { // TODO: @Performance, needs condition or something, and most // def' not sleeping thread::sleep(Duration::new(1, 0)); - if self.global.should_exit.load(Ordering::Acquire) { + if self.runtime.global_store.should_exit.load(Ordering::Acquire) { // Thread exits! break 'thread_loop; } @@ -46,7 +44,7 @@ impl Scheduler { // We have something to do let connector_key = connector_key.unwrap(); - let scheduled = self.global.connectors.get_mut(&connector_key); + let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); // Keep running until we should no longer immediately schedule the // connector. @@ -68,7 +66,7 @@ impl Scheduler { match message.content { ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { // Need to change port target - let port = self.global.ports.get(&connector_key, port_id); + let port = self.runtime.global_store.ports.get(&connector_key, port_id); port.peer_connector = new_target_connector_id; debug_assert!(delta_state.outbox.is_empty()); @@ -86,22 +84,31 @@ impl Scheduler { scheduled.router.handle_ack(message.id); } } - } + }, + Message::Ping => {}, } } // Actually run the connector + // TODO: Revise let new_schedule; - if scheduled.connector.is_in_sync_mode() { - // In synchronous mode, so we can expect messages being sent, - // but we never expect the creation of connectors - new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); - debug_assert!(delta_state.new_connectors.is_empty()); - } else { - // In regular running mode (not in a sync block) we cannot send - // messages but we can create new connectors - new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); - debug_assert!(delta_state.outbox.is_empty()); + match &mut scheduled.connector { + ConnectorVariant::UserDefined(connector) => { + if connector.is_in_sync_mode() { + // In synchronous mode, so we can expect messages being sent, + // but we never expect the creation of connectors + new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state); + debug_assert!(delta_state.new_connectors.is_empty()); + } else { + // In regular running mode (not in a sync block) we cannot send + // messages but we can create new connectors + new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state); + debug_assert!(delta_state.outbox.is_empty()); + } + }, + ConnectorVariant::Native(connector) => { + new_schedule = connector.run(&self.runtime.protocol_description); + }, } // Handle all of the output from the current run: messages to @@ -118,7 +125,7 @@ impl Scheduler { ConnectorScheduling::Immediate => unreachable!(), ConnectorScheduling::Later => { // Simply queue it again later - self.global.connector_queue.push_back(connector_key); + self.runtime.global_store.connector_queue.push_back(connector_key); }, ConnectorScheduling::NotNow => { // Need to sleep, note that we are the only ones which are @@ -136,7 +143,7 @@ impl Scheduler { .is_ok(); if should_reschedule_self { - self.global.connector_queue.push_back(connector_key); + self.runtime.global_store.connector_queue.push_back(connector_key); } } } @@ -149,7 +156,7 @@ impl Scheduler { if !delta_state.outbox.is_empty() { for message in delta_state.outbox.drain(..) { let (inbox_message, target_connector_id) = { - let sending_port = self.global.ports.get(&connector_key, message.sending_port); + let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port); ( DataMessage { sending_connector: connector_key.downcast(), @@ -170,12 +177,12 @@ impl Scheduler { // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { - let cur_connector = self.global.connectors.get_mut(connector_key); + let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key); for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.global.connectors.create(new_connector); - let new_connector = self.global.connectors.get_mut(&new_key); + let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector)); + let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); // Each port should be lost by the connector that created the // new one. Note that the creator is the current owner. @@ -184,7 +191,7 @@ impl Scheduler { // Modify ownership, retrieve peer connector let (peer_connector_id, peer_port_id) = { - let mut port = self.global.ports.get(connector_key, *port_id); + let mut port = self.runtime.global_store.ports.get(connector_key, *port_id); port.owning_connector = new_key.downcast(); (port.peer_connector, port.peer_id) @@ -199,13 +206,13 @@ impl Scheduler { } // Schedule new connector to run - self.global.connector_queue.push_back(new_key); + self.runtime.global_store.connector_queue.push_back(new_key); } } } pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { - let connector = self.global.connectors.get_shared(connector_id); + let connector = self.runtime.global_store.connectors.get_shared(connector_id); connector.inbox.insert_message(message); let should_wake_up = connector.sleeping @@ -214,7 +221,7 @@ impl Scheduler { if should_wake_up { let key = unsafe { ConnectorKey::from_id(connector_id) }; - self.global.connector_queue.push_back(key); + self.runtime.global_store.connector_queue.push_back(key); } } }