From 1755ca411ca7174a92f893f796ce7322143e9fc0 2021-10-21 01:50:31 From: MH Date: 2021-10-21 01:50:31 Subject: [PATCH] better exit condition, WIP on bugfixing --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 47ddc84473aeb08641da02f3cb67ff2b56f388e2..5769144ca08a4f4f213f7b21801117d11f3cf8da 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,21 +4,22 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{Message, MessageContents, SolutionMessage}; -use crate::runtime2::native::Connector; -use crate::runtime2::port::{Port, PortKind}; -use crate::runtime2::scheduler::ConnectorCtx; -use super::global_store::ConnectorId; + +use super::ConnectorId; +use super::native::Connector; +use super::scheduler::ConnectorCtx; use super::inbox::{ - PrivateInbox, PublicInbox, DataMessage, SyncMessage, + PrivateInbox, PublicInbox, + DataMessage, SyncMessage, SolutionMessage, Message, MessageContents, SyncBranchConstraint, SyncConnectorSolution }; -use super::port::PortIdLocal; +use super::port::{Port, PortKind, PortIdLocal}; /// Represents the identifier of a branch (the index within its container). An /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not /// yet receive anything from any branch). -#[derive(Clone, Copy, PartialEq, Eq)] +// TODO: Remove Debug derive +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BranchId { pub index: u32, } @@ -909,7 +910,7 @@ impl ConnectorPDL { // Need to wait until all children are terminated // TODO: Think about how to do this? 
branch.sync_state = SpeculativeState::Finished; - return ConnectorScheduling::NotNow; + return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { // Prepare for sync execution and reschedule immediately @@ -1312,6 +1313,7 @@ pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running + Exit, // Connector has exited } /// Recursively goes through the value group, attempting to find ports. diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index b14bbafdc12a4c6d71d73634fd83bd1bcecad9f9..b78e79ec1fbdf67be2196a15debcdffcf34f6ce9 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -15,14 +15,15 @@ within a certain sync-round. use std::collections::VecDeque; use std::sync::Mutex; +use super::ConnectorId; use crate::protocol::eval::ValueGroup; use super::connector::BranchId; use super::port::PortIdLocal; -use super::global_store::ConnectorId; /// A message that has been delivered (after being imbued with the receiving /// port by the scheduler) to a connector. 
-#[derive(Clone)] +// TODO: Remove Debug on messages +#[derive(Debug, Clone)] pub struct DataMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, @@ -30,14 +31,14 @@ pub struct DataMessage { pub message: ValueGroup, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum SyncBranchConstraint { SilentPort(PortIdLocal), BranchNumber(BranchId), PortMapping(PortIdLocal, BranchId), } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct SyncConnectorSolution { pub connector_id: ConnectorId, pub terminating_branch_id: BranchId, @@ -45,13 +46,13 @@ pub struct SyncConnectorSolution { pub final_port_mapping: Vec<(PortIdLocal, BranchId)> } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct SyncConnectorConstraints { pub connector_id: ConnectorId, pub constraints: Vec, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct SyncMessage { pub local_solutions: Vec, pub constraints: Vec, @@ -167,7 +168,7 @@ impl SyncMessage { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct SolutionMessage { pub comparison_number: u64, pub connector_origin: ConnectorId, @@ -177,20 +178,20 @@ pub struct SolutionMessage { /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ControlMessage { pub id: u32, // generic identifier, used to match request to response pub content: ControlMessageVariant, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum ControlMessageVariant { ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port Ack, // acknowledgement of previous control message, matching occurs through control message ID. } /// Generic message contents. 
-#[derive(Clone)] +#[derive(Debug, Clone)] pub enum MessageContents { Data(DataMessage), // data message, handled by connector Sync(SyncMessage), // sync message, handled by both connector/scheduler @@ -200,6 +201,7 @@ pub enum MessageContents { Ping, // ping message, intentionally waking up a connector (used for native connectors) } +#[derive(Debug)] pub struct Message { pub sending_connector: ConnectorId, pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector) diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index b1acb13a1fd227c022ce0788916b0ed1b7663c23..5238d2ca54a24635e0ad21cfc077131084f531f1 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -5,7 +5,6 @@ mod messages; mod connector; mod native; mod port; -mod global_store; mod scheduler; mod inbox; @@ -13,16 +12,18 @@ mod inbox; // Imports -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::collections::VecDeque; +use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::thread::{self, JoinHandle}; +use crate::collections::RawVec; use crate::ProtocolDescription; -use global_store::{ConnectorVariant, GlobalStore}; -use scheduler::Scheduler; -use native::{ConnectorApplication, ApplicationInterface}; - +use inbox::Message; +use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; +use scheduler::{Scheduler, ConnectorCtx, Router}; +use native::{Connector, ConnectorApplication, ApplicationInterface}; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -97,55 +98,29 @@ pub(crate) struct ScheduledConnector { pub router: Router, } +// ----------------------------------------------------------------------------- +// Runtime +// ----------------------------------------------------------------------------- + /// Externally facing runtime. 
pub struct Runtime { inner: Arc, } -pub(crate) struct RuntimeInner { - // Protocol - pub(crate) protocol_description: ProtocolDescription, - // Storage of connectors in a kind of freelist. Note the vector of points to - // ensure pointer stability: the vector can be changed but the entries - // themselves remain valid. - pub connectors_list: RawVec<*mut ScheduledConnector>, - pub connectors_free: Vec, - - pub(crate) global_store: GlobalStore, - schedulers: Mutex>>, - active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels -} - -impl RuntimeInner { - #[inline] - pub(crate) fn increment_active_interfaces(&self) { - let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); - debug_assert_ne!(_old_num, 1); // once it hits 0, it stays zero - } - - pub(crate) fn decrement_active_interfaces(&self) { - let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); - debug_assert!(old_num > 0); - if old_num == 1 { - // Became 0 - // TODO: Check num connectors, if 0, then set exit flag - } - } -} - -// TODO: Come back to this at some point -unsafe impl Send for RuntimeInner {} -unsafe impl Sync for RuntimeInner {} - impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ - global_store: GlobalStore::new(), protocol_description, + port_counter: AtomicU32::new(0), + connectors: RwLock::new(ConnectorStore::with_capacity(32)), + connector_queue: Mutex::new(VecDeque::with_capacity(32)), schedulers: Mutex::new(Vec::new()), - active_interfaces: AtomicU32::new(1), // we are the active interface + scheduler_notifier: Condvar::new(), + active_connectors: AtomicU32::new(0), + active_interfaces: AtomicU32::new(1), // this `Runtime` instance + should_exit: AtomicBool::new(false), }); // Launch threads @@ -175,8 +150,9 @@ impl Runtime { /// Returns a new interface through 
which channels and connectors can be /// created. pub fn create_interface(&self) -> ApplicationInterface { + self.inner.increment_active_interfaces(); let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); - let connector_key = self.inner.global_store.connectors.create_interface(connector); + let connector_key = self.inner.create_interface_component(connector); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. That is done by the interface in case @@ -187,10 +163,283 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { - self.inner.global_store.should_exit.store(true, Ordering::Release); - let mut schedulers = self.inner.schedulers.lock().unwrap(); - for scheduler in schedulers.drain(..) { - scheduler.join().unwrap(); + self.inner.decrement_active_interfaces(); + let mut lock = self.inner.schedulers.lock().unwrap(); + for handle in lock.drain(..) { + handle.join().unwrap(); + } + } +} + +// ----------------------------------------------------------------------------- +// RuntimeInner +// ----------------------------------------------------------------------------- + +pub(crate) struct RuntimeInner { + // Protocol + pub(crate) protocol_description: ProtocolDescription, + // Regular counter for port IDs + port_counter: AtomicU32, + // Storage of connectors and the work queue + connectors: RwLock, + connector_queue: Mutex>, + schedulers: Mutex>>, + // Conditions to determine whether the runtime can exit + scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. + // TODO: Figure out if we can simply merge the counters? + active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) + active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels + should_exit: AtomicBool, +} + +impl RuntimeInner { + // --- Managing the components queued for execution + + /// Wait until there is a connector to run. 
If there is one, then `Some` + /// will be returned. If there is no more work, then `None` will be + /// returned. + pub(crate) fn wait_for_work(&self) -> Option { + let mut lock = self.connector_queue.lock().unwrap(); + while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { + lock = self.scheduler_notifier.wait(lock).unwrap(); + } + + return lock.pop_front(); + } + + pub(crate) fn push_work(&self, key: ConnectorKey) { + let mut lock = self.connector_queue.lock().unwrap(); + lock.push_back(key); + self.scheduler_notifier.notify_one(); + } + + // --- Creating/retrieving/destroying components + + pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { + // Initialize as sleeping, as it will be scheduled by the programmer. + let mut lock = self.connectors.write().unwrap(); + let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); + + self.increment_active_components(); + return key; + } + + /// Creates a new PDL component. The caller MUST make sure to schedule the + /// connector. + // TODO: Nicer code, not forcing the caller to schedule, perhaps? 
+ pub(crate) fn create_pdl_component(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey { + // Create as not sleeping (`initially_sleeping == false`), as the caller schedules it immediately; a queued component flagged as sleeping could be queued a second time by a message sender + let key = { + let mut lock = self.connectors.write().unwrap(); + lock.create(ConnectorVariant::UserDefined(connector), false) + }; + + // Transfer the ports + { + let lock = self.connectors.read().unwrap(); + let created = lock.get_private(&key); + + match &created.connector { + ConnectorVariant::UserDefined(connector) => { + for port_id in connector.ports.owned_ports.iter().copied() { + println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index); + let mut port = created_by.context.remove_port(port_id); + created.context.add_port(port); + } + }, + ConnectorVariant::Native(_) => unreachable!(), + } + } + + self.increment_active_components(); + return key; + } + + pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { + let lock = self.connectors.read().unwrap(); + return lock.get_private(connector_key); + } + + pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { + let lock = self.connectors.read().unwrap(); + return lock.get_public(connector_id); + } + + pub(crate) fn destroy_component(&self, connector_key: ConnectorKey) { + let mut lock = self.connectors.write().unwrap(); + lock.destroy(connector_key); + self.decrement_active_components(); + } + + // --- Managing exit condition + + #[inline] + pub(crate) fn increment_active_interfaces(&self) { + let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); + debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero + } + + pub(crate) fn decrement_active_interfaces(&self) { + let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { // such that active interfaces is now 0 + let num_connectors = 
self.active_connectors.load(Ordering::Acquire); + if num_connectors == 0 { + self.signal_for_shutdown(); + } + } + } + + #[inline] + fn increment_active_components(&self) { + self.active_connectors.fetch_add(1, Ordering::SeqCst); + } + + fn decrement_active_components(&self) { + let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { // `fetch_sub` returns the previous value: 1 means we now have no more active connectors (for now!) + let num_interfaces = self.active_interfaces.load(Ordering::Acquire); + if num_interfaces == 0 { + self.signal_for_shutdown(); + } + } + } + + #[inline] + fn signal_for_shutdown(&self) { + debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); + debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); + + println!("DEBUG: Signaling for shutdown"); + let _lock = self.connector_queue.lock().unwrap(); + let should_signal = self.should_exit + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_signal { + println!("DEBUG: Notifying all waiting schedulers"); + self.scheduler_notifier.notify_all(); + } + } +} + +// TODO: Come back to this at some point +unsafe impl Send for RuntimeInner {} +unsafe impl Sync for RuntimeInner {} + +// ----------------------------------------------------------------------------- +// ConnectorStore +// ----------------------------------------------------------------------------- + +struct ConnectorStore { + // Freelist storage of connectors. Storage should be pointer-stable as + // someone might be mutating the vector while we're executing one of the + // connectors. + connectors: RawVec<*mut ScheduledConnector>, + free: Vec, +} + +impl ConnectorStore { + fn with_capacity(capacity: usize) -> Self { + Self { + connectors: RawVec::with_capacity(capacity), + free: Vec::with_capacity(capacity), + } + } + + /// Retrieves public part of connector - accessible by many threads at once. 
+ fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic { + unsafe { + let connector = self.connectors.get(id.0 as usize); + debug_assert!(!connector.is_null()); + return &(**connector).public; + } + } + + /// Retrieves private part of connector - accessible by one thread at a + /// time. + fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { + unsafe { + let connector = self.connectors.get_mut(key.index as usize); + debug_assert!(!connector.is_null()); + return &mut (**connector); + } + } + + /// Creates a new connector. Caller should ensure ports are set up correctly + /// and the connector is queued for execution if needed. + fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { + let mut connector = ScheduledConnector { + connector, + context: ConnectorCtx::new(), + public: ConnectorPublic::new(initially_sleeping), + router: Router::new(), + }; + + let index; + let key; + + if self.free.is_empty() { + // No free entries, allocate new entry + index = self.connectors.len(); + key = ConnectorKey{ index: index as u32 }; + connector.context.id = key.downcast(); + + let connector = Box::into_raw(Box::new(connector)); + self.connectors.push(connector); + } else { + // Free spot available + index = self.free.pop().unwrap(); + key = ConnectorKey{ index: index as u32 }; + connector.context.id = key.downcast(); + + unsafe { + let target = self.connectors.get_mut(index); + std::ptr::write(*target, connector); + } + } + + return key; + } + + /// Destroys a connector. Caller should make sure it is not scheduled for + /// execution. Otherwise one experiences "bad stuff" (tm). + fn destroy(&mut self, key: ConnectorKey) { + unsafe { + let target = self.connectors.get_mut(key.index as usize); + std::ptr::drop_in_place(*target); + // Note: but not deallocating! 
+ } + + self.free.push(key.index as usize); + } +} + +impl Drop for ConnectorStore { + fn drop(&mut self) { + // Everything in the freelist already had its destructor called, so only + // has to be deallocated + for free_idx in self.free.iter().copied() { + unsafe { + let memory = self.connectors.get_mut(free_idx); + let layout = std::alloc::Layout::for_value(&**memory); + std::alloc::dealloc(*memory as *mut u8, layout); + + // mark as null for the remainder + *memory = std::ptr::null_mut(); + } + } + + // With the deallocated stuff marked as null, clear the remainder that + // is not null + for idx in 0..self.connectors.len() { + unsafe { + let memory = *self.connectors.get_mut(idx); + if !memory.is_null() { + let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh + } + } } } } \ No newline at end of file diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 39885d6565633073652f17f267168f76323ea3a6..0e877c36449aab3519e0a4aba53bf24894f62b0f 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -5,17 +5,12 @@ use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; -use crate::runtime2::connector::{Branch, find_ports_in_value_group}; -use crate::runtime2::global_store::ConnectorKey; -use crate::runtime2::inbox::MessageContents; -use crate::runtime2::port::{Port, PortKind}; -use crate::runtime2::scheduler::ConnectorCtx; - -use super::RuntimeInner; -use super::global_store::ConnectorId; -use super::port::{Channel, PortIdLocal}; -use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; -use super::inbox::Message; + +use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx}; +use super::port::{Port, PortIdLocal, Channel, PortKind}; +use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL}; +use super::connector::find_ports_in_value_group; +use super::inbox::{Message, 
MessageContents}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -34,6 +29,7 @@ type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), NewConnector(ConnectorPDL), + Shutdown, } /// The connector which an application can directly interface with. Once may set @@ -69,7 +65,7 @@ impl Connector for ConnectorApplication { } } - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, _protocol_description: &ProtocolDescription, _ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { @@ -82,6 +78,10 @@ impl Connector for ConnectorApplication { ApplicationJob::NewConnector(connector) => { println!("DEBUG: API creating connector"); delta_state.new_connectors.push(connector); + }, + ApplicationJob::Shutdown => { + debug_assert!(queue.is_empty()); + return ConnectorScheduling::Exit; } } } @@ -102,8 +102,6 @@ pub struct ApplicationInterface { impl ApplicationInterface { fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { - runtime.active_interfaces += 1; - return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), @@ -114,7 +112,7 @@ impl ApplicationInterface { /// Creates a new channel. 
pub fn create_channel(&mut self) -> Channel { // TODO: Duplicated logic in scheduler - let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst); + let getter_id = self.runtime.port_counter.fetch_add(2, Ordering::SeqCst); let putter_id = PortIdLocal::new(getter_id + 1); let getter_id = PortIdLocal::new(getter_id); @@ -179,25 +177,7 @@ impl ApplicationInterface { queue.push_back(ApplicationJob::NewConnector(connector)); } - // Send ping message to wake up connector - let connector = self.runtime.global_store.connectors.get_shared(self.connector_id); - connector.inbox.insert_message(Message{ - sending_connector: ConnectorId::new_invalid(), - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Ping, - }); - - let should_wake_up = connector.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - println!("DEBUG: Waking up connector"); - let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; - self.runtime.global_store.connector_queue.push_back(key); - } else { - println!("DEBUG: NOT waking up connector"); - } + self.wake_up_connector_with_ping(); return Ok(()); } @@ -220,10 +200,37 @@ impl ApplicationInterface { pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { self.connector_id = id; } + + fn wake_up_connector_with_ping(&self) { + let connector = self.runtime.get_component_public(self.connector_id); + connector.inbox.insert_message(Message{ + sending_connector: ConnectorId::new_invalid(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Ping, + }); + + let should_wake_up = connector.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + println!("DEBUG: Waking up connector"); + let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; + self.runtime.push_work(key); + } else { + println!("DEBUG: NOT waking up connector"); + } + } } impl Drop 
for ApplicationInterface { fn drop(&mut self) { + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::Shutdown); + } + self.wake_up_connector_with_ping(); + self.runtime.decrement_active_interfaces(); } } \ No newline at end of file diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index e24e332c4c899afcd015e81898edb4e0ab224143..ec1472dea11c434f063d786ca556f96293e9307d 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -1,4 +1,4 @@ -use super::global_store::ConnectorId; +use super::ConnectorId; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct PortIdLocal { diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index fc11f600d1c486a1f99272569a0437e58c25fb67..f645cc0572f9149185f1a0db6403a2dab0253f9a 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,60 +1,27 @@ use std::sync::Arc; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::Duration; -use std::thread; +use std::sync::atomic::Ordering; -use crate::runtime2::global_store::ConnectorVariant; -use crate::runtime2::inbox::MessageContents; -use crate::runtime2::native::Connector; -use crate::runtime2::port::{Channel, PortKind}; - -use super::RuntimeInner; +use super::{RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortIdLocal}; -use super::inbox::{Message, ControlMessage, ControlMessageVariant}; +use super::native::Connector; use super::connector::{ConnectorScheduling, RunDeltaState}; -use super::global_store::{ConnectorKey, ConnectorId}; +use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage}; /// Contains fields that are mostly managed by the scheduler, but may be /// accessed by the connector pub(crate) struct ConnectorCtx { pub(crate) id: ConnectorId, - port_counter: Arc, pub(crate) ports: Vec, } impl ConnectorCtx { - pub(crate) fn new(port_counter: Arc) -> ConnectorCtx { + pub(crate) fn new() -> ConnectorCtx { Self{ id: ConnectorId::new_invalid(), - 
port_counter, ports: Vec::new(), } } - /// Creates a (putter, getter) port pair belonging to the same channel. The - /// port will be implicitly owned by the connector. - pub(crate) fn create_channel(&mut self) -> Channel { - let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); - let putter_id = PortIdLocal::new(getter_id + 1); - let getter_id = PortIdLocal::new(getter_id); - - self.ports.push(Port{ - self_id: getter_id, - peer_id: putter_id, - kind: PortKind::Getter, - peer_connector: self.id, - }); - - self.ports.push(Port{ - self_id: putter_id, - peer_id: getter_id, - kind: PortKind::Putter, - peer_connector: self.id, - }); - - return Channel{ getter_id, putter_id }; - } - pub(crate) fn add_port(&mut self, port: Port) { debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id)); self.ports.push(port); @@ -108,26 +75,19 @@ impl Scheduler { 'thread_loop: loop { // Retrieve a unit of work - let mut connector_key = self.runtime.global_store.connector_queue.pop_front(); - while connector_key.is_none() { - // TODO: @Performance, needs condition or something, and most - // def' not sleeping - println!("DEBUG [{}]: Nothing to do", scheduler_id); - thread::sleep(Duration::new(1, 0)); - if self.runtime.global_store.should_exit.load(Ordering::Acquire) { - // Thread exits! - println!("DEBUG [{}]: ... So I am quitting", scheduler_id); - break 'thread_loop; - } - - println!("DEBUG [{}]: ... But I'm still running", scheduler_id); - continue 'thread_loop; + println!("DEBUG [{}]: Waiting for work", scheduler_id); + let connector_key = self.runtime.wait_for_work(); + if connector_key.is_none() { + // We should exit + println!("DEBUG [{}]: ... 
No more work, quitting", scheduler_id); + break 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); - println!("DEBUG [{}]: Running connector {}", scheduler_id, connector_key.index); - let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); + println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index); + + let scheduled = self.runtime.get_component_private(&connector_key); // Keep running until we should no longer immediately schedule the // connector. @@ -136,6 +96,7 @@ impl Scheduler { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { // Check for rerouting + println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message); if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { self.send_message_and_wake_up_if_sleeping(other_connector_id, message); continue; @@ -197,7 +158,7 @@ impl Scheduler { ConnectorScheduling::Immediate => unreachable!(), ConnectorScheduling::Later => { // Simply queue it again later - self.runtime.global_store.connector_queue.push_back(connector_key); + self.runtime.push_work(connector_key); }, ConnectorScheduling::NotNow => { // Need to sleep, note that we are the only ones which are @@ -215,9 +176,14 @@ impl Scheduler { .is_ok(); if should_reschedule_self { - self.runtime.global_store.connector_queue.push_back(connector_key); + self.runtime.push_work(connector_key); } } + }, + ConnectorScheduling::Exit => { + // TODO: Better way of doing this, when exiting then + // connected components must know their channels are invalid + self.runtime.destroy_component(connector_key); } } } @@ -283,12 +249,12 @@ impl Scheduler { // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if 
!delta_state.new_connectors.is_empty() { - let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key); + let cur_connector = self.runtime.get_component_private(connector_key); for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector); - let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); + let new_key = self.runtime.create_pdl_component(cur_connector, new_connector); + let new_connector = self.runtime.get_component_private(&new_key); // Call above changed ownership of ports, but we still have to // let the other end of the channel know that the port has @@ -303,13 +269,17 @@ impl Scheduler { } // Schedule new connector to run - self.runtime.global_store.connector_queue.push_back(new_key); + self.runtime.push_work(new_key); } } + + debug_assert!(delta_state.outbox.is_empty()); + debug_assert!(delta_state.new_ports.is_empty()); + debug_assert!(delta_state.new_connectors.is_empty()); } fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { - let connector = self.runtime.global_store.connectors.get_shared(connector_id); + let connector = self.runtime.get_component_public(connector_id); connector.inbox.insert_message(message); let should_wake_up = connector.sleeping @@ -318,7 +288,7 @@ impl Scheduler { if should_wake_up { let key = unsafe { ConnectorKey::from_id(connector_id) }; - self.runtime.global_store.connector_queue.push_back(key); + self.runtime.push_work(key); } } }