use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; use std::collections::VecDeque; use crate::protocol::*; use super::communication::Message; use super::component::{CompCtx, CompPDL}; use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; // ----------------------------------------------------------------------------- // Component // ----------------------------------------------------------------------------- /// Key to a component. Type system somewhat ensures that there can only be one /// of these. Only with a key one may retrieve privately-accessible memory for /// a component. Practically just a generational index, like `CompId` is. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) struct CompKey(u32); impl CompKey { pub(crate) fn downgrade(&self) -> CompId { return CompId(self.0); } } /// Generational ID of a component #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(u32); impl CompId { pub(crate) fn new_invalid() -> CompId { return CompId(u32::MAX); } /// Upgrade component ID to component key. Unsafe because the caller needs /// to make sure that only one component key can exist at a time (to ensure /// a component can only be scheduled/executed by one thread). pub(crate) unsafe fn upgrade(&self) -> CompKey { return CompKey(self.0); } } /// Private fields of a component, may only be modified by a single thread at /// a time. pub(crate) struct RuntimeComp { pub public: CompPublic, pub code: CompPDL, pub ctx: CompCtx, pub inbox: QueueDynMpsc } /// Should contain everything that is accessible in a thread-safe manner pub(crate) struct CompPublic { pub sleeping: AtomicBool, pub num_handles: AtomicU32, // manually modified (!) pub inbox: QueueDynProducer, } /// Handle to public part of a component. Would be nice if we could /// automagically manage the `num_handles` counter. But when it reaches zero we /// need to manually remove the handle from the runtime. So be careful. pub(crate) struct CompHandle { target: *const CompPublic, } impl CompHandle { pub(crate) fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed } /// Returns true if the component should be destroyed pub(crate) fn decrement_users(&self) -> bool { let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel); return old_count == 1; } } impl std::ops::Deref for CompHandle { type Target = CompPublic; fn deref(&self) -> &Self::Target { return unsafe{ &*self.target }; } } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- pub type RuntimeHandle = Arc; /// Memory that is maintained by "the runtime". In practice it is maintained by /// multiple schedulers, and this serves as the common interface to that memory. pub struct Runtime { pub protocol: ProtocolDescription, components: ComponentStore, work_queue: Mutex>, work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { assert!(num_threads > 0, "need a thread to perform work"); return Runtime{ protocol: protocol_description, components: ComponentStore::new(128), work_queue: Mutex::new(VecDeque::with_capacity(128)), work_condvar: Condvar::new(), active_elements: AtomicU32::new(0), }; } // Scheduling and retrieving work pub(crate) fn take_work(&self) -> Option { let mut lock = self.work_queue.lock().unwrap(); while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 { lock = self.work_condvar.wait(lock).unwrap(); } return lock.pop_front(); } pub(crate) fn enqueue_work(&self, key: CompKey) { let mut lock = self.work_queue.lock().unwrap(); lock.push_back(key); self.work_condvar.notify_one(); } // Creating/destroying components /// Creates a new component. Note that the public part will be properly /// initialized, but the private fields (e.g. owned ports, peers, etc.) /// are not. pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); let comp = RuntimeComp{ public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, code: comp, ctx: CompCtx::default(), inbox: inbox_queue, }; let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something let component = self.components.get_mut(index); component.ctx.id = CompId(index); return (CompKey(index), component); } pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { let component = self.components.get_mut(key.0); return component; } pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle { let component = self.components.get(id.0); return CompHandle{ target: &component.public }; } pub(crate) fn destroy_component(&self, key: CompKey) { self.components.destroy(key.0); } // Interacting with components }