diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index fc11f600d1c486a1f99272569a0437e58c25fb67..f645cc0572f9149185f1a0db6403a2dab0253f9a 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,60 +1,27 @@ use std::sync::Arc; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::Duration; -use std::thread; +use std::sync::atomic::Ordering; -use crate::runtime2::global_store::ConnectorVariant; -use crate::runtime2::inbox::MessageContents; -use crate::runtime2::native::Connector; -use crate::runtime2::port::{Channel, PortKind}; - -use super::RuntimeInner; +use super::{RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortIdLocal}; -use super::inbox::{Message, ControlMessage, ControlMessageVariant}; +use super::native::Connector; use super::connector::{ConnectorScheduling, RunDeltaState}; -use super::global_store::{ConnectorKey, ConnectorId}; +use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage}; /// Contains fields that are mostly managed by the scheduler, but may be /// accessed by the connector pub(crate) struct ConnectorCtx { pub(crate) id: ConnectorId, - port_counter: Arc, pub(crate) ports: Vec, } impl ConnectorCtx { - pub(crate) fn new(port_counter: Arc) -> ConnectorCtx { + pub(crate) fn new() -> ConnectorCtx { Self{ id: ConnectorId::new_invalid(), - port_counter, ports: Vec::new(), } } - /// Creates a (putter, getter) port pair belonging to the same channel. The - /// port will be implicitly owned by the connector. - pub(crate) fn create_channel(&mut self) -> Channel { - let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); - let putter_id = PortIdLocal::new(getter_id + 1); - let getter_id = PortIdLocal::new(getter_id); - - self.ports.push(Port{ - self_id: getter_id, - peer_id: putter_id, - kind: PortKind::Getter, - peer_connector: self.id, - }); - - self.ports.push(Port{ - self_id: putter_id, - peer_id: getter_id, - kind: PortKind::Putter, - peer_connector: self.id, - }); - - return Channel{ getter_id, putter_id }; - } - pub(crate) fn add_port(&mut self, port: Port) { debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id)); self.ports.push(port); @@ -108,26 +75,19 @@ impl Scheduler { 'thread_loop: loop { // Retrieve a unit of work - let mut connector_key = self.runtime.global_store.connector_queue.pop_front(); - while connector_key.is_none() { - // TODO: @Performance, needs condition or something, and most - // def' not sleeping - println!("DEBUG [{}]: Nothing to do", scheduler_id); - thread::sleep(Duration::new(1, 0)); - if self.runtime.global_store.should_exit.load(Ordering::Acquire) { - // Thread exits! - println!("DEBUG [{}]: ... So I am quitting", scheduler_id); - break 'thread_loop; - } - - println!("DEBUG [{}]: ... But I'm still running", scheduler_id); - continue 'thread_loop; + println!("DEBUG [{}]: Waiting for work", scheduler_id); + let connector_key = self.runtime.wait_for_work(); + if connector_key.is_none() { + // We should exit + println!("DEBUG [{}]: ... No more work, quitting", scheduler_id); + break 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); - println!("DEBUG [{}]: Running connector {}", scheduler_id, connector_key.index); - let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); + println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index); + + let scheduled = self.runtime.get_component_private(&connector_key); // Keep running until we should no longer immediately schedule the // connector. @@ -136,6 +96,7 @@ impl Scheduler { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { // Check for rerouting + println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message); if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { self.send_message_and_wake_up_if_sleeping(other_connector_id, message); continue; @@ -197,7 +158,7 @@ impl Scheduler { ConnectorScheduling::Immediate => unreachable!(), ConnectorScheduling::Later => { // Simply queue it again later - self.runtime.global_store.connector_queue.push_back(connector_key); + self.runtime.push_work(connector_key); }, ConnectorScheduling::NotNow => { // Need to sleep, note that we are the only ones which are @@ -215,9 +176,14 @@ impl Scheduler { .is_ok(); if should_reschedule_self { - self.runtime.global_store.connector_queue.push_back(connector_key); + self.runtime.push_work(connector_key); } } + }, + ConnectorScheduling::Exit => { + // TODO: Better way of doing this, when exiting then + // connected components must know their channels are invalid + self.runtime.destroy_component(connector_key); } } } @@ -283,12 +249,12 @@ impl Scheduler { // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { - let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key); + let cur_connector = self.runtime.get_component_private(connector_key); for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector); - let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); + let new_key = self.runtime.create_pdl_component(cur_connector, new_connector); + let new_connector = self.runtime.get_component_private(&new_key); // Call above changed ownership of ports, but we still have to // let the other end of the channel know that the port has @@ -303,13 +269,17 @@ impl Scheduler { } // Schedule new connector to run - self.runtime.global_store.connector_queue.push_back(new_key); + self.runtime.push_work(new_key); } } + + debug_assert!(delta_state.outbox.is_empty()); + debug_assert!(delta_state.new_ports.is_empty()); + debug_assert!(delta_state.new_connectors.is_empty()); } fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { - let connector = self.runtime.global_store.connectors.get_shared(connector_id); + let connector = self.runtime.get_component_public(connector_id); connector.inbox.insert_message(message); let should_wake_up = connector.sleeping @@ -318,7 +288,7 @@ impl Scheduler { if should_wake_up { let key = unsafe { ConnectorKey::from_id(connector_id) }; - self.runtime.global_store.connector_queue.push_back(key); + self.runtime.push_work(key); } } }