diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 846a60389f8b022adcb7810efa45f05effcc3a83..bf6a9baed2d0680b331672d0fcfd60f5328d2cbf 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -7,6 +7,7 @@ use crate::protocol::*; use super::communication::Message; use super::component::{CompCtx, CompPDL}; use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; +use super::scheduler::Scheduler; // ----------------------------------------------------------------------------- // Component @@ -122,30 +123,59 @@ impl Drop for CompHandle { // Runtime // ----------------------------------------------------------------------------- -pub type RuntimeHandle = Arc; - -/// Memory that is maintained by "the runtime". In practice it is maintained by -/// multiple schedulers, and this serves as the common interface to that memory. pub struct Runtime { - pub protocol: ProtocolDescription, - components: ComponentStore, - work_queue: Mutex>, - work_condvar: Condvar, - active_elements: AtomicU32, // active components and APIs (i.e. 
component creators) + pub(crate) inner: Arc, + threads: Vec>, } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { assert!(num_threads > 0, "need a thread to perform work"); - return Runtime{ + let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), work_queue: Mutex::new(VecDeque::with_capacity(128)), work_condvar: Condvar::new(), - active_elements: AtomicU32::new(0), + active_elements: AtomicU32::new(1), + }); + let mut runtime = Runtime { + inner: runtime_inner, + threads: Vec::with_capacity(num_threads as usize), }; + + for thread_index in 0..num_threads { + let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index); + let thread_handle = std::thread::spawn(move || { + scheduler.run(); + }); + + runtime.threads.push(thread_handle); + } + + return runtime; + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + self.inner.decrement_active_components(); + for handle in self.threads.drain(..) { + handle.join().expect("join scheduler thread"); + } } +} + +/// Memory that is maintained by "the runtime". In practice it is maintained by +/// multiple schedulers, and this serves as the common interface to that memory. +pub(crate) struct RuntimeInner { + pub protocol: ProtocolDescription, + components: ComponentStore, + work_queue: Mutex>, + work_condvar: Condvar, + active_elements: AtomicU32, // active components and APIs (i.e. component creators) +} +impl RuntimeInner { // Scheduling and retrieving work pub(crate) fn take_work(&self) -> Option { @@ -154,6 +184,7 @@ impl Runtime { lock = self.work_condvar.wait(lock).unwrap(); } + // We have work, or the schedulers should exit. 
return lock.pop_front(); } @@ -185,6 +216,7 @@ impl Runtime { let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something + self.increment_active_components(); let component = self.components.get_mut(index); component.ctx.id = CompId(index); @@ -203,8 +235,27 @@ impl Runtime { pub(crate) fn destroy_component(&self, key: CompKey) { debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0); + self.decrement_active_components(); self.components.destroy(key.0); } - // Interacting with components + // Tracking number of active interfaces and the active components + + #[inline] + fn increment_active_components(&self) { + let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel); + debug_assert!(_old_val > 0); // can only create a component from an API/component, so can never be 0. + } + + fn decrement_active_components(&self) { + let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel); + debug_assert!(old_val > 0); // something wrong with incr/decr logic + let new_val = old_val - 1; + if new_val == 0 { + // Just to be sure, in case the last thing that gets destroyed is an + // API instead of a thread. + let lock = self.work_queue.lock(); + self.work_condvar.notify_all(); + } + } }