diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs deleted file mode 100644 index de82384c6a9ed35bbe0359ade150be249034e3e0..0000000000000000000000000000000000000000 --- a/src/runtime2/mod.rs +++ /dev/null @@ -1,640 +0,0 @@ -// Structure of module - -mod branch; -mod native; -mod port; -mod scheduler; -mod consensus; -mod inbox; - -#[cfg(test)] mod tests; -mod connector; - -// Imports - -use std::collections::VecDeque; -use std::sync::{Arc, Condvar, Mutex, RwLock}; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::thread::{self, JoinHandle}; - -use crate::collections::RawVec; -use crate::ProtocolDescription; - -use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; -use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler}; -use native::{Connector, ConnectorApplication, ApplicationInterface}; -use inbox::Message; -use port::{ChannelId, Port, PortState}; - -/// A kind of token that, once obtained, allows mutable access to a connector. -/// We're trying to use move semantics as much as possible: the owner of this -/// key is the only one that may execute the connector's code. -#[derive(Debug)] -pub(crate) struct ConnectorKey { - pub index: u32, // of connector - pub generation: u32, -} - -impl ConnectorKey { - /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable - /// access, to a "regular ID" which can be used to obtain immutable access. - #[inline] - pub fn downcast(&self) -> ConnectorId { - return ConnectorId{ - index: self.index, - generation: self.generation, - }; - } - - /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it - /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system - #[inline] - pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { - return ConnectorKey{ - index: id.index, - generation: id.generation, - }; - } -} - -/// A kind of token that allows shared access to a connector. Multiple threads -/// may hold this -#[derive(Debug, Copy, Clone)] -pub struct ConnectorId{ - pub index: u32, - pub generation: u32, -} - -impl PartialEq for ConnectorId { - fn eq(&self, other: &Self) -> bool { - return self.index.eq(&other.index); - } -} - -impl Eq for ConnectorId{} - -impl PartialOrd for ConnectorId{ - fn partial_cmp(&self, other: &Self) -> Option { - return self.index.partial_cmp(&other.index) - } -} - -impl Ord for ConnectorId{ - fn cmp(&self, other: &Self) -> crate::common::Ordering { - return self.partial_cmp(other).unwrap(); - } -} - -impl ConnectorId { - // TODO: Like the other `new_invalid`, maybe remove - #[inline] - pub fn new_invalid() -> ConnectorId { - return ConnectorId { - index: u32::MAX, - generation: 0, - }; - } - - #[inline] - pub(crate) fn is_valid(&self) -> bool { - return self.index != u32::MAX; - } -} - -// TODO: Change this, I hate this. But I also don't want to put `public` and -// `router` of `ScheduledConnector` back into `Connector`. The reason I don't -// want `Box` everywhere is because of the v-table overhead. But -// to truly design this properly I need some benchmarks. -pub(crate) enum ConnectorVariant { - UserDefined(ConnectorPDL), - Native(Box), -} - -impl Connector for ConnectorVariant { - fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { - match self { - ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), - ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), - } - } -} - -pub(crate) struct ScheduledConnector { - pub connector: ConnectorVariant, // access by connector - pub ctx: ComponentCtx, - pub public: ConnectorPublic, // accessible by all schedulers and connectors - pub router: ControlMessageHandler, - pub shutting_down: bool, -} - -// ----------------------------------------------------------------------------- -// Runtime -// ----------------------------------------------------------------------------- - -/// Externally facing runtime. -pub struct Runtime { - inner: Arc, -} - -impl Runtime { - pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { - // Setup global state - assert!(num_threads > 0, "need a thread to run connectors"); - let runtime_inner = Arc::new(RuntimeInner{ - protocol_description, - port_counter: AtomicU32::new(0), - connectors: RwLock::new(ConnectorStore::with_capacity(32)), - connector_queue: Mutex::new(VecDeque::with_capacity(32)), - schedulers: Mutex::new(Vec::new()), - scheduler_notifier: Condvar::new(), - active_connectors: AtomicU32::new(0), - active_interfaces: AtomicU32::new(1), // this `Runtime` instance - should_exit: AtomicBool::new(false), - }); - - // Launch threads - { - let mut schedulers = Vec::with_capacity(num_threads as usize); - for thread_index in 0..num_threads { - let cloned_runtime_inner = runtime_inner.clone(); - let thread = thread::Builder::new() - .name(format!("thread-{}", thread_index)) - .spawn(move || { - let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); - scheduler.run(); - }) - .unwrap(); - - schedulers.push(thread); - } - - let mut lock = runtime_inner.schedulers.lock().unwrap(); - *lock = schedulers; - } - - // Return runtime - return Runtime{ inner: runtime_inner }; - } - - /// Returns a new interface through which channels and connectors can be - /// created. - pub fn create_interface(&self) -> ApplicationInterface { - self.inner.increment_active_interfaces(); - let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); - let connector_key = self.inner.create_interface_component(connector); - interface.set_connector_id(connector_key.downcast()); - - // Note that we're not scheduling. That is done by the interface in case - // it is actually needed. - return interface; - } -} - -impl Drop for Runtime { - fn drop(&mut self) { - self.inner.decrement_active_interfaces(); - let mut lock = self.inner.schedulers.lock().unwrap(); - for handle in lock.drain(..) { - handle.join().unwrap(); - } - } -} - -// ----------------------------------------------------------------------------- -// RuntimeInner -// ----------------------------------------------------------------------------- - -pub(crate) struct RuntimeInner { - // Protocol - pub(crate) protocol_description: ProtocolDescription, - // Regular counter for port IDs - port_counter: AtomicU32, - // Storage of connectors and the work queue - connectors: RwLock, - connector_queue: Mutex>, - schedulers: Mutex>>, - // Conditions to determine whether the runtime can exit - scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. - // TODO: Figure out if we can simply merge the counters? - active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) - active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels - should_exit: AtomicBool, -} - -impl RuntimeInner { - // --- Managing the components queued for execution - - /// Wait until there is a connector to run. If there is one, then `Some` - /// will be returned. If there is no more work, then `None` will be - /// returned. - pub(crate) fn wait_for_work(&self) -> Option { - let mut lock = self.connector_queue.lock().unwrap(); - while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { - lock = self.scheduler_notifier.wait(lock).unwrap(); - } - - return lock.pop_front(); - } - - pub(crate) fn push_work(&self, key: ConnectorKey) { - let mut lock = self.connector_queue.lock().unwrap(); - lock.push_back(key); - self.scheduler_notifier.notify_one(); - } - - // --- Creating/using ports - - /// Creates a new port pair. Note that these are stored globally like the - /// connectors are. Ports stored by components belong to those components. - pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { - use port::{PortIdLocal, PortKind}; - - let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); - let channel_id = ChannelId::new(getter_id); - let putter_id = PortIdLocal::new(getter_id + 1); - let getter_id = PortIdLocal::new(getter_id); - - let getter_port = Port{ - self_id: getter_id, - peer_id: putter_id, - channel_id, - kind: PortKind::Getter, - state: PortState::Open, - peer_connector: creating_connector, - }; - let putter_port = Port{ - self_id: putter_id, - peer_id: getter_id, - channel_id, - kind: PortKind::Putter, - state: PortState::Open, - peer_connector: creating_connector, - }; - - return (getter_port, putter_port); - } - - /// Sends a message directly (without going through the port) to a - /// component. This is slightly less efficient then sending over a port, but - /// might be preferable for some algorithms. If the component was sleeping - /// then it is scheduled for execution. - pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool { - let target = { - let mut lock = self.connectors.read().unwrap(); - lock.get(target_id.index) - }; - - // Do a CAS on the number of users. Most common case the component is - // alive and we're the only one sending the message. Note that if we - // finish this block, we're sure that no-one has set the `num_users` - // value to 0. This is essential! When at 0, the component is added to - // the freelist and the generation counter will be incremented. - let mut cur_num_users = 1; - while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) { - if old_num_users == 0 { - // Cannot send message. Whatever the component state is - // (destroyed, at a different generation number, busy being - // destroyed, etc.) we cannot send the message and will not - // modify the component - return false; - } - - cur_num_users = old_num_users; - } - - // We incremented the counter. But we might still be at the wrong - // generation number. The generation number is a monotonically - // increasing value. Since it only increases when someone gets the - // `num_users` counter to 0, we can simply load the generation number. - let generation = target.generation.load(Ordering::Acquire); - if generation != target_id.generation { - // We're at the wrong generation, so we cannot send the message. - // However, since we incremented the `num_users` counter, the moment - // we decrement it we might be the one that are supposed to handle - // the destruction of the component. Note that all users of the - // component do an increment-followed-by-decrement, we can simply - // do a `fetch_sub`. - let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); - if old_num_users == 1 { - // We're the one that got the counter to 0, so we're the ones - // that are supposed to handle component exit - self.finish_component_destruction(target_id); - } - - return false; - } - - // The generation is correct, and since we incremented the `num_users` - // counter we're now sure that we can send the message and it will be - // handled by the receiver - target.connector.public.inbox.insert_message(message); - - // Finally, do the same as above: decrement number of users, if at gets - // to 0 we're the ones who should handle the exit condition. - let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); - if old_num_users == 1 { - // We're allowed to destroy the component. - self.finish_component_destruction(target_id); - } else { - // Message is sent. If the component is sleeping, then we're sure - // it is not scheduled and it has not initiated the destruction of - // the component (because of the way - // `initiate_component_destruction` does not set sleeping to true). - // So we can safely schedule it. - let should_wake_up = target.connector.public.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let key = unsafe{ ConnectorKey::from_id(target_id) }; - self.push_work(key); - } - } - - return true - } - - /// Sends a message to a particular component, assumed to occur over a port. - /// If the component happened to be sleeping then it will be scheduled for - /// execution. Because of the port management system we may assumed that - /// we're always accessing the component at the right generation number. - pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) { - let target = { - let lock = self.connectors.read().unwrap(); - let entry = lock.get(target_id.index); - debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation); - &mut entry.connector.public - }; - - target.inbox.insert_message(message); - - let should_wake_up = target.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let key = unsafe{ ConnectorKey::from_id(target_id) }; - self.push_work(key); - } - } - - // --- Creating/retrieving/destroying components - - /// Creates an initially sleeping application connector. - fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { - // Initialize as sleeping, as it will be scheduled by the programmer. - let mut lock = self.connectors.write().unwrap(); - let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); - - self.increment_active_components(); - return key; - } - - /// Creates a new PDL component. This function just creates the component. - /// If you create it initially awake, then you must add it to the work - /// queue. Other aspects of correctness (i.e. setting initial ports) are - /// relinquished to the caller! - pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey { - // Create as not sleeping, as we'll schedule it immediately - let key = { - let mut lock = self.connectors.write().unwrap(); - lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping) - }; - - self.increment_active_components(); - return key; - } - - /// Retrieve private access to the component through its key. - #[inline] - pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { - let entry = { - let lock = self.connectors.read().unwrap(); - lock.get(connector_key.index) - }; - - debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key); - return &mut entry.connector; - } - - // --- Managing component destruction - - /// Start component destruction, may only be done by the scheduler that is - /// executing the component. This might not actually destroy the component, - /// since other components might be sending it messages. - fn initiate_component_destruction(&self, connector_key: ConnectorKey) { - // Most of the time no-one will be sending messages, so try - // immediate destruction - let mut lock = self.connectors.write().unwrap(); - let entry = lock.get(connector_key.index); - debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation); - debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component - let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst); - if old_num_users == 1 { - // We just brought the number of users down to 0. Destroy the - // component - entry.connector.public.inbox.clear(); - entry.generation.fetch_add(1, Ordering::SeqCst); - lock.destroy(connector_key); - self.decrement_active_components(); - } - } - - fn finish_component_destruction(&self, connector_id: ConnectorId) { - let mut lock = self.connectors.write().unwrap(); - let entry = lock.get(connector_id.index); - debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0); - let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst); - debug_assert_eq!(_old_generation, connector_id.generation); - - // TODO: In the future we should not only clear out the inbox, but send - // messages back to the senders indicating the messages did not arrive. - entry.connector.public.inbox.clear(); - - // Invariant of only one thread being able to handle the internals of - // component is preserved by the fact that only one thread can decrement - // `num_users` to 0. - lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) }); - self.decrement_active_components(); - } - - // --- Managing exit condition - - #[inline] - pub(crate) fn increment_active_interfaces(&self) { - let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); - debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero - } - - pub(crate) fn decrement_active_interfaces(&self) { - let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); - debug_assert!(old_num > 0); - if old_num == 1 { // such that active interfaces is now 0 - let num_connectors = self.active_connectors.load(Ordering::Acquire); - if num_connectors == 0 { - self.signal_for_shutdown(); - } - } - } - - #[inline] - fn increment_active_components(&self) { - let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); - } - - fn decrement_active_components(&self) { - let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); - debug_assert!(old_num > 0); - if old_num == 1 { // such that we have no more active connectors (for now!) - let num_interfaces = self.active_interfaces.load(Ordering::Acquire); - if num_interfaces == 0 { - self.signal_for_shutdown(); - } - } - } - - #[inline] - fn signal_for_shutdown(&self) { - debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); - debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); - - let _lock = self.connector_queue.lock().unwrap(); - let should_signal = self.should_exit - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_signal { - self.scheduler_notifier.notify_all(); - } - } -} - -unsafe impl Send for RuntimeInner {} -unsafe impl Sync for RuntimeInner {} - -// ----------------------------------------------------------------------------- -// ConnectorStore -// ----------------------------------------------------------------------------- - -struct StoreEntry { - connector: ScheduledConnector, - generation: std::sync::atomic::AtomicU32, - num_users: std::sync::atomic::AtomicU32, -} - -struct ConnectorStore { - // Freelist storage of connectors. Storage should be pointer-stable as - // someone might be mutating the vector while we're executing one of the - // connectors. - entries: RawVec<*mut StoreEntry>, - free: Vec, -} - -impl ConnectorStore { - fn with_capacity(capacity: usize) -> Self { - Self { - entries: RawVec::with_capacity(capacity), - free: Vec::with_capacity(capacity), - } - } - - /// Directly retrieves an entry. There be dragons here. The `connector` - /// might have its destructor already executed. Accessing it might then lead - /// to memory corruption. - fn get(&self, index: u32) -> &'static mut StoreEntry { - unsafe { - let entry = self.entries.get_mut(index as usize); - return &mut **entry; - } - } - - /// Creates a new connector. Caller should ensure ports are set up correctly - /// and the connector is queued for execution if needed. - fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { - let mut connector = ScheduledConnector { - connector, - ctx: ComponentCtx::new_empty(), - public: ConnectorPublic::new(initially_sleeping), - router: ControlMessageHandler::new(), - shutting_down: false, - }; - - let index; - let key; - - if self.free.is_empty() { - // No free entries, allocate new entry - index = self.entries.len(); - key = ConnectorKey{ - index: index as u32, generation: 0 - }; - connector.ctx.id = key.downcast(); - - let connector = Box::into_raw(Box::new(StoreEntry{ - connector, - generation: AtomicU32::new(0), - num_users: AtomicU32::new(1), - })); - self.entries.push(connector); - } else { - // Free spot available - index = self.free.pop().unwrap(); - - unsafe { - let target = &mut **self.entries.get_mut(index); - std::ptr::write(&mut target.connector as *mut _, connector); - let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst); - debug_assert_eq!(_old_num_users, 0); - - let generation = target.generation.load(Ordering::Acquire); - key = ConnectorKey{ index: index as u32, generation }; - target.connector.ctx.id = key.downcast(); - } - } - - println!("DEBUG [ global store ] Created component at {}", key.index); - return key; - } - - /// Destroys a connector. Caller should make sure it is not scheduled for - /// execution. Otherwise one experiences "bad stuff" (tm). - fn destroy(&mut self, key: ConnectorKey) { - unsafe { - let target = self.entries.get_mut(key.index as usize); - (**target).generation.fetch_add(1, Ordering::SeqCst); - std::ptr::drop_in_place(*target); - // Note: but not deallocating! - } - - println!("DEBUG [ global store ] Destroyed component at {}", key.index); - self.free.push(key.index as usize); - } -} - -impl Drop for ConnectorStore { - fn drop(&mut self) { - // Everything in the freelist already had its destructor called, so only - // has to be deallocated - for free_idx in self.free.iter().copied() { - unsafe { - let memory = self.entries.get_mut(free_idx); - let layout = std::alloc::Layout::for_value(&**memory); - std::alloc::dealloc(*memory as *mut u8, layout); - - // mark as null for the remainder - *memory = std::ptr::null_mut(); - } - } - - // With the deallocated stuff marked as null, clear the remainder that - // is not null - for idx in 0..self.entries.len() { - unsafe { - let memory = *self.entries.get_mut(idx); - if !memory.is_null() { - let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh - } - } - } - } -} \ No newline at end of file