diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index ca7b4ea548b598fca2e83d3555780fed381fbefb..9b75ae2a4536113990a55afca44b831331b5e84f 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -1,51 +1,82 @@ use crate::collections::{MpmcQueue, RawVec}; -use super::connector::Connector; +use super::connector::{Connector, ConnectorPublic}; +use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; use std::ptr; -use std::sync::RwLock; +use std::sync::{RwLock, RwLockReadGuard}; /// A kind of token that, once obtained, allows access to a container. struct ConnectorKey { index: u32, // of connector } +/// The registry containing all connectors. The idea here is that when someone +/// owns a `ConnectorKey`, then one has unique access to that connector. +/// Otherwise one has shared access. +/// +/// This datastructure is built to be wrapped in a RwLock. struct ConnectorStore { + inner: RwLock, +} + +struct ConnectorStoreInner { connectors: RawVec<*mut Connector>, free: Vec, } impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { - Self{ - connectors: RawVec::with_capacity(capacity), - free: Vec::with_capacity(capacity), + return Self{ + inner: RwLock::new(ConnectorStoreInner { + connectors: RawVec::with_capacity(capacity), + free: Vec::with_capacity(capacity), + }), + }; + } + + /// Retrieves the shared members of the connector. + pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic { + let lock = self.inner.read().unwrap(); + + unsafe { + let connector = lock.connectors.get(connector_id as usize); + debug_assert!(!connector.is_null()); + return &*connector.public; } } - fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector { + /// Retrieves a particular connector. Only the thread that pulled the + /// associated key out of the execution queue should (be able to) call this. + pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector { + let lock = self.inner.read().unwrap(); + unsafe { - let connector = self.connectors.get_mut(key.index as usize); + let connector = lock.connectors.get_mut(key.index as usize); debug_assert!(!connector.is_null()); return *connector as &mut _; } } - fn create(&mut self, connector: Connector) -> ConnectorKey { + /// Create a new connector, returning the key that can be used to retrieve + /// and/or queue it. + pub(crate) fn create(&self, connector: Connector) -> ConnectorKey { + let lock = self.inner.write().unwrap(); + let index; - if self.free.is_empty() { + if lock.free.is_empty() { let connector = Box::into_raw(Box::new(connector)); unsafe { // Cheating a bit here. Anyway, move to heap, store in list - index = self.connectors.len(); - self.connectors.push(connector); + index = lock.connectors.len(); + lock.connectors.push(connector); } } else { - index = self.free.pop().unwrap(); + index = lock.free.pop().unwrap(); unsafe { - let target = self.connectors.get_mut(index); + let target = lock.connectors.get_mut(index); debug_assert!(!target.is_null()); ptr::write(*target, connector); } @@ -54,79 +85,170 @@ impl ConnectorStore { return ConnectorKey{ index: index as u32 }; } - fn destroy(&mut self, key: ConnectorKey) { + pub(crate) fn destroy(&self, key: ConnectorKey) { + let lock = self.inner.write().unwrap(); + unsafe { - let connector = self.connectors.get_mut(key.index as usize); + let connector = lock.connectors.get_mut(key.index as usize); ptr::drop_in_place(*connector); // Note: but not deallocating! } - self.free.push(key.index as usize); + lock.free.push(key.index as usize); } } impl Drop for ConnectorStore { fn drop(&mut self) { - for idx in 0..self.connectors.len() { + let lock = self.inner.write().unwrap(); + + for idx in 0..lock.connectors.len() { unsafe { - let memory = *self.connectors.get_mut(idx); - let boxed = Box::from_raw(memory); // takes care of deallocation + let memory = *lock.connectors.get_mut(idx); + let _ = Box::from_raw(memory); // takes care of deallocation } } } } -/// Global store of connectors, ports and queues that are used by the sceduler -/// threads. The global store has the appearance of a thread-safe datatype, but -/// one needs to be careful using it. -/// -/// The intention of this data structure is to enforce the rules: -/// TODO: @docs -pub struct GlobalStore { - connector_queue: MpmcQueue, - connectors: RwLock, +/// The registry of all ports +pub struct PortStore { + inner: RwLock, } -impl GlobalStore { - pub fn new() -> Self { +struct PortStoreInner { + ports: RawVec, + free: Vec, +} + +impl PortStore { + fn with_capacity(capacity: usize) -> Self { Self{ - connector_queue: MpmcQueue::with_capacity(256), - connectors: RwLock::new(ConnectorStore::with_capacity(256)), + inner: RwLock::new(PortStoreInner{ + ports: RawVec::with_capacity(capacity), + free: Vec::with_capacity(capacity), + }), } } - // Taking connectors out of global queue + pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef { + let lock = self.inner.read().unwrap(); + debug_assert!(port_id.is_valid()); + + unsafe { + let port = lock.ports.get_mut(port_id.index as usize); + let port = &mut *port; + debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing - pub fn pop_key(&self) -> Option { - return self.connector_queue.pop_front(); + return PortRef{ lock, port }; + } } - pub fn push_key(&self, key: ConnectorKey) { - self.connector_queue.push_back(key); + pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { + let mut lock = self.inner.write().unwrap(); + + // Reserves a new port. Doesn't point it to its counterpart + fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { + let index; + let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned }; + let connector_id = creating_connector.unwrap_or(0); + + if lock.free.is_empty() { + index = lock.ports.len() as u32; + lock.ports.push(Port{ + self_id: PortIdLocal::new(index), + peer_id: PortIdLocal::new_invalid(), + kind, + ownership, + owning_connector: connector_id, + peer_connector: connector_id + }); + } else { + index = lock.free.pop().unwrap() as u32; + let port = unsafe{ &mut *lock.ports.get_mut(index as usize) }; + + port.peer_id = PortIdLocal::new_invalid(); + port.kind = kind; + port.ownership = ownership; + port.owning_connector = connector_id; + port.peer_connector = connector_id; + } + + return index; + } + + // Create the ports + let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector); + let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector); + debug_assert_ne!(putter_id, getter_id); + + // Point them to one another + unsafe { + let putter_port = &mut *lock.ports.get_mut(putter_id as usize); + let getter_port = &mut *lock.ports.get_mut(getter_id as usize); + putter_port.peer_id = getter_port.self_id; + getter_port.peer_id = putter_port.self_id; + } + + return Channel{ putter_id, getter_id } } +} + +pub struct PortRef<'p> { + lock: RwLockReadGuard<'p, PortStoreInner>, + port: &'static mut Port, +} - // Creating, retrieving and destroying connectors +impl<'p> std::ops::Deref for PortRef<'p> { + type Target = Port; - /// Retrieves a connector using the provided key. Note that the returned - /// reference is not truly static, the `GlobalStore` needs to stay alive. - pub fn get_connector(&self, key: &ConnectorKey) -> &'static mut Connector { - let connectors = self.connectors.read().unwrap(); - return connectors.get_mut(key); + fn deref(&self) -> &Self::Target { + return self.port; } +} - /// Adds a connector to the global system. Will also queue it to run - pub fn add_connector(&self, connector: Connector) { - let key = { - let mut connectors = self.connectors.write().unwrap(); - connectors.create(connector) - }; +impl<'p> std::ops::DerefMut for PortRef<'p> { + fn deref_mut(&mut self) -> &mut Self::Target { + return self.port; + } +} + +impl Drop for PortStore { + fn drop(&mut self) { + let lock = self.inner.write().unwrap(); + + // Very lazy code + for idx in 0..lock.ports.len() { + if lock.free.contains(&idx) { + continue; + } - self.connector_queue.push_back(key); + unsafe { + let port = lock.ports.get_mut(idx); + std::ptr::drop_in_place(port); + } + } } +} + +/// Global store of connectors, ports and queues that are used by the sceduler +/// threads. The global store has the appearance of a thread-safe datatype, but +/// one needs to be careful using it. +/// +/// TODO: @docs +/// TODO: @Optimize, very lazy implementation of concurrent datastructures. +pub struct GlobalStore { + pub connector_queue: MpmcQueue, + pub connectors: ConnectorStore, + pub ports: PortStore, +} - /// Destroys a connector - pub fn destroy_connector(&self, key: ConnectorKey) { - let mut connectors = self.connectors.write().unwrap(); - connectors.destroy(key); +impl GlobalStore { + pub fn new() -> Self { + Self{ + connector_queue: MpmcQueue::with_capacity(256), + connectors: ConnectorStore::with_capacity(256), + ports: PortStore::with_capacity(256), + } } } \ No newline at end of file