// Structure of module mod runtime; mod messages; mod connector; mod branch; mod native; mod port; mod scheduler; mod inbox; mod consensus; mod inbox2; #[cfg(test)] mod tests; mod connector2; // Imports use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::thread::{self, JoinHandle}; use crate::collections::RawVec; use crate::ProtocolDescription; use inbox::Message; use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; use scheduler::{Scheduler, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; use crate::runtime2::inbox2::MessageFancy; use crate::runtime2::port::{Port, PortState}; use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this /// key is the only one that may execute the connector's code. pub(crate) struct ConnectorKey { pub index: u32, // of connector } impl ConnectorKey { /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable /// access, to a "regular ID" which can be used to obtain immutable access. #[inline] pub fn downcast(&self) -> ConnectorId { return ConnectorId(self.index); } /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system #[inline] pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { return ConnectorKey{ index: id.0 }; } } /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct ConnectorId(pub u32); impl ConnectorId { // TODO: Like the other `new_invalid`, maybe remove #[inline] pub fn new_invalid() -> ConnectorId { return ConnectorId(u32::MAX); } #[inline] pub(crate) fn is_valid(&self) -> bool { return self.0 != u32::MAX; } } // TODO: Change this, I hate this. But I also don't want to put `public` and // `router` of `ScheduledConnector` back into `Connector`. The reason I don't // want `Box` everywhere is because of the v-table overhead. But // to truly design this properly I need some benchmarks. pub(crate) enum ConnectorVariant { UserDefined(ConnectorPDL), Native(Box), } impl Connector for ConnectorVariant { fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { match self { ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), } } } pub(crate) struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector pub ctx_fancy: ComponentCtxFancy, pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: ControlMessageHandler, pub shutting_down: bool, } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- /// Externally facing runtime. pub struct Runtime { inner: Arc, } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ protocol_description, port_counter: AtomicU32::new(0), connectors: RwLock::new(ConnectorStore::with_capacity(32)), connector_queue: Mutex::new(VecDeque::with_capacity(32)), schedulers: Mutex::new(Vec::new()), scheduler_notifier: Condvar::new(), active_connectors: AtomicU32::new(0), active_interfaces: AtomicU32::new(1), // this `Runtime` instance should_exit: AtomicBool::new(false), }); // Launch threads { let mut schedulers = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { let cloned_runtime_inner = runtime_inner.clone(); let thread = thread::Builder::new() .name(format!("thread-{}", thread_index)) .spawn(move || { let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); scheduler.run(); }) .unwrap(); schedulers.push(thread); } let mut lock = runtime_inner.schedulers.lock().unwrap(); *lock = schedulers; } // Return runtime return Runtime{ inner: runtime_inner }; } /// Returns a new interface through which channels and connectors can be /// created. pub fn create_interface(&self) -> ApplicationInterface { self.inner.increment_active_interfaces(); let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); let connector_key = self.inner.create_interface_component(connector); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. That is done by the interface in case // it is actually needed. return interface; } } impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_interfaces(); let mut lock = self.inner.schedulers.lock().unwrap(); for handle in lock.drain(..) { handle.join().unwrap(); } } } // ----------------------------------------------------------------------------- // RuntimeInner // ----------------------------------------------------------------------------- pub(crate) struct RuntimeInner { // Protocol pub(crate) protocol_description: ProtocolDescription, // Regular counter for port IDs port_counter: AtomicU32, // Storage of connectors and the work queue connectors: RwLock, connector_queue: Mutex>, schedulers: Mutex>>, // Conditions to determine whether the runtime can exit scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. // TODO: Figure out if we can simply merge the counters? active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels should_exit: AtomicBool, } impl RuntimeInner { // --- Managing the components queued for execution /// Wait until there is a connector to run. If there is one, then `Some` /// will be returned. If there is no more work, then `None` will be /// returned. pub(crate) fn wait_for_work(&self) -> Option { let mut lock = self.connector_queue.lock().unwrap(); while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { lock = self.scheduler_notifier.wait(lock).unwrap(); } return lock.pop_front(); } pub(crate) fn push_work(&self, key: ConnectorKey) { let mut lock = self.connector_queue.lock().unwrap(); lock.push_back(key); self.scheduler_notifier.notify_one(); } // --- Creating/using ports /// Creates a new port pair. Note that these are stored globally like the /// connectors are. Ports stored by components belong to those components. pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { use port::{PortIdLocal, PortKind}; let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); let putter_id = PortIdLocal::new(getter_id + 1); let getter_id = PortIdLocal::new(getter_id); let getter_port = Port{ self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, state: PortState::Open, peer_connector: creating_connector, }; let putter_port = Port{ self_id: putter_id, peer_id: getter_id, kind: PortKind::Putter, state: PortState::Open, peer_connector: creating_connector, }; return (getter_port, putter_port); } /// Sends a message to a particular connector. If the connector happened to /// be sleeping then it will be scheduled for execution. pub(crate) fn send_message(&self, target_id: ConnectorId, message: MessageFancy) { let target = self.get_component_public(target_id); target.inbox.insert_message(message); let should_wake_up = target.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_wake_up { let key = unsafe{ ConnectorKey::from_id(target_id) }; self.push_work(key); } } // --- Creating/retrieving/destroying components /// Creates an initially sleeping application connector. fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { // Initialize as sleeping, as it will be scheduled by the programmer. let mut lock = self.connectors.write().unwrap(); let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); self.increment_active_components(); return key; } /// Creates a new PDL component. This function just creates the component. /// If you create it initially awake, then you must add it to the work /// queue. Other aspects of correctness (i.e. setting initial ports) are /// relinquished to the caller! pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey { // Create as not sleeping, as we'll schedule it immediately let key = { let mut lock = self.connectors.write().unwrap(); lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping) }; self.increment_active_components(); return key; } #[inline] pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.connectors.read().unwrap(); return lock.get_private(connector_key); } #[inline] pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.connectors.read().unwrap(); return lock.get_public(connector_id); } pub(crate) fn destroy_component(&self, connector_key: ConnectorKey) { let mut lock = self.connectors.write().unwrap(); lock.destroy(connector_key); self.decrement_active_components(); } // --- Managing exit condition #[inline] pub(crate) fn increment_active_interfaces(&self) { let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); println!("DEBUG: Incremented active interfaces to {}", _old_num + 1); debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero } pub(crate) fn decrement_active_interfaces(&self) { let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); println!("DEBUG: Decremented active interfaces to {}", old_num - 1); debug_assert!(old_num > 0); if old_num == 1 { // such that active interfaces is now 0 let num_connectors = self.active_connectors.load(Ordering::Acquire); if num_connectors == 0 { self.signal_for_shutdown(); } } } #[inline] fn increment_active_components(&self) { let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); println!("DEBUG: Incremented components to {}", _old_num + 1); } fn decrement_active_components(&self) { let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); println!("DEBUG: Decremented components to {}", old_num - 1); debug_assert!(old_num > 0); if old_num == 1 { // such that we have no more active connectors (for now!) let num_interfaces = self.active_interfaces.load(Ordering::Acquire); if num_interfaces == 0 { self.signal_for_shutdown(); } } } #[inline] fn signal_for_shutdown(&self) { debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); println!("DEBUG: Signaling for shutdown"); let _lock = self.connector_queue.lock().unwrap(); let should_signal = self.should_exit .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_signal { println!("DEBUG: Notifying all waiting schedulers"); self.scheduler_notifier.notify_all(); } } } // TODO: Come back to this at some point unsafe impl Send for RuntimeInner {} unsafe impl Sync for RuntimeInner {} // ----------------------------------------------------------------------------- // ConnectorStore // ----------------------------------------------------------------------------- struct ConnectorStore { // Freelist storage of connectors. Storage should be pointer-stable as // someone might be mutating the vector while we're executing one of the // connectors. connectors: RawVec<*mut ScheduledConnector>, free: Vec, } impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { Self { connectors: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), } } /// Retrieves public part of connector - accessible by many threads at once. fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic { unsafe { let connector = self.connectors.get(id.0 as usize); debug_assert!(!connector.is_null()); return &(**connector).public; } } /// Retrieves private part of connector - accessible by one thread at a /// time. fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { unsafe { let connector = self.connectors.get_mut(key.index as usize); debug_assert!(!connector.is_null()); return &mut (**connector); } } /// Creates a new connector. Caller should ensure ports are set up correctly /// and the connector is queued for execution if needed. fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { let mut connector = ScheduledConnector { connector, ctx_fancy: ComponentCtxFancy::new_empty(), public: ConnectorPublic::new(initially_sleeping), router: ControlMessageHandler::new(), shutting_down: false, }; let index; let key; if self.free.is_empty() { // No free entries, allocate new entry index = self.connectors.len(); key = ConnectorKey{ index: index as u32 }; connector.ctx_fancy.id = key.downcast(); let connector = Box::into_raw(Box::new(connector)); self.connectors.push(connector); } else { // Free spot available index = self.free.pop().unwrap(); key = ConnectorKey{ index: index as u32 }; connector.ctx_fancy.id = key.downcast(); unsafe { let target = self.connectors.get_mut(index); std::ptr::write(*target, connector); } } return key; } /// Destroys a connector. Caller should make sure it is not scheduled for /// execution. Otherwise one experiences "bad stuff" (tm). fn destroy(&mut self, key: ConnectorKey) { unsafe { let target = self.connectors.get_mut(key.index as usize); std::ptr::drop_in_place(*target); // Note: but not deallocating! } self.free.push(key.index as usize); } } impl Drop for ConnectorStore { fn drop(&mut self) { // Everything in the freelist already had its destructor called, so only // has to be deallocated for free_idx in self.free.iter().copied() { unsafe { let memory = self.connectors.get_mut(free_idx); let layout = std::alloc::Layout::for_value(&**memory); std::alloc::dealloc(*memory as *mut u8, layout); // mark as null for the remainder *memory = std::ptr::null_mut(); } } // With the deallocated stuff marked as null, clear the remainder that // is not null for idx in 0..self.connectors.len() { unsafe { let memory = *self.connectors.get_mut(idx); if !memory.is_null() { let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh } } } } }