diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index b1acb13a1fd227c022ce0788916b0ed1b7663c23..5238d2ca54a24635e0ad21cfc077131084f531f1 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -5,7 +5,6 @@
 mod messages;
 mod connector;
 mod native;
 mod port;
-mod global_store;
 mod scheduler;
 mod inbox;
@@ -13,16 +12,18 @@ mod inbox;
 // Imports
 
-use std::sync::{Arc, Mutex};
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::collections::VecDeque;
+use std::sync::{Arc, Condvar, Mutex, RwLock};
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::thread::{self, JoinHandle};
 
+use crate::collections::RawVec;
 use crate::ProtocolDescription;
 
-use global_store::{ConnectorVariant, GlobalStore};
-use scheduler::Scheduler;
-use native::{ConnectorApplication, ApplicationInterface};
-
+use inbox::Message;
+use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
+use scheduler::{Scheduler, ConnectorCtx, Router};
+use native::{Connector, ConnectorApplication, ApplicationInterface};
 
 /// A kind of token that, once obtained, allows mutable access to a connector.
 /// We're trying to use move semantics as much as possible: the owner of this
@@ -97,55 +98,29 @@ pub(crate) struct ScheduledConnector {
     pub router: Router,
 }
 
+// -----------------------------------------------------------------------------
+// Runtime
+// -----------------------------------------------------------------------------
+
 /// Externally facing runtime.
 pub struct Runtime {
     inner: Arc<RuntimeInner>,
 }
 
-pub(crate) struct RuntimeInner {
-    // Protocol
-    pub(crate) protocol_description: ProtocolDescription,
-    // Storage of connectors in a kind of freelist. Note the vector of points to
-    // ensure pointer stability: the vector can be changed but the entries
-    // themselves remain valid.
-    pub connectors_list: RawVec<*mut ScheduledConnector>,
-    pub connectors_free: Vec<usize>,
-
-    pub(crate) global_store: GlobalStore,
-    schedulers: Mutex<Vec<JoinHandle<()>>>,
-    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
-}
-
-impl RuntimeInner {
-    #[inline]
-    pub(crate) fn increment_active_interfaces(&self) {
-        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
-        debug_assert_ne!(_old_num, 1); // once it hits 0, it stays zero
-    }
-
-    pub(crate) fn decrement_active_interfaces(&self) {
-        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
-        debug_assert!(old_num > 0);
-        if old_num == 1 {
-            // Became 0
-            // TODO: Check num connectors, if 0, then set exit flag
-        }
-    }
-}
-
-// TODO: Come back to this at some point
-unsafe impl Send for RuntimeInner {}
-unsafe impl Sync for RuntimeInner {}
-
 impl Runtime {
     pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
         // Setup global state
         assert!(num_threads > 0, "need a thread to run connectors");
         let runtime_inner = Arc::new(RuntimeInner{
-            global_store: GlobalStore::new(),
             protocol_description,
+            port_counter: AtomicU32::new(0),
+            connectors: RwLock::new(ConnectorStore::with_capacity(32)),
+            connector_queue: Mutex::new(VecDeque::with_capacity(32)),
             schedulers: Mutex::new(Vec::new()),
-            active_interfaces: AtomicU32::new(1), // we are the active interface
+            scheduler_notifier: Condvar::new(),
+            active_connectors: AtomicU32::new(0),
+            active_interfaces: AtomicU32::new(1), // this `Runtime` instance
+            should_exit: AtomicBool::new(false),
         });
 
         // Launch threads
@@ -175,8 +150,9 @@ impl Runtime {
     /// Returns a new interface through which channels and connectors can be
     /// created.
     pub fn create_interface(&self) -> ApplicationInterface {
+        self.inner.increment_active_interfaces();
         let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
-        let connector_key = self.inner.global_store.connectors.create_interface(connector);
+        let connector_key = self.inner.create_interface_component(connector);
         interface.set_connector_id(connector_key.downcast());
 
         // Note that we're not scheduling. That is done by the interface in case
@@ -187,10 +163,283 @@ impl Drop for Runtime {
     fn drop(&mut self) {
-        self.inner.global_store.should_exit.store(true, Ordering::Release);
-        let mut schedulers = self.inner.schedulers.lock().unwrap();
-        for scheduler in schedulers.drain(..) {
-            scheduler.join().unwrap();
+        self.inner.decrement_active_interfaces();
+        let mut lock = self.inner.schedulers.lock().unwrap();
+        for handle in lock.drain(..) {
+            handle.join().unwrap();
+        }
+    }
+}
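+
+// A sketch of how the pieces above are intended to be used from application
+// code (illustration only; constructing the `ProtocolDescription` is outside
+// the scope of this module, and `protocol_description` is a hypothetical
+// binding):
+//
+//     let runtime = Runtime::new(4, protocol_description);
+//     let mut interface = runtime.create_interface();
+//     // ... create channels and connectors through `interface` ...
+//     drop(runtime); // decrements the interface count and joins the
+//                    // schedulers once no interfaces/connectors remain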
+
+// -----------------------------------------------------------------------------
+// RuntimeInner
+// -----------------------------------------------------------------------------
+
+pub(crate) struct RuntimeInner {
+    // Protocol
+    pub(crate) protocol_description: ProtocolDescription,
+    // Monotonic counter for assigning port IDs
+    port_counter: AtomicU32,
+    // Storage of connectors and the work queue
+    connectors: RwLock<ConnectorStore>,
+    connector_queue: Mutex<VecDeque<ConnectorKey>>,
+    schedulers: Mutex<Vec<JoinHandle<()>>>,
+    // Conditions to determine whether the runtime can exit
+    scheduler_notifier: Condvar, // coupled to the mutex on `connector_queue`
+    // TODO: Figure out if we can simply merge the counters?
+    active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active)
+    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
+    should_exit: AtomicBool,
+}
+
+impl RuntimeInner {
+    // --- Managing the components queued for execution
+
+    /// Waits until there is a connector to run, in which case `Some` is
+    /// returned. If there is no more work (i.e. the runtime is shutting
+    /// down), then `None` is returned.
+    pub(crate) fn wait_for_work(&self) -> Option<ConnectorKey> {
+        let mut lock = self.connector_queue.lock().unwrap();
+        while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) {
+            lock = self.scheduler_notifier.wait(lock).unwrap();
+        }
+
+        return lock.pop_front();
+    }
+
+    pub(crate) fn push_work(&self, key: ConnectorKey) {
+        let mut lock = self.connector_queue.lock().unwrap();
+        lock.push_back(key);
+        self.scheduler_notifier.notify_one();
+    }
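+
+    // A sketch (for illustration only) of how a scheduler is expected to
+    // consume the two functions above; the real loop lives in `scheduler.rs`
+    // and does more bookkeeping. `runtime` stands for the scheduler's
+    // `Arc<RuntimeInner>` (a hypothetical binding):
+    //
+    //     while let Some(key) = runtime.wait_for_work() {
+    //         let connector = runtime.get_component_private(&key);
+    //         // ... run the connector until it yields ...
+    //         runtime.push_work(key); // reschedule, or let it sleep/destroy it
+    //     }
+    //
+    // Because `wait_for_work` only returns `None` once `should_exit` is set,
+    // schedulers can use it as their shutdown signal.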
+
+    // --- Creating/retrieving/destroying components
+
+    pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
+        // Initialize as sleeping, as it will be scheduled by the programmer.
+        let mut lock = self.connectors.write().unwrap();
+        let key = lock.create(ConnectorVariant::Native(Box::new(component)), true);
+
+        self.increment_active_components();
+        return key;
+    }
+
+    /// Creates a new PDL component. The caller MUST make sure to schedule the
+    /// connector.
+    // TODO: Nicer code, not forcing the caller to schedule, perhaps?
+    pub(crate) fn create_pdl_component(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
+        // Create as not sleeping, as we'll schedule it immediately
+        let key = {
+            let mut lock = self.connectors.write().unwrap();
+            lock.create(ConnectorVariant::UserDefined(connector), false)
+        };
+
+        // Transfer the ports
+        {
+            let lock = self.connectors.read().unwrap();
+            let created = lock.get_private(&key);
+
+            match &created.connector {
+                ConnectorVariant::UserDefined(connector) => {
+                    for port_id in connector.ports.owned_ports.iter().copied() {
+                        println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
+                        let port = created_by.context.remove_port(port_id);
+                        created.context.add_port(port);
+                    }
+                },
+                ConnectorVariant::Native(_) => unreachable!(),
+            }
+        }
+
+        self.increment_active_components();
+        return key;
+    }
+
+    pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector {
+        let lock = self.connectors.read().unwrap();
+        return lock.get_private(connector_key);
+    }
+
+    pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
+        let lock = self.connectors.read().unwrap();
+        return lock.get_public(connector_id);
+    }
+
+    pub(crate) fn destroy_component(&self, connector_key: ConnectorKey) {
+        let mut lock = self.connectors.write().unwrap();
+        lock.destroy(connector_key);
+        self.decrement_active_components();
+    }
+
+    // --- Managing the exit condition
+
+    #[inline]
+    pub(crate) fn increment_active_interfaces(&self) {
+        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
+        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
+    }
+
+    pub(crate) fn decrement_active_interfaces(&self) {
+        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
+        debug_assert!(old_num > 0);
+        if old_num == 1 { // such that the number of active interfaces is now 0
+            let num_connectors = self.active_connectors.load(Ordering::Acquire);
+            if num_connectors == 0 {
+                self.signal_for_shutdown();
+            }
+        }
+    }
+
+    #[inline]
+    fn increment_active_components(&self) {
+        self.active_connectors.fetch_add(1, Ordering::SeqCst);
+    }
+
+    fn decrement_active_components(&self) {
+        let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst);
+        debug_assert!(old_num > 0);
+        if old_num == 1 { // such that we have no more active connectors (for now!)
+            let num_interfaces = self.active_interfaces.load(Ordering::Acquire);
+            if num_interfaces == 0 {
+                self.signal_for_shutdown();
+            }
+        }
+    }
+
+    #[inline]
+    fn signal_for_shutdown(&self) {
+        debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0);
+        debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0);
+
+        println!("DEBUG: Signaling for shutdown");
+        let _lock = self.connector_queue.lock().unwrap();
+        let should_signal = self.should_exit
+            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
+            .is_ok();
+
+        if should_signal {
+            println!("DEBUG: Notifying all waiting schedulers");
+            self.scheduler_notifier.notify_all();
+        }
+    }
+}
+
+// TODO: Come back to this at some point
+unsafe impl Send for RuntimeInner {}
+unsafe impl Sync for RuntimeInner {}
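+
+// Illustrative lifecycle of a PDL component, pieced together from the
+// functions above (a sketch, not code from this module; `current` stands for
+// the `ScheduledConnector` executing the creation):
+//
+//     let key = runtime.create_pdl_component(current, connector);
+//     runtime.push_work(key);         // the caller MUST schedule it
+//     // ... schedulers run it via get_component_private(&key) ...
+//     runtime.destroy_component(key); // once the component terminates
+//
+// Creation increments `active_connectors` and destruction decrements it, so
+// together with `active_interfaces` these counters decide when
+// `signal_for_shutdown` may run.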
+
+// -----------------------------------------------------------------------------
+// ConnectorStore
+// -----------------------------------------------------------------------------
+
+struct ConnectorStore {
+    // Freelist storage of connectors. Storage should be pointer-stable as
+    // someone might be mutating the vector while we're executing one of the
+    // connectors.
+    connectors: RawVec<*mut ScheduledConnector>,
+    free: Vec<usize>,
+}
+
+impl ConnectorStore {
+    fn with_capacity(capacity: usize) -> Self {
+        Self {
+            connectors: RawVec::with_capacity(capacity),
+            free: Vec::with_capacity(capacity),
+        }
+    }
+
+    /// Retrieves the public part of a connector - accessible by many threads
+    /// at once.
+    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
+        unsafe {
+            let connector = self.connectors.get(id.0 as usize);
+            debug_assert!(!connector.is_null());
+            return &(**connector).public;
+        }
+    }
+
+    /// Retrieves the private part of a connector - accessible by one thread
+    /// at a time.
+    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
+        unsafe {
+            let connector = self.connectors.get_mut(key.index as usize);
+            debug_assert!(!connector.is_null());
+            return &mut (**connector);
+        }
+    }
+
+    /// Creates a new connector. The caller should ensure its ports are set up
+    /// correctly and that the connector is queued for execution if needed.
+    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
+        let mut connector = ScheduledConnector {
+            connector,
+            context: ConnectorCtx::new(),
+            public: ConnectorPublic::new(initially_sleeping),
+            router: Router::new(),
+        };
+
+        let index;
+        let key;
+
+        if self.free.is_empty() {
+            // No free entries, allocate a new entry
+            index = self.connectors.len();
+            key = ConnectorKey{ index: index as u32 };
+            connector.context.id = key.downcast();
+
+            let connector = Box::into_raw(Box::new(connector));
+            self.connectors.push(connector);
+        } else {
+            // Free spot available
+            index = self.free.pop().unwrap();
+            key = ConnectorKey{ index: index as u32 };
+            connector.context.id = key.downcast();
+
+            unsafe {
+                let target = self.connectors.get_mut(index);
+                std::ptr::write(*target, connector);
+            }
+        }
+
+        return key;
+    }
+
+    /// Destroys a connector. The caller should make sure it is no longer
+    /// scheduled for execution. Otherwise one experiences "bad stuff" (tm).
+    fn destroy(&mut self, key: ConnectorKey) {
+        unsafe {
+            let target = self.connectors.get_mut(key.index as usize);
+            std::ptr::drop_in_place(*target);
+            // Note: dropped in place, but not deallocated; the allocation is
+            // reused through the freelist.
+        }
+
+        self.free.push(key.index as usize);
+    }
+}
+
+impl Drop for ConnectorStore {
+    fn drop(&mut self) {
+        // Everything in the freelist already had its destructor called, so
+        // it only has to be deallocated
+        for free_idx in self.free.iter().copied() {
+            unsafe {
+                let memory = self.connectors.get_mut(free_idx);
+                let layout = std::alloc::Layout::for_value(&**memory);
+                std::alloc::dealloc(*memory as *mut u8, layout);
+
+                // mark as null for the remainder
+                *memory = std::ptr::null_mut();
+            }
+        }
+
+        // With the deallocated entries marked as null, drop and deallocate
+        // the remainder that is not null
+        for idx in 0..self.connectors.len() {
+            unsafe {
+                let memory = *self.connectors.get_mut(idx);
+                if !memory.is_null() {
+                    let _ = Box::from_raw(memory); // takes care of drop and deallocation; a bit dirty, but meh
+                }
+            }
         }
     }
 }
\ No newline at end of file