diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index d3e432ed679759a8d87bbe6cbcff6433e4a8660c..fb8df772666e3c76b344d84977f4ceeeef71cf2e 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,8 +1,11 @@ use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; +use std::thread; use std::collections::VecDeque; use crate::protocol::*; +use crate::runtime2::poll::{PollingThreadBuilder, PollingThreadDestroyer}; +use crate::runtime2::RtError; use super::communication::Message; use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; @@ -25,7 +28,7 @@ impl CompKey { } } -/// Generational ID of a component +/// Generational ID of a component. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(pub u32); @@ -80,7 +83,7 @@ pub(crate) struct CompPublic { /// code to make sure this actually happens. pub(crate) struct CompHandle { target: *const CompPublic, - id: CompId, // TODO: @Remove after debugging + id: CompId, #[cfg(debug_assertions)] decremented: bool, } @@ -95,14 +98,17 @@ impl CompHandle { return handle; } - pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) { - sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message)); + pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) { self.inbox.push(message); if try_wake_up { - wake_up_if_sleeping(sched_ctx, self.id, self); + wake_up_if_sleeping(runtime, self.id, self); } } + pub(crate) fn id(&self) -> CompId { + return self.id; + } + fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed @@ -155,13 +161,17 @@ impl Drop for CompHandle { pub struct Runtime { pub(crate) inner: Arc, - threads: Vec>, + scheduler_threads: Vec>, + polling_destroyer: PollingThreadDestroyer, + polling_thread: Option>, } impl Runtime { // TODO: debug_logging should be removed at some point - pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime { - assert!(num_threads > 0, "need a thread to perform work"); + pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result { + if num_threads == 0 { + return Err(rt_error!("need at least one thread to create the runtime")); + } let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), @@ -169,21 +179,36 @@ impl Runtime { work_condvar: Condvar::new(), active_elements: AtomicU32::new(1), }); - let mut runtime = Runtime { - inner: runtime_inner, - threads: Vec::with_capacity(num_threads as usize), - }; + let polling_builder = rt_error_try!( + PollingThreadBuilder::new(runtime_inner.clone(), debug_logging), + "failed to build polling thread" + ); + + let mut scheduler_threads = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { - let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging); - let thread_handle = std::thread::spawn(move || { + let mut scheduler = Scheduler::new( + runtime_inner.clone(), polling_builder.client(), + thread_index, debug_logging + ); + let thread_handle = thread::spawn(move || { scheduler.run(); }); - runtime.threads.push(thread_handle); + scheduler_threads.push(thread_handle); } - return runtime; + let (mut poller, polling_destroyer) = polling_builder.into_thread(); + let polling_thread = thread::spawn(move || { + poller.run(); + }); + + return Ok(Runtime{ + inner: runtime_inner, + scheduler_threads, + polling_destroyer, + polling_thread: Some(polling_thread), + }); } pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> { @@ -205,9 +230,12 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_components(); - for handle in self.threads.drain(..) { + for handle in self.scheduler_threads.drain(..) { handle.join().expect("join scheduler thread"); } + + self.polling_destroyer.initiate_destruction(); + self.polling_thread.take().unwrap().join().expect("join polling thread"); } }