// Structure of module mod branch; mod native; mod port; mod scheduler; mod consensus; mod inbox; #[cfg(test)] mod tests; mod connector; // Imports use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::thread::{self, JoinHandle}; use crate::collections::RawVec; use crate::ProtocolDescription; use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; use inbox::Message; use port::{ChannelId, Port, PortState}; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this /// key is the only one that may execute the connector's code. #[derive(Debug)] pub(crate) struct ConnectorKey { pub index: u32, // of connector pub generation: u32, } impl ConnectorKey { /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable /// access, to a "regular ID" which can be used to obtain immutable access. #[inline] pub fn downcast(&self) -> ConnectorId { return ConnectorId{ index: self.index, generation: self.generation, }; } /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system #[inline] pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { return ConnectorKey{ index: id.index, generation: id.generation, }; } } /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this #[derive(Debug, Copy, Clone)] pub struct ConnectorId{ pub index: u32, pub generation: u32, } impl PartialEq for ConnectorId { fn eq(&self, other: &Self) -> bool { return self.index.eq(&other.index); } } impl Eq for ConnectorId{} impl PartialOrd for ConnectorId{ fn partial_cmp(&self, other: &Self) -> Option { return self.index.partial_cmp(&other.index) } } impl Ord for ConnectorId{ fn cmp(&self, other: &Self) -> std::cmp::Ordering { return self.partial_cmp(other).unwrap(); } } impl ConnectorId { // TODO: Like the other `new_invalid`, maybe remove #[inline] pub fn new_invalid() -> ConnectorId { return ConnectorId { index: u32::MAX, generation: 0, }; } #[inline] pub(crate) fn is_valid(&self) -> bool { return self.index != u32::MAX; } } // TODO: Change this, I hate this. But I also don't want to put `public` and // `router` of `ScheduledConnector` back into `Connector`. The reason I don't // want `Box` everywhere is because of the v-table overhead. But // to truly design this properly I need some benchmarks. pub(crate) enum ConnectorVariant { UserDefined(ConnectorPDL), Native(Box), } impl Connector for ConnectorVariant { fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { match self { ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), } } } pub(crate) struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector pub ctx: ComponentCtx, pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: ControlMessageHandler, pub shutting_down: bool, } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- /// Externally facing runtime. pub struct Runtime { inner: Arc, } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ protocol_description, port_counter: AtomicU32::new(0), connectors: RwLock::new(ConnectorStore::with_capacity(32)), connector_queue: Mutex::new(VecDeque::with_capacity(32)), schedulers: Mutex::new(Vec::new()), scheduler_notifier: Condvar::new(), active_connectors: AtomicU32::new(0), active_interfaces: AtomicU32::new(1), // this `Runtime` instance should_exit: AtomicBool::new(false), }); // Launch threads { let mut schedulers = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { let cloned_runtime_inner = runtime_inner.clone(); let thread = thread::Builder::new() .name(format!("thread-{}", thread_index)) .spawn(move || { let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); scheduler.run(); }) .unwrap(); schedulers.push(thread); } let mut lock = runtime_inner.schedulers.lock().unwrap(); *lock = schedulers; } // Return runtime return Runtime{ inner: runtime_inner }; } /// Returns a new interface through which channels and connectors can be /// created. pub fn create_interface(&self) -> ApplicationInterface { self.inner.increment_active_interfaces(); let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); let connector_key = self.inner.create_interface_component(connector); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. That is done by the interface in case // it is actually needed. return interface; } } impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_interfaces(); let mut lock = self.inner.schedulers.lock().unwrap(); for handle in lock.drain(..) { handle.join().unwrap(); } } } // ----------------------------------------------------------------------------- // RuntimeInner // ----------------------------------------------------------------------------- pub(crate) struct RuntimeInner { // Protocol pub(crate) protocol_description: ProtocolDescription, // Regular counter for port IDs port_counter: AtomicU32, // Storage of connectors and the work queue connectors: RwLock, connector_queue: Mutex>, schedulers: Mutex>>, // Conditions to determine whether the runtime can exit scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. // TODO: Figure out if we can simply merge the counters? active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels should_exit: AtomicBool, } impl RuntimeInner { // --- Managing the components queued for execution /// Wait until there is a connector to run. If there is one, then `Some` /// will be returned. If there is no more work, then `None` will be /// returned. pub(crate) fn wait_for_work(&self) -> Option { let mut lock = self.connector_queue.lock().unwrap(); while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { lock = self.scheduler_notifier.wait(lock).unwrap(); } return lock.pop_front(); } pub(crate) fn push_work(&self, key: ConnectorKey) { let mut lock = self.connector_queue.lock().unwrap(); lock.push_back(key); self.scheduler_notifier.notify_one(); } // --- Creating/using ports /// Creates a new port pair. Note that these are stored globally like the /// connectors are. Ports stored by components belong to those components. pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { use port::{PortIdLocal, PortKind}; let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); let channel_id = ChannelId::new(getter_id); let putter_id = PortIdLocal::new(getter_id + 1); let getter_id = PortIdLocal::new(getter_id); let getter_port = Port{ self_id: getter_id, peer_id: putter_id, channel_id, kind: PortKind::Getter, state: PortState::Open, peer_connector: creating_connector, }; let putter_port = Port{ self_id: putter_id, peer_id: getter_id, channel_id, kind: PortKind::Putter, state: PortState::Open, peer_connector: creating_connector, }; return (getter_port, putter_port); } /// Sends a message directly (without going through the port) to a /// component. This is slightly less efficient then sending over a port, but /// might be preferable for some algorithms. If the component was sleeping /// then it is scheduled for execution. pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool { let target = { let mut lock = self.connectors.read().unwrap(); lock.get(target_id.index) }; // Do a CAS on the number of users. Most common case the component is // alive and we're the only one sending the message. Note that if we // finish this block, we're sure that no-one has set the `num_users` // value to 0. This is essential! When at 0, the component is added to // the freelist and the generation counter will be incremented. let mut cur_num_users = 1; while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) { if old_num_users == 0 { // Cannot send message. Whatever the component state is // (destroyed, at a different generation number, busy being // destroyed, etc.) we cannot send the message and will not // modify the component return false; } cur_num_users = old_num_users; } // We incremented the counter. But we might still be at the wrong // generation number. The generation number is a monotonically // increasing value. Since it only increases when someone gets the // `num_users` counter to 0, we can simply load the generation number. let generation = target.generation.load(Ordering::Acquire); if generation != target_id.generation { // We're at the wrong generation, so we cannot send the message. // However, since we incremented the `num_users` counter, the moment // we decrement it we might be the one that are supposed to handle // the destruction of the component. Note that all users of the // component do an increment-followed-by-decrement, we can simply // do a `fetch_sub`. let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); if old_num_users == 1 { // We're the one that got the counter to 0, so we're the ones // that are supposed to handle component exit self.finish_component_destruction(target_id); } return false; } // The generation is correct, and since we incremented the `num_users` // counter we're now sure that we can send the message and it will be // handled by the receiver target.connector.public.inbox.insert_message(message); // Finally, do the same as above: decrement number of users, if at gets // to 0 we're the ones who should handle the exit condition. let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); if old_num_users == 1 { // We're allowed to destroy the component. self.finish_component_destruction(target_id); } else { // Message is sent. If the component is sleeping, then we're sure // it is not scheduled and it has not initiated the destruction of // the component (because of the way // `initiate_component_destruction` does not set sleeping to true). // So we can safely schedule it. let should_wake_up = target.connector.public.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_wake_up { let key = unsafe{ ConnectorKey::from_id(target_id) }; self.push_work(key); } } return true } /// Sends a message to a particular component, assumed to occur over a port. /// If the component happened to be sleeping then it will be scheduled for /// execution. Because of the port management system we may assumed that /// we're always accessing the component at the right generation number. pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) { let target = { let lock = self.connectors.read().unwrap(); let entry = lock.get(target_id.index); debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation); &mut entry.connector.public }; target.inbox.insert_message(message); let should_wake_up = target.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_wake_up { let key = unsafe{ ConnectorKey::from_id(target_id) }; self.push_work(key); } } // --- Creating/retrieving/destroying components /// Creates an initially sleeping application connector. fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { // Initialize as sleeping, as it will be scheduled by the programmer. let mut lock = self.connectors.write().unwrap(); let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); self.increment_active_components(); return key; } /// Creates a new PDL component. This function just creates the component. /// If you create it initially awake, then you must add it to the work /// queue. Other aspects of correctness (i.e. setting initial ports) are /// relinquished to the caller! pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey { // Create as not sleeping, as we'll schedule it immediately let key = { let mut lock = self.connectors.write().unwrap(); lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping) }; self.increment_active_components(); return key; } /// Retrieve private access to the component through its key. #[inline] pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { let entry = { let lock = self.connectors.read().unwrap(); lock.get(connector_key.index) }; debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key); return &mut entry.connector; } // --- Managing component destruction /// Start component destruction, may only be done by the scheduler that is /// executing the component. This might not actually destroy the component, /// since other components might be sending it messages. fn initiate_component_destruction(&self, connector_key: ConnectorKey) { // Most of the time no-one will be sending messages, so try // immediate destruction let mut lock = self.connectors.write().unwrap(); let entry = lock.get(connector_key.index); debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation); debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst); if old_num_users == 1 { // We just brought the number of users down to 0. Destroy the // component entry.connector.public.inbox.clear(); entry.generation.fetch_add(1, Ordering::SeqCst); lock.destroy(connector_key); self.decrement_active_components(); } } fn finish_component_destruction(&self, connector_id: ConnectorId) { let mut lock = self.connectors.write().unwrap(); let entry = lock.get(connector_id.index); debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0); let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst); debug_assert_eq!(_old_generation, connector_id.generation); // TODO: In the future we should not only clear out the inbox, but send // messages back to the senders indicating the messages did not arrive. entry.connector.public.inbox.clear(); // Invariant of only one thread being able to handle the internals of // component is preserved by the fact that only one thread can decrement // `num_users` to 0. lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) }); self.decrement_active_components(); } // --- Managing exit condition #[inline] pub(crate) fn increment_active_interfaces(&self) { let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero } pub(crate) fn decrement_active_interfaces(&self) { let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); debug_assert!(old_num > 0); if old_num == 1 { // such that active interfaces is now 0 let num_connectors = self.active_connectors.load(Ordering::Acquire); if num_connectors == 0 { self.signal_for_shutdown(); } } } #[inline] fn increment_active_components(&self) { let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); } fn decrement_active_components(&self) { let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); debug_assert!(old_num > 0); if old_num == 1 { // such that we have no more active connectors (for now!) let num_interfaces = self.active_interfaces.load(Ordering::Acquire); if num_interfaces == 0 { self.signal_for_shutdown(); } } } #[inline] fn signal_for_shutdown(&self) { debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); let _lock = self.connector_queue.lock().unwrap(); let should_signal = self.should_exit .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_signal { self.scheduler_notifier.notify_all(); } } } unsafe impl Send for RuntimeInner {} unsafe impl Sync for RuntimeInner {} // ----------------------------------------------------------------------------- // ConnectorStore // ----------------------------------------------------------------------------- struct StoreEntry { connector: ScheduledConnector, generation: std::sync::atomic::AtomicU32, num_users: std::sync::atomic::AtomicU32, } struct ConnectorStore { // Freelist storage of connectors. Storage should be pointer-stable as // someone might be mutating the vector while we're executing one of the // connectors. entries: RawVec<*mut StoreEntry>, free: Vec, } impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { Self { entries: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), } } /// Directly retrieves an entry. There be dragons here. The `connector` /// might have its destructor already executed. Accessing it might then lead /// to memory corruption. fn get(&self, index: u32) -> &'static mut StoreEntry { unsafe { let entry = self.entries.get_mut(index as usize); return &mut **entry; } } /// Creates a new connector. Caller should ensure ports are set up correctly /// and the connector is queued for execution if needed. fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { let mut connector = ScheduledConnector { connector, ctx: ComponentCtx::new_empty(), public: ConnectorPublic::new(initially_sleeping), router: ControlMessageHandler::new(), shutting_down: false, }; let index; let key; if self.free.is_empty() { // No free entries, allocate new entry index = self.entries.len(); key = ConnectorKey{ index: index as u32, generation: 0 }; connector.ctx.id = key.downcast(); let connector = Box::into_raw(Box::new(StoreEntry{ connector, generation: AtomicU32::new(0), num_users: AtomicU32::new(1), })); self.entries.push(connector); } else { // Free spot available index = self.free.pop().unwrap(); unsafe { let target = &mut **self.entries.get_mut(index); std::ptr::write(&mut target.connector as *mut _, connector); let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst); debug_assert_eq!(_old_num_users, 0); let generation = target.generation.load(Ordering::Acquire); key = ConnectorKey{ index: index as u32, generation }; target.connector.ctx.id = key.downcast(); } } println!("DEBUG [ global store ] Created component at {}", key.index); return key; } /// Destroys a connector. Caller should make sure it is not scheduled for /// execution. Otherwise one experiences "bad stuff" (tm). fn destroy(&mut self, key: ConnectorKey) { unsafe { let target = self.entries.get_mut(key.index as usize); (**target).generation.fetch_add(1, Ordering::SeqCst); std::ptr::drop_in_place(*target); // Note: but not deallocating! } println!("DEBUG [ global store ] Destroyed component at {}", key.index); self.free.push(key.index as usize); } } impl Drop for ConnectorStore { fn drop(&mut self) { // Everything in the freelist already had its destructor called, so only // has to be deallocated for free_idx in self.free.iter().copied() { unsafe { let memory = self.entries.get_mut(free_idx); let layout = std::alloc::Layout::for_value(&**memory); std::alloc::dealloc(*memory as *mut u8, layout); // mark as null for the remainder *memory = std::ptr::null_mut(); } } // With the deallocated stuff marked as null, clear the remainder that // is not null for idx in 0..self.entries.len() { unsafe { let memory = *self.entries.get_mut(idx); if !memory.is_null() { let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh } } } } }