diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 0d8f13909ebaee5567dfdb99a6022e5d3bc58ba0..621b50e17daa9b070687640d9e6baab332fe03f5 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -4,8 +4,9 @@ use std::collections::VecDeque; use crate::protocol::*; +use super::communication::Message; use super::component::{CompCtx, CompPDL}; -use super::store::ComponentStore; +use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; // ----------------------------------------------------------------------------- // Component @@ -40,23 +41,42 @@ impl CompId { } } -/// In-runtime storage of a component +/// Private fields of a component, may only be modified by a single thread at +/// a time. pub(crate) struct RuntimeComp { pub public: CompPublic, - pub private: CompPrivate, + pub code: CompPDL, + pub ctx: CompCtx, + pub inbox: QueueDynMpsc } /// Should contain everything that is accessible in a thread-safe manner pub(crate) struct CompPublic { pub sleeping: AtomicBool, - pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances + pub num_handles: AtomicU32, // manually modified (!) + pub inbox: QueueDynProducer, } -/// Handle to public part of a component. +/// Handle to public part of a component. Would be nice if we could +/// automagically manage the `num_handles` counter. But when it reaches zero we +/// need to manually remove the handle from the runtime. So be careful. pub(crate) struct CompHandle { target: *const CompPublic, } +impl CompHandle { + pub(crate) fn increment_users(&self) { + let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); + debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed + } + + /// Returns true if the component should be destroyed + pub(crate) fn decrement_users(&self) -> bool { + let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel); + return old_count == 1; + } +} + impl std::ops::Deref for CompHandle { type Target = CompPublic; @@ -65,13 +85,6 @@ impl std::ops::Deref for CompHandle { } } -/// May contain non thread-safe fields. Accessed only by the scheduler which -/// will temporarily "own" the component. -pub(crate) struct CompPrivate { - pub code: CompPDL, - pub ctx: CompCtx, -} - // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- @@ -119,29 +132,30 @@ impl Runtime { // Creating/destroying components - pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey { + /// Creates a new component. Note that the public part will be properly + /// initialized, but the private fields (e.g. owned ports, peers, etc.) + /// are not. + pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) { + let inbox_queue = QueueDynMpsc::new(16); + let inbox_producer = inbox_queue.producer(); let comp = RuntimeComp{ public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle + inbox: inbox_producer, }, - private: CompPrivate{ - code: comp, - ctx: CompCtx{ - id: CompId(0), - ports: Vec::new(), - peers: Vec::new(), - messages: Vec::new(), - } - } + code: comp, + ctx: CompCtx::default(), + inbox: inbox_queue, }; let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something - self.components.get_mut(index).private.ctx.id = CompId(index); + let component = self.components.get_mut(index); + component.ctx.id = CompId(index); - return CompKey(index); + return (CompKey(index), component); } pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { @@ -149,9 +163,9 @@ impl Runtime { return component; } - pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic { + pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle { let component = self.components.get(id.0); - return &component.public; + return CompHandle{ target: &component.public }; } pub(crate) fn destroy_component(&self, key: CompKey) {