use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; use std::collections::VecDeque; use crate::protocol::*; use super::communication::Message; use super::component::{wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; // ----------------------------------------------------------------------------- // Component // ----------------------------------------------------------------------------- /// Key to a component. Type system somewhat ensures that there can only be one /// of these. Only with a key one may retrieve privately-accessible memory for /// a component. Practically just a generational index, like `CompId` is. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) struct CompKey(pub u32); impl CompKey { pub(crate) fn downgrade(&self) -> CompId { return CompId(self.0); } } /// Generational ID of a component #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(pub u32); impl CompId { pub(crate) fn new_invalid() -> CompId { return CompId(u32::MAX); } /// Upgrade component ID to component key. Unsafe because the caller needs /// to make sure that only one component key can exist at a time (to ensure /// a component can only be scheduled/executed by one thread). pub(crate) unsafe fn upgrade(&self) -> CompKey { return CompKey(self.0); } } /// Handle to a component that is being created. pub(crate) struct CompReserved { reservation: ComponentReservation, } impl CompReserved { pub(crate) fn id(&self) -> CompId { return CompId(self.reservation.index) } } /// Private fields of a component, may only be modified by a single thread at /// a time. pub(crate) struct RuntimeComp { pub public: CompPublic, pub code: CompPDL, pub ctx: CompCtx, pub inbox: QueueDynMpsc, pub exiting: bool, } /// Should contain everything that is accessible in a thread-safe manner // TODO: Do something about the `num_handles` thing. This needs to be a bit more // "foolproof" to lighten the mental burden of using the `num_handles` // variable. pub(crate) struct CompPublic { pub sleeping: AtomicBool, pub num_handles: AtomicU32, // manually modified (!) inbox: QueueDynProducer, } /// Handle to public part of a component. Would be nice if we could /// automagically manage the `num_handles` counter. But when it reaches zero we /// need to manually remove the handle from the runtime. So we just have debug /// code to make sure this actually happens. pub(crate) struct CompHandle { target: *const CompPublic, id: CompId, // TODO: @Remove after debugging #[cfg(debug_assertions)] decremented: bool, } impl CompHandle { fn new(id: CompId, public: &CompPublic) -> CompHandle { let handle = CompHandle{ target: public, id, #[cfg(debug_assertions)] decremented: false, }; handle.increment_users(); return handle; } pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) { sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message)); self.inbox.push(message); if try_wake_up { wake_up_if_sleeping(sched_ctx, self.id, self); } } fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed } /// Returns the `CompKey` to the component if it should be destroyed pub(crate) fn decrement_users(&mut self) -> Option { debug_assert!(!self.decremented, "illegal to 'decrement_users' twice"); let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel); let new_count = old_count - 1; dbg_code!(self.decremented = true); println!(" ****** DEBUG [handle]: Decremented count to {} for {:?}", new_count, self.id); if new_count == 0 { return Some(unsafe{ self.id.upgrade() }); } return None; } } impl Clone for CompHandle { fn clone(&self) -> Self { debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'"); self.increment_users(); return CompHandle{ target: self.target, id: self.id, #[cfg(debug_assertions)] decremented: false, }; } } impl std::ops::Deref for CompHandle { type Target = CompPublic; fn deref(&self) -> &Self::Target { debug_assert!(!self.decremented); // cannot access if control is relinquished return unsafe{ &*self.target }; } } impl Drop for CompHandle { fn drop(&mut self) { debug_assert!(self.decremented, "need call to 'decrement_users' before dropping"); } } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- pub struct Runtime { pub(crate) inner: Arc, threads: Vec>, } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { assert!(num_threads > 0, "need a thread to perform work"); let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), work_queue: Mutex::new(VecDeque::with_capacity(128)), work_condvar: Condvar::new(), active_elements: AtomicU32::new(1), }); let mut runtime = Runtime { inner: runtime_inner, threads: Vec::with_capacity(num_threads as usize), }; for thread_index in 0..num_threads { let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index); let thread_handle = std::thread::spawn(move || { scheduler.run(); }); runtime.threads.push(thread_handle); } return runtime; } } impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_components(); for handle in self.threads.drain(..) { handle.join().expect("join scheduler thread"); } } } /// Memory that is maintained by "the runtime". In practice it is maintained by /// multiple schedulers, and this serves as the common interface to that memory. pub(crate) struct RuntimeInner { pub protocol: ProtocolDescription, components: ComponentStore, work_queue: Mutex>, work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } impl RuntimeInner { // Scheduling and retrieving work pub(crate) fn take_work(&self) -> Option { let mut lock = self.work_queue.lock().unwrap(); while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 { lock = self.work_condvar.wait(lock).unwrap(); } // We have work, or the schedulers should exit. return lock.pop_front(); } pub(crate) fn enqueue_work(&self, key: CompKey) { let mut lock = self.work_queue.lock().unwrap(); lock.push_back(key); self.work_condvar.notify_one(); } // Creating/destroying components pub(crate) fn start_create_pdl_component(&self) -> CompReserved { self.increment_active_components(); let reservation = self.components.reserve(); return CompReserved{ reservation }; } pub(crate) fn finish_create_pdl_component( &self, reserved: CompReserved, component: CompPDL, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); let _id = reserved.id(); context.id = reserved.id(); let component = RuntimeComp { public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, code: component, ctx: context, inbox: inbox_queue, exiting: false, }; let index = self.components.submit(reserved.reservation, component); debug_assert_eq!(index, _id.0); let component = self.components.get_mut(index); return (CompKey(index), component); } /// Creates a new component. Note that the public part will be properly /// initialized, but not all private fields are. pub(crate) fn create_pdl_component(&self, comp: CompPDL, ctx: CompCtx, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); let comp = RuntimeComp{ public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, code: comp, ctx, inbox: inbox_queue, exiting: false, }; let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something self.increment_active_components(); let component = self.components.get_mut(index); component.ctx.id = CompId(index); return (CompKey(index), component); } pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { let component = self.components.get_mut(key.0); return component; } pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle { let component = self.components.get(id.0); return CompHandle::new(id, &component.public); } pub(crate) fn destroy_component(&self, key: CompKey) { dbg_code!({ let component = self.get_component(key); debug_assert!(component.exiting); debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0); }); self.decrement_active_components(); self.components.destroy(key.0); } // Tracking number of active interfaces and the active components #[inline] fn increment_active_components(&self) { let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel); debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0. } fn decrement_active_components(&self) { let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel); debug_assert!(old_val > 0); // something wrong with incr/decr logic let new_val = old_val - 1; if new_val == 0 { // Just to be sure, in case the last thing that gets destroyed is an // API instead of a thread. let _lock = self.work_queue.lock(); self.work_condvar.notify_all(); } } }