use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; use std::collections::VecDeque; use crate::protocol::*; use super::component::{CompCtx, CompPDL}; use super::store::ComponentStore; // ----------------------------------------------------------------------------- // Component // ----------------------------------------------------------------------------- /// Key to a component. Type system somewhat ensures that there can only be one /// of these. Only with a key one may retrieve privately-accessible memory for /// a component. Practically just a generational index, like `CompId` is. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) struct CompKey(u32); impl CompKey { pub(crate) fn downgrade(&self) -> CompId { return CompId(self.0); } } /// Generational ID of a component #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(u32); impl CompId { pub(crate) fn new_invalid() -> CompId { return CompId(u32::MAX); } /// Upgrade component ID to component key. Unsafe because the caller needs /// to make sure that only one component key can exist at a time (to ensure /// a component can only be scheduled/executed by one thread). pub(crate) unsafe fn upgrade(&self) -> CompKey { return CompKey(self.0); } } /// In-runtime storage of a component pub(crate) struct RuntimeComp { pub public: CompPublic, pub private: CompPrivate, } /// Should contain everything that is accessible in a thread-safe manner pub(crate) struct CompPublic { pub sleeping: AtomicBool, pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances } /// Handle to public part of a component. pub(crate) struct CompHandle { target: *const CompPublic, } impl std::ops::Deref for CompHandle { type Target = CompPublic; fn deref(&self) -> &Self::Target { return unsafe{ &*self.target }; } } /// May contain non thread-safe fields. Accessed only by the scheduler which /// will temporarily "own" the component. pub(crate) struct CompPrivate { pub code: CompPDL, pub ctx: CompCtx, } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- pub type RuntimeHandle = Arc; /// Memory that is maintained by "the runtime". In practice it is maintained by /// multiple schedulers, and this serves as the common interface to that memory. pub struct Runtime { pub protocol: ProtocolDescription, components: ComponentStore, work_queue: Mutex>, work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { assert!(num_threads > 0, "need a thread to perform work"); return Runtime{ protocol: protocol_description, components: ComponentStore::new(128), work_queue: Mutex::new(VecDeque::with_capacity(128)), work_condvar: Condvar::new(), active_elements: AtomicU32::new(0), }; } // Scheduling and retrieving work pub(crate) fn take_work(&self) -> Option { let mut lock = self.work_queue.lock().unwrap(); while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 { lock = self.work_condvar.wait(lock).unwrap(); } return lock.pop_front(); } pub(crate) fn enqueue_work(&self, key: CompKey) { let mut lock = self.work_queue.lock().unwrap(); lock.push_back(key); self.work_condvar.notify_one(); } // Creating/destroying components pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey { let comp = RuntimeComp{ public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle }, private: CompPrivate{ code: comp, ctx: CompCtx{ id: CompId(0), ports: Vec::new(), peers: Vec::new(), messages: Vec::new(), } } }; let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something self.components.get_mut(index).private.ctx.id = CompId(index); return CompKey(index); } pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { let component = self.components.get_mut(key.0); return component; } pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic { let component = self.components.get(id.0); return &component.public; } pub(crate) fn destroy_component(&self, key: CompKey) { self.components.destroy(key.0); } // Interacting with components }