use std::ptr; use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard}; use std::sync::atomic::{AtomicBool, AtomicU32}; use crate::collections::{MpmcQueue, RawVec}; use super::connector::{ConnectorPDL, ConnectorPublic}; use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; use super::inbox::PublicInbox; use super::scheduler::Router; use crate::ProtocolDescription; use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState}; use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage}; use crate::runtime2::native::Connector; use crate::runtime2::scheduler::ConnectorCtx; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this /// key is the only one that may execute the connector's code. pub(crate) struct ConnectorKey { pub index: u32, // of connector } impl ConnectorKey { /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable /// access, to a "regular ID" which can be used to obtain immutable access. #[inline] pub fn downcast(&self) -> ConnectorId { return ConnectorId(self.index); } /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system #[inline] pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { return ConnectorKey{ index: id.0 }; } } /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this #[derive(Copy, Clone)] pub(crate) struct ConnectorId(pub u32); impl ConnectorId { // TODO: Like the other `new_invalid`, maybe remove #[inline] pub fn new_invalid() -> ConnectorId { return ConnectorId(u32::MAX); } #[inline] pub(crate) fn is_valid(&self) -> bool { return self.0 != u32::MAX; } } // TODO: Change this, I hate this. But I also don't want to put `public` and // `router` of `ScheduledConnector` back into `Connector`. The reason I don't // want `Box` everywhere is because of the v-table overhead. But // to truly design this properly I need some benchmarks. pub enum ConnectorVariant { UserDefined(ConnectorPDL), Native(Box), } impl Connector for ConnectorVariant { fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { match self { ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state), ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state), } } fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { match self { ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), } } } pub struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: Router, } /// The registry containing all connectors. The idea here is that when someone /// owns a `ConnectorKey`, then one has unique access to that connector. /// Otherwise one has shared access. /// /// This datastructure is built to be wrapped in a RwLock. pub(crate) struct ConnectorStore { pub(crate) port_counter: Arc, inner: RwLock, } struct ConnectorStoreInner { connectors: RawVec<*mut ScheduledConnector>, free: Vec, } impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { return Self{ port_counter: Arc::new(AtomicU32::new(0)), inner: RwLock::new(ConnectorStoreInner { connectors: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), }), }; } /// Retrieves the shared members of the connector. pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.inner.read().unwrap(); unsafe { let connector = lock.connectors.get(connector_id.0 as usize); debug_assert!(!connector.is_null()); return &*connector.public; } } /// Retrieves a particular connector. Only the thread that pulled the /// associated key out of the execution queue should (be able to) call this. pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.inner.read().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); debug_assert!(!connector.is_null()); return *connector as &mut _; } } /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant) -> ConnectorKey { // Creation of the connector in the global store, requires a lock { let lock = self.inner.write().unwrap(); let connector = ScheduledConnector { connector, context: ConnectorCtx::new(self.port_counter.clone()), public: ConnectorPublic::new(), router: Router::new(), }; let index; if lock.free.is_empty() { let connector = Box::into_raw(Box::new(connector)); unsafe { // Cheating a bit here. Anyway, move to heap, store in list index = lock.connectors.len(); lock.connectors.push(connector); } } else { index = lock.free.pop().unwrap(); unsafe { let target = lock.connectors.get_mut(index); debug_assert!(!target.is_null()); ptr::write(*target, connector); } } } // Setting of new connector's ID let key = ConnectorKey{ index: index as u32 }; let new_connector = self.get_mut(&key); new_connector.context.id = key.downcast(); // Transferring ownership of ports (and crashing if there is a // programmer's mistake in port management) match &new_connector.connector { ConnectorVariant::UserDefined(connector) => { for port_id in &connector.ports.owned_ports { let mut port = created_by.context.remove_port(*port_id); port.owning_connector = new_connector.context.id; new_connector.context.add_port(port); } }, ConnectorVariant::Native(_) => {}, // no initial ports (yet!) } return key; } pub(crate) fn destroy(&self, key: ConnectorKey) { let lock = self.inner.write().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); ptr::drop_in_place(*connector); // Note: but not deallocating! } lock.free.push(key.index as usize); } } impl Drop for ConnectorStore { fn drop(&mut self) { let lock = self.inner.write().unwrap(); for idx in 0..lock.connectors.len() { unsafe { let memory = *lock.connectors.get_mut(idx); let _ = Box::from_raw(memory); // takes care of deallocation } } } } /// Global store of connectors, ports and queues that are used by the sceduler /// threads. The global store has the appearance of a thread-safe datatype, but /// one needs to be careful using it. /// /// TODO: @docs /// TODO: @Optimize, very lazy implementation of concurrent datastructures. /// This includes the `should_exit` and `did_exit` pair! pub struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, pub should_exit: AtomicBool, // signal threads to exit } impl GlobalStore { pub fn new() -> Self { Self{ connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), should_exit: AtomicBool::new(false), } } }