use std::ptr; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, AtomicU32}; use crate::collections::{MpmcQueue, RawVec}; use crate::ProtocolDescription; use super::scheduler::{Router, ConnectorCtx}; use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use super::inbox::Message; use super::native::{Connector, ConnectorApplication}; /// The registry containing all connectors. The idea here is that when someone /// owns a `ConnectorKey`, then one has unique access to that connector. /// Otherwise one has shared access. /// /// This datastructure is built to be wrapped in a RwLock. pub(crate) struct ConnectorStore { pub(crate) port_counter: Arc, inner: RwLock, } struct ConnectorStoreInner { connectors: RawVec<*mut ScheduledConnector>, free: Vec, } impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { return Self{ port_counter: Arc::new(AtomicU32::new(0)), inner: RwLock::new(ConnectorStoreInner { connectors: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), }), }; } /// Retrieves the shared members of the connector. pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.inner.read().unwrap(); unsafe { let connector = lock.connectors.get(connector_id.0 as usize); debug_assert!(!connector.is_null()); return &(**connector).public; } } /// Retrieves a particular connector. Only the thread that pulled the /// associated key out of the execution queue should (be able to) call this. pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.inner.read().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); debug_assert!(!connector.is_null()); return &mut (**connector); } } pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey { // Connector interface does not own any initial ports, and cannot be // created by another connector let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true); return key; } /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. The caller must make sure that the constructed /// connector's code is initialized with the same ports as the ports in the /// `initial_ports` array. Furthermore the connector is initialized as not /// sleeping, so MUST be put on the connector queue by the caller. pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey { let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false); let new_connector = self.get_mut(&key); // Transferring ownership of ports (and crashing if there is a // programmer's mistake in port management) match &new_connector.connector { ConnectorVariant::UserDefined(connector) => { for port_id in &connector.ports.owned_ports { let mut port = created_by.context.remove_port(*port_id); new_connector.context.add_port(port); } }, ConnectorVariant::Native(_) => unreachable!(), } return key; } pub(crate) fn destroy(&self, key: ConnectorKey) { let mut lock = self.inner.write().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); ptr::drop_in_place(*connector); // Note: but not deallocating! } lock.free.push(key.index as usize); } /// Creates a connector but does not set its initial ports fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey { // Creation of the connector in the global store, requires a lock let index; { let mut lock = self.inner.write().unwrap(); let connector = ScheduledConnector { connector, context: ConnectorCtx::new(self.port_counter.clone()), public: ConnectorPublic::new(initialize_as_sleeping), router: Router::new(), }; if lock.free.is_empty() { let connector = Box::into_raw(Box::new(connector)); index = lock.connectors.len(); lock.connectors.push(connector); } else { index = lock.free.pop().unwrap(); unsafe { let target = lock.connectors.get_mut(index); debug_assert!(!target.is_null()); ptr::write(*target, connector); } } } // Generate key and retrieve the connector to set its ID let key = ConnectorKey{ index: index as u32 }; let new_connector = self.get_mut(&key); new_connector.context.id = key.downcast(); // Return the connector key return key; } } impl Drop for ConnectorStore { fn drop(&mut self) { let lock = self.inner.write().unwrap(); for idx in 0..lock.connectors.len() { unsafe { let memory = *lock.connectors.get_mut(idx); let _ = Box::from_raw(memory); // takes care of deallocation } } } } /// Global store of connectors, ports and queues that are used by the sceduler /// threads. The global store has the appearance of a thread-safe datatype, but /// one needs to be careful using it. /// /// TODO: @docs /// TODO: @Optimize, very lazy implementation of concurrent datastructures. /// This includes the `should_exit` and `did_exit` pair! pub(crate) struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, pub should_exit: AtomicBool, // signal threads to exit } impl GlobalStore { pub(crate) fn new() -> Self { Self{ connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), should_exit: AtomicBool::new(false), } } }