use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; use std::thread; use std::collections::VecDeque; use crate::protocol::*; use crate::runtime2::poll::{PollingThread, PollingThreadHandle}; use crate::runtime2::RtError; use super::communication::Message; use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; // ----------------------------------------------------------------------------- // Component // ----------------------------------------------------------------------------- /// Key to a component. Type system somewhat ensures that there can only be one /// of these. Only with a key one may retrieve privately-accessible memory for /// a component. Practically just a generational index, like `CompId` is. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) struct CompKey(pub u32); impl CompKey { pub(crate) fn downgrade(&self) -> CompId { return CompId(self.0); } } /// Generational ID of a component. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(pub u32); impl CompId { pub(crate) fn new_invalid() -> CompId { return CompId(u32::MAX); } /// Upgrade component ID to component key. Unsafe because the caller needs /// to make sure that only one component key can exist at a time (to ensure /// a component can only be scheduled/executed by one thread). pub(crate) unsafe fn upgrade(&self) -> CompKey { return CompKey(self.0); } } /// Handle to a component that is being created. pub(crate) struct CompReserved { reservation: ComponentReservation, } impl CompReserved { pub(crate) fn id(&self) -> CompId { return CompId(self.reservation.index) } } /// Representation of a runtime component. Contains the bookkeeping variables /// for the schedulers, the publicly accessible fields, and the private fields /// that should only be accessed by the thread running the component's routine. pub(crate) struct RuntimeComp { pub public: CompPublic, pub component: Box, pub ctx: CompCtx, pub inbox: QueueDynMpsc, pub exiting: bool, } /// Should contain everything that is accessible in a thread-safe manner // TODO: Do something about the `num_handles` thing. This needs to be a bit more // "foolproof" to lighten the mental burden of using the `num_handles` // variable. pub(crate) struct CompPublic { pub sleeping: AtomicBool, pub num_handles: AtomicU32, // manually modified (!) inbox: QueueDynProducer, } /// Handle to public part of a component. Would be nice if we could /// automagically manage the `num_handles` counter. But when it reaches zero we /// need to manually remove the handle from the runtime. So we just have debug /// code to make sure this actually happens. pub(crate) struct CompHandle { target: *const CompPublic, id: CompId, #[cfg(debug_assertions)] decremented: bool, } impl CompHandle { fn new(id: CompId, public: &CompPublic) -> CompHandle { let handle = CompHandle{ target: public, id, #[cfg(debug_assertions)] decremented: false, }; handle.increment_users(); return handle; } pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) { self.inbox.push(message); if try_wake_up { wake_up_if_sleeping(runtime, self.id, self); } } pub(crate) fn id(&self) -> CompId { return self.id; } fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed } /// Returns the `CompKey` to the component if it should be destroyed pub(crate) fn decrement_users(&mut self) -> Option { dbg_code!(assert!(!self.decremented, "illegal to 'decrement_users' twice")); let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel); let new_count = old_count - 1; dbg_code!(self.decremented = true); if new_count == 0 { return Some(unsafe{ self.id.upgrade() }); } return None; } } impl Clone for CompHandle { fn clone(&self) -> Self { dbg_code!(assert!(!self.decremented, "illegal to clone after 'decrement_users'")); self.increment_users(); return CompHandle{ target: self.target, id: self.id, #[cfg(debug_assertions)] decremented: false, }; } } impl std::ops::Deref for CompHandle { type Target = CompPublic; fn deref(&self) -> &Self::Target { dbg_code!(assert!(!self.decremented)); // cannot access if control is relinquished return unsafe{ &*self.target }; } } impl Drop for CompHandle { fn drop(&mut self) { dbg_code!(assert!(self.decremented, "need call to 'decrement_users' before dropping")); } } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- pub struct Runtime { pub(crate) inner: Arc, scheduler_threads: Vec>, polling_handle: PollingThreadHandle, } impl Runtime { // TODO: debug_logging should be removed at some point pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result { if num_threads == 0 { return Err(rt_error!("need at least one thread to create the runtime")); } let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), work_queue: Mutex::new(VecDeque::with_capacity(128)), work_condvar: Condvar::new(), active_elements: AtomicU32::new(1), }); let (polling_handle, polling_clients) = rt_error_try!( PollingThread::new(runtime_inner.clone(), debug_logging), "failed to build polling thread" ); let mut scheduler_threads = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { let mut scheduler = Scheduler::new( runtime_inner.clone(), polling_clients.client(), thread_index, debug_logging ); let thread_handle = thread::spawn(move || { scheduler.run(); }); scheduler_threads.push(thread_handle); } return Ok(Runtime{ inner: runtime_inner, scheduler_threads, polling_handle, }); } pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> { use crate::protocol::eval::ValueGroup; let prompt = self.inner.protocol.new_component( module_name, routine_name, ValueGroup::new_stack(Vec::new()) )?; let reserved = self.inner.start_create_pdl_component(); let ctx = CompCtx::new(&reserved); let component = Box::new(CompPDL::new(prompt, 0)); let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false); self.inner.enqueue_work(key); return Ok(()) } } impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_components(); for handle in self.scheduler_threads.drain(..) { handle.join().expect("join scheduler thread"); } self.polling_handle.shutdown().expect("shutdown polling thread"); } } /// Memory that is maintained by "the runtime". In practice it is maintained by /// multiple schedulers, and this serves as the common interface to that memory. pub(crate) struct RuntimeInner { pub protocol: ProtocolDescription, components: ComponentStore, work_queue: Mutex>, // TODO: should be MPMC queue work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } impl RuntimeInner { // Scheduling and retrieving work pub(crate) fn take_work(&self) -> Option { let mut lock = self.work_queue.lock().unwrap(); while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 { lock = self.work_condvar.wait(lock).unwrap(); } // We have work, or the schedulers should exit. return lock.pop_front(); } pub(crate) fn enqueue_work(&self, key: CompKey) { let mut lock = self.work_queue.lock().unwrap(); lock.push_back(key); self.work_condvar.notify_one(); } // Creating/destroying components pub(crate) fn start_create_pdl_component(&self) -> CompReserved { self.increment_active_components(); let reservation = self.components.reserve(); return CompReserved{ reservation }; } pub(crate) fn finish_create_pdl_component( &self, reserved: CompReserved, component: Box, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); let _id = reserved.id(); context.id = reserved.id(); let component = RuntimeComp { public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, component, ctx: context, inbox: inbox_queue, exiting: false, }; let index = self.components.submit(reserved.reservation, component); debug_assert_eq!(index, _id.0); let component = self.components.get_mut(index); return (CompKey(index), component); } pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { let component = self.components.get_mut(key.0); return component; } pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle { let component = self.components.get(id.0); return CompHandle::new(id, &component.public); } /// Will remove a component and its memory from the runtime. May only be /// called if the necessary conditions for destruction have been met. pub(crate) fn destroy_component(&self, key: CompKey) { dbg_code!({ let component = self.get_component(key); debug_assert!(component.exiting); debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0); }); self.decrement_active_components(); self.components.destroy(key.0); } // Tracking number of active interfaces and the active components #[inline] fn increment_active_components(&self) { let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel); debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0. } fn decrement_active_components(&self) { let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel); debug_assert!(old_val > 0); // something wrong with incr/decr logic let new_val = old_val - 1; if new_val == 0 { // Just to be sure, in case the last thing that gets destroyed is an // API instead of a thread. let _lock = self.work_queue.lock(); self.work_condvar.notify_all(); } } }