diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 9b75ae2a4536113990a55afca44b831331b5e84f..96a30954aeedf71e9f3684d89b0cef2550aff4b1 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -2,13 +2,53 @@ use crate::collections::{MpmcQueue, RawVec}; use super::connector::{Connector, ConnectorPublic}; use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; +use super::inbox::PublicInbox; +use super::scheduler::Router; use std::ptr; -use std::sync::{RwLock, RwLockReadGuard}; +use std::sync::{Barrier, RwLock, RwLockReadGuard}; +use std::sync::atomic::AtomicBool; + +/// A kind of token that, once obtained, allows mutable access to a connector. +/// We're trying to use move semantics as much as possible: the owner of this +/// key is the only one that may execute the connector's code. +pub(crate) struct ConnectorKey { + pub index: u32, // of connector +} + +impl ConnectorKey { + /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable + /// access, to a "regular ID" which can be used to obtain immutable access. + #[inline] + pub fn downcast(&self) -> ConnectorId { + return ConnectorId(self.index); + } + + /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it + /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system + #[inline] + pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { + return ConnectorKey{ index: id.0 }; + } +} -/// A kind of token that, once obtained, allows access to a container. -struct ConnectorKey { - index: u32, // of connector +/// A kind of token that allows shared access to a connector. Multiple threads +/// may hold this +#[derive(Copy, Clone)] +pub(crate) struct ConnectorId(u32); + +impl ConnectorId { + // TODO: Like the other `new_invalid`, maybe remove + #[inline] + pub fn new_invalid() -> ConnectorId { + return ConnectorId(u32::MAX); + } +} + +pub struct ScheduledConnector { + pub connector: Connector, + pub public: ConnectorPublic, + pub router: Router } /// The registry containing all connectors. The idea here is that when someone @@ -21,7 +61,7 @@ struct ConnectorStore { } struct ConnectorStoreInner { - connectors: RawVec<*mut Connector>, + connectors: RawVec<*mut ScheduledConnector>, free: Vec, } @@ -36,11 +76,11 @@ impl ConnectorStore { } /// Retrieves the shared members of the connector. - pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic { + pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.inner.read().unwrap(); unsafe { - let connector = lock.connectors.get(connector_id as usize); + let connector = lock.connectors.get(connector_id.0 as usize); debug_assert!(!connector.is_null()); return &*connector.public; } @@ -48,7 +88,7 @@ impl ConnectorStore { /// Retrieves a particular connector. Only the thread that pulled the /// associated key out of the execution queue should (be able to) call this. - pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector { + pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.inner.read().unwrap(); unsafe { @@ -62,6 +102,11 @@ impl ConnectorStore { /// and/or queue it. pub(crate) fn create(&self, connector: Connector) -> ConnectorKey { let lock = self.inner.write().unwrap(); + let connector = ScheduledConnector{ + connector, + public: ConnectorPublic::new(), + router: Router::new(), + }; let index; if lock.free.is_empty() { @@ -144,14 +189,17 @@ impl PortStore { } } - pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { + pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { let mut lock = self.inner.write().unwrap(); // Reserves a new port. Doesn't point it to its counterpart - fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { + fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { let index; - let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned }; - let connector_id = creating_connector.unwrap_or(0); + let (ownership, connector_id) = if creating_connector.is_some() { + (PortOwnership::Owned, creating_connector.unwrap()) + } else { + (PortOwnership::Unowned, ConnectorId::new_invalid()) + }; if lock.free.is_empty() { index = lock.ports.len() as u32; @@ -237,10 +285,12 @@ impl Drop for PortStore { /// /// TODO: @docs /// TODO: @Optimize, very lazy implementation of concurrent datastructures. +/// This includes the `should_exit` and `did_exit` pair! pub struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, pub ports: PortStore, + pub should_exit: AtomicBool, // signal threads to exit } impl GlobalStore { @@ -249,6 +299,7 @@ impl GlobalStore { connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), ports: PortStore::with_capacity(256), + should_exit: AtomicBool::new(false), } } } \ No newline at end of file