diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index b3f6ed830b77fe7d8153a97872eec013dd851309..de0d8cc5461dc9dee9fa87b537774f81a014d8e8 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -433,7 +433,7 @@ use std::cell::UnsafeCell; /// Lookup table for monomorphs. Wrapped in a special struct because we don't /// want to allocate for each lookup (what we really want is a HashMap that -/// exposes it CompareFn and HashFn, but whatevs). +/// exposes its CompareFn and HashFn, but whatevs). pub(crate) struct MonomorphTable { lookup: HashMap, // indexes into `monomorphs` pub(crate) monomorphs: Vec, @@ -450,6 +450,11 @@ pub(crate) struct MonomorphTable { key: UnsafeCell, } +// TODO: Clean this up: somehow avoid needing the `key` field, but also do not allocate for +// each "get_monomorph_index" +unsafe impl Send for MonomorphTable{} +unsafe impl Sync for MonomorphTable{} + impl MonomorphTable { fn new() -> Self { return Self { diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 880f89fa5743b93ed09697b852bbc932d88652ac..691c7ca21685901f3a3f2a0c58d6ec7d3ef0bd50 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -2,4 +2,5 @@ mod store; mod runtime; mod component; mod communication; -mod scheduler; \ No newline at end of file +mod scheduler; +#[cfg(test)] mod tests; \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 846a60389f8b022adcb7810efa45f05effcc3a83..bf6a9baed2d0680b331672d0fcfd60f5328d2cbf 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -7,6 +7,7 @@ use crate::protocol::*; use super::communication::Message; use super::component::{CompCtx, CompPDL}; use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; +use super::scheduler::Scheduler; // ----------------------------------------------------------------------------- // Component @@ -122,30 +123,59 @@ impl Drop for CompHandle { // Runtime // 
----------------------------------------------------------------------------- -pub type RuntimeHandle = Arc; - -/// Memory that is maintained by "the runtime". In practice it is maintained by -/// multiple schedulers, and this serves as the common interface to that memory. pub struct Runtime { - pub protocol: ProtocolDescription, - components: ComponentStore, - work_queue: Mutex>, - work_condvar: Condvar, - active_elements: AtomicU32, // active components and APIs (i.e. component creators) + pub(crate) inner: Arc, + threads: Vec>, } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { assert!(num_threads > 0, "need a thread to perform work"); - return Runtime{ + let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), work_queue: Mutex::new(VecDeque::with_capacity(128)), work_condvar: Condvar::new(), - active_elements: AtomicU32::new(0), + active_elements: AtomicU32::new(1), + }); + let mut runtime = Runtime { + inner: runtime_inner, + threads: Vec::with_capacity(num_threads as usize), }; + + for thread_index in 0..num_threads { + let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index); + let thread_handle = std::thread::spawn(move || { + scheduler.run(); + }); + + runtime.threads.push(thread_handle); + } + + return runtime; + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + self.inner.decrement_active_components(); + for handle in self.threads.drain(..) { + handle.join().expect("join scheduler thread"); + } } +} + +/// Memory that is maintained by "the runtime". In practice it is maintained by +/// multiple schedulers, and this serves as the common interface to that memory. +pub(crate) struct RuntimeInner { + pub protocol: ProtocolDescription, + components: ComponentStore, + work_queue: Mutex>, + work_condvar: Condvar, + active_elements: AtomicU32, // active components and APIs (i.e. 
component creators) +} +impl RuntimeInner { // Scheduling and retrieving work pub(crate) fn take_work(&self) -> Option { @@ -154,6 +184,7 @@ impl Runtime { lock = self.work_condvar.wait(lock).unwrap(); } + // We have work, or the schedulers should exit. return lock.pop_front(); } @@ -185,6 +216,7 @@ impl Runtime { let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something + self.increment_active_components(); let component = self.components.get_mut(index); component.ctx.id = CompId(index); @@ -203,8 +235,27 @@ impl Runtime { pub(crate) fn destroy_component(&self, key: CompKey) { debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0); + self.decrement_active_components(); self.components.destroy(key.0); } - // Interacting with components + // Tracking number of active interfaces and the active components + + #[inline] + fn increment_active_components(&self) { + let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel); + debug_assert!(_old_val > 0); // can only create a component from an API/component, so can never be 0. + } + + fn decrement_active_components(&self) { + let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel); + debug_assert!(old_val > 0); // something wrong with incr/decr logic + let new_val = old_val - 1; + if new_val == 0 { + // Just to be sure, in case the last thing that gets destroyed is an + // API instead of a thread. 
+ let lock = self.work_queue.lock(); + self.work_condvar.notify_all(); + } + } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 2ed3f6eac8d91490b86f8752a7e9a99964a06d6e..9286998dd0f41e3ff8e36d78657323061b4a1bd4 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::sync::atomic::Ordering; use super::component::*; @@ -6,16 +7,16 @@ use super::communication::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { - runtime: RuntimeHandle, + runtime: Arc, scheduler_id: u32, } pub(crate) struct SchedulerCtx<'a> { - pub runtime: &'a Runtime, + pub runtime: &'a RuntimeInner, } impl<'a> SchedulerCtx<'a> { - pub fn new(runtime: &'a Runtime) -> Self { + pub fn new(runtime: &'a RuntimeInner) -> Self { return Self { runtime, } @@ -25,7 +26,7 @@ impl<'a> SchedulerCtx<'a> { impl Scheduler { // public interface to thread - pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self { + pub fn new(runtime: Arc, scheduler_id: u32) -> Self { return Scheduler{ runtime, scheduler_id } } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..e1e36e0ebfa199dc82afa8adafc2ba6afe54037c --- /dev/null +++ b/src/runtime2/tests/mod.rs @@ -0,0 +1,22 @@ +use crate::protocol::*; +use crate::protocol::eval::*; +use crate::runtime2::runtime::*; +use crate::runtime2::component::CompPDL; + +#[test] +fn test_component_creation() { + let pd = ProtocolDescription::parse(b" + primitive nothing_at_all() { + s32 a = 5; + print(\"hello\"); + auto b = 5 + a; + } + ").expect("compilation"); + let rt = Runtime::new(1, pd); + + let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new())) + .expect("component creation"); + let comp = CompPDL::new(prompt, 0); + let (key, _) = rt.inner.create_pdl_component(comp, true); + rt.inner.enqueue_work(key); +} \ No newline at end of file