use crate::collections::{MpmcQueue, RawVec};

use super::connector::{Connector, ConnectorPublic};
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
use super::inbox::PublicInbox;
use super::scheduler::Router;

use std::ptr;
use std::sync::{Barrier, RwLock, RwLockReadGuard};
use std::sync::atomic::AtomicBool;

/// Token that, once obtained, allows mutable access to a connector.
///
/// The type is deliberately neither `Copy` nor `Clone`: the intent is to rely
/// on move semantics so that whoever holds the key is the only one that may
/// execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Produces the "regular" `ConnectorId` for the same connector. The key
    /// stands for mutable access, the returned ID only for immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        ConnectorId(self.index)
    }

    /// Rebuilds a `ConnectorKey` from a `ConnectorId`.
    ///
    /// # Safety
    ///
    /// This bypasses the type-enforced `ConnectorKey`/`ConnectorId`
    /// distinction. The caller has to make sure that handing out a key for
    /// this connector does not result in two holders of mutable access.
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        let ConnectorId(index) = id;
        ConnectorKey { index }
    }
}

/// Token that allows shared access to a connector. It is `Copy`, so multiple
/// threads may hold one for the same connector.
#[derive(Copy, Clone)]
pub(crate) struct ConnectorId(u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    /// Returns the sentinel ID (`u32::MAX`) that refers to no connector.
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        ConnectorId(u32::MAX)
    }
}

/// A connector bundled with its `ConnectorPublic` part and its `Router`, as
/// stored in a single slot of the connector registry.
pub struct ScheduledConnector {
    pub connector: Connector,
    pub public: ConnectorPublic,
    pub router: Router
}

/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// The list of connector slots is guarded by an internal `RwLock`.
struct ConnectorStore { inner: RwLock, } struct ConnectorStoreInner { connectors: RawVec<*mut ScheduledConnector>, free: Vec, } impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { return Self{ inner: RwLock::new(ConnectorStoreInner { connectors: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), }), }; } /// Retrieves the shared members of the connector. pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.inner.read().unwrap(); unsafe { let connector = lock.connectors.get(connector_id.0 as usize); debug_assert!(!connector.is_null()); return &*connector.public; } } /// Retrieves a particular connector. Only the thread that pulled the /// associated key out of the execution queue should (be able to) call this. pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.inner.read().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); debug_assert!(!connector.is_null()); return *connector as &mut _; } } /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. pub(crate) fn create(&self, connector: Connector) -> ConnectorKey { let lock = self.inner.write().unwrap(); let connector = ScheduledConnector{ connector, public: ConnectorPublic::new(), router: Router::new(), }; let index; if lock.free.is_empty() { let connector = Box::into_raw(Box::new(connector)); unsafe { // Cheating a bit here. 
Anyway, move to heap, store in list index = lock.connectors.len(); lock.connectors.push(connector); } } else { index = lock.free.pop().unwrap(); unsafe { let target = lock.connectors.get_mut(index); debug_assert!(!target.is_null()); ptr::write(*target, connector); } } return ConnectorKey{ index: index as u32 }; } pub(crate) fn destroy(&self, key: ConnectorKey) { let lock = self.inner.write().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); ptr::drop_in_place(*connector); // Note: but not deallocating! } lock.free.push(key.index as usize); } } impl Drop for ConnectorStore { fn drop(&mut self) { let lock = self.inner.write().unwrap(); for idx in 0..lock.connectors.len() { unsafe { let memory = *lock.connectors.get_mut(idx); let _ = Box::from_raw(memory); // takes care of deallocation } } } } /// The registry of all ports pub struct PortStore { inner: RwLock, } struct PortStoreInner { ports: RawVec, free: Vec, } impl PortStore { fn with_capacity(capacity: usize) -> Self { Self{ inner: RwLock::new(PortStoreInner{ ports: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), }), } } pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef { let lock = self.inner.read().unwrap(); debug_assert!(port_id.is_valid()); unsafe { let port = lock.ports.get_mut(port_id.index as usize); let port = &mut *port; debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing return PortRef{ lock, port }; } } pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { let mut lock = self.inner.write().unwrap(); // Reserves a new port. 
Doesn't point it to its counterpart fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { let index; let (ownership, connector_id) = if creating_connector.is_some() { (PortOwnership::Owned, creating_connector.unwrap()) } else { (PortOwnership::Unowned, ConnectorId::new_invalid()) }; if lock.free.is_empty() { index = lock.ports.len() as u32; lock.ports.push(Port{ self_id: PortIdLocal::new(index), peer_id: PortIdLocal::new_invalid(), kind, ownership, owning_connector: connector_id, peer_connector: connector_id }); } else { index = lock.free.pop().unwrap() as u32; let port = unsafe{ &mut *lock.ports.get_mut(index as usize) }; port.peer_id = PortIdLocal::new_invalid(); port.kind = kind; port.ownership = ownership; port.owning_connector = connector_id; port.peer_connector = connector_id; } return index; } // Create the ports let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector); let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector); debug_assert_ne!(putter_id, getter_id); // Point them to one another unsafe { let putter_port = &mut *lock.ports.get_mut(putter_id as usize); let getter_port = &mut *lock.ports.get_mut(getter_id as usize); putter_port.peer_id = getter_port.self_id; getter_port.peer_id = putter_port.self_id; } return Channel{ putter_id, getter_id } } } pub struct PortRef<'p> { lock: RwLockReadGuard<'p, PortStoreInner>, port: &'static mut Port, } impl<'p> std::ops::Deref for PortRef<'p> { type Target = Port; fn deref(&self) -> &Self::Target { return self.port; } } impl<'p> std::ops::DerefMut for PortRef<'p> { fn deref_mut(&mut self) -> &mut Self::Target { return self.port; } } impl Drop for PortStore { fn drop(&mut self) { let lock = self.inner.write().unwrap(); // Very lazy code for idx in 0..lock.ports.len() { if lock.free.contains(&idx) { continue; } unsafe { let port = lock.ports.get_mut(idx); std::ptr::drop_in_place(port); } } } } /// 
Global store of connectors, ports and queues that are used by the sceduler /// threads. The global store has the appearance of a thread-safe datatype, but /// one needs to be careful using it. /// /// TODO: @docs /// TODO: @Optimize, very lazy implementation of concurrent datastructures. /// This includes the `should_exit` and `did_exit` pair! pub struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, pub ports: PortStore, pub should_exit: AtomicBool, // signal threads to exit } impl GlobalStore { pub fn new() -> Self { Self{ connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), ports: PortStore::with_capacity(256), should_exit: AtomicBool::new(false), } } }