diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 5c6ec8c980e4c03212e62d4123cdac96b437f0a2..9da208a48a371a354cd6ec2d7aece2ff4a5fdf21 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,11 +1,14 @@ use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; +use std::thread; use std::collections::VecDeque; use crate::protocol::*; +use crate::runtime2::poll::{PollingThread, PollingThreadHandle}; +use crate::runtime2::RtError; use super::communication::Message; -use super::component::{wake_up_if_sleeping, CompPDL, CompCtx}; +use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; @@ -25,7 +28,7 @@ impl CompKey { } } -/// Generational ID of a component +/// Generational ID of a component. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(pub u32); @@ -53,11 +56,12 @@ impl CompReserved { } } -/// Private fields of a component, may only be modified by a single thread at -/// a time. +/// Representation of a runtime component. Contains the bookkeeping variables +/// for the schedulers, the publicly accessible fields, and the private fields +/// that should only be accessed by the thread running the component's routine. pub(crate) struct RuntimeComp { pub public: CompPublic, - pub code: CompPDL, + pub component: Box, pub ctx: CompCtx, pub inbox: QueueDynMpsc, pub exiting: bool, @@ -79,7 +83,7 @@ pub(crate) struct CompPublic { /// code to make sure this actually happens. pub(crate) struct CompHandle { target: *const CompPublic, - id: CompId, // TODO: @Remove after debugging + id: CompId, #[cfg(debug_assertions)] decremented: bool, } @@ -94,14 +98,17 @@ impl CompHandle { return handle; } - pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) { - sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message)); + pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) { self.inbox.push(message); if try_wake_up { - wake_up_if_sleeping(sched_ctx, self.id, self); + wake_up_if_sleeping(runtime, self.id, self); } } + pub(crate) fn id(&self) -> CompId { + return self.id; + } + fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed @@ -154,13 +161,16 @@ impl Drop for CompHandle { pub struct Runtime { pub(crate) inner: Arc, - threads: Vec>, + scheduler_threads: Vec>, + polling_handle: PollingThreadHandle, } impl Runtime { // TODO: debug_logging should be removed at some point - pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime { - assert!(num_threads > 0, "need a thread to perform work"); + pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result { + if num_threads == 0 { + return Err(rt_error!("need at least one thread to create the runtime")); + } let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), @@ -168,21 +178,30 @@ impl Runtime { work_condvar: Condvar::new(), active_elements: AtomicU32::new(1), }); - let mut runtime = Runtime { - inner: runtime_inner, - threads: Vec::with_capacity(num_threads as usize), - }; + let (polling_handle, polling_clients) = rt_error_try!( + PollingThread::new(runtime_inner.clone(), debug_logging), + "failed to build polling thread" + ); + + let mut scheduler_threads = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { - let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging); - let thread_handle = std::thread::spawn(move || { + let mut scheduler = Scheduler::new( + runtime_inner.clone(), polling_clients.client(), + thread_index, debug_logging + ); + let thread_handle = thread::spawn(move || { scheduler.run(); }); - runtime.threads.push(thread_handle); + scheduler_threads.push(thread_handle); } - return runtime; + return Ok(Runtime{ + inner: runtime_inner, + scheduler_threads, + polling_handle, + }); } pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> { @@ -193,7 +212,8 @@ impl Runtime { )?; let reserved = self.inner.start_create_pdl_component(); let ctx = CompCtx::new(&reserved); - let (key, _) = self.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false); + let component = Box::new(CompPDL::new(prompt, 0)); + let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false); self.inner.enqueue_work(key); return Ok(()) @@ -203,9 +223,11 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_components(); - for handle in self.threads.drain(..) { + for handle in self.scheduler_threads.drain(..) { handle.join().expect("join scheduler thread"); } + + self.polling_handle.shutdown().expect("shutdown polling thread"); } } @@ -214,7 +236,7 @@ impl Drop for Runtime { pub(crate) struct RuntimeInner { pub protocol: ProtocolDescription, components: ComponentStore, - work_queue: Mutex>, + work_queue: Mutex>, // TODO: should be MPMC queue work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } @@ -248,7 +270,7 @@ impl RuntimeInner { pub(crate) fn finish_create_pdl_component( &self, reserved: CompReserved, - component: CompPDL, mut context: CompCtx, initially_sleeping: bool, + component: Box, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); @@ -261,7 +283,7 @@ impl RuntimeInner { num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, - code: component, + component, ctx: context, inbox: inbox_queue, exiting: false, @@ -284,12 +306,15 @@ impl RuntimeInner { return CompHandle::new(id, &component.public); } + /// Will remove a component and its memory from the runtime. May only be + /// called if the necessary conditions for destruction have been met. pub(crate) fn destroy_component(&self, key: CompKey) { dbg_code!({ let component = self.get_component(key); debug_assert!(component.exiting); debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0); }); + self.decrement_active_components(); self.components.destroy(key.0); }