diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 191c529907db1526c6cd6337f190724e0830e2d3..018fa8e82011bd87902ced460d43bdefb7fc77ae 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -1,3 +1,7 @@ +use std::ptr; +use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard}; +use std::sync::atomic::{AtomicBool, AtomicU32}; + use crate::collections::{MpmcQueue, RawVec}; use super::connector::{ConnectorPDL, ConnectorPublic}; @@ -5,13 +9,11 @@ use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; use super::inbox::PublicInbox; use super::scheduler::Router; -use std::ptr; -use std::sync::{Barrier, RwLock, RwLockReadGuard}; -use std::sync::atomic::AtomicBool; use crate::ProtocolDescription; use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState}; -use crate::runtime2::inbox::{DataMessage, SyncMessage}; +use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage}; use crate::runtime2::native::Connector; +use crate::runtime2::scheduler::ConnectorCtx; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -39,7 +41,7 @@ impl ConnectorKey { /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this #[derive(Copy, Clone)] -pub(crate) struct ConnectorId(u32); +pub(crate) struct ConnectorId(pub u32); impl ConnectorId { // TODO: Like the other `new_invalid`, maybe remove @@ -64,31 +66,25 @@ pub enum ConnectorVariant { } impl Connector for ConnectorVariant { - fn insert_data_message(&mut self, message: DataMessage) { + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { match self { - ConnectorVariant::UserDefined(c) => c.insert_data_message(message), - ConnectorVariant::Native(c) => c.insert_data_message(message), + ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state), + ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state), } } - fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) { + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { match self { - ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state), - ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state), - } - } - - fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling { - match self { - ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state), - ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state), + ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), + ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), } } } pub struct ScheduledConnector { - pub connector: ConnectorVariant, - pub public: ConnectorPublic, + pub connector: ConnectorVariant, // access by connector + pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector + pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: Router, } @@ -97,7 +93,8 @@ pub struct ScheduledConnector { /// Otherwise one has shared access. /// /// This datastructure is built to be wrapped in a RwLock. -struct ConnectorStore { +pub(crate) struct ConnectorStore { + pub(crate) port_counter: Arc, inner: RwLock, } @@ -109,6 +106,7 @@ struct ConnectorStoreInner { impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { return Self{ + port_counter: Arc::new(AtomicU32::new(0)), inner: RwLock::new(ConnectorStoreInner { connectors: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), @@ -141,38 +139,53 @@ impl ConnectorStore { /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. - pub(crate) fn create(&self, connector: ConnectorVariant) -> ConnectorKey { - let lock = self.inner.write().unwrap(); - let connector = ScheduledConnector{ - connector, - public: ConnectorPublic::new(), - router: Router::new(), - }; + pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant) -> ConnectorKey { + // Creation of the connector in the global store, requires a lock + { + let lock = self.inner.write().unwrap(); + let connector = ScheduledConnector { + connector, + context: ConnectorCtx::new(self.port_counter.clone()), + public: ConnectorPublic::new(), + router: Router::new(), + }; - let index; - if lock.free.is_empty() { - let connector = Box::into_raw(Box::new(connector)); + let index; + if lock.free.is_empty() { + let connector = Box::into_raw(Box::new(connector)); - unsafe { - // Cheating a bit here. Anyway, move to heap, store in list - index = lock.connectors.len(); - lock.connectors.push(connector); - } - } else { - index = lock.free.pop().unwrap(); + unsafe { + // Cheating a bit here. Anyway, move to heap, store in list + index = lock.connectors.len(); + lock.connectors.push(connector); + } + } else { + index = lock.free.pop().unwrap(); - unsafe { - let target = lock.connectors.get_mut(index); - debug_assert!(!target.is_null()); - ptr::write(*target, connector); + unsafe { + let target = lock.connectors.get_mut(index); + debug_assert!(!target.is_null()); + ptr::write(*target, connector); + } } } - // TODO: Clean up together with the trait + // Setting of new connector's ID let key = ConnectorKey{ index: index as u32 }; - let connector = self.get_mut(&key); - if let ConnectorVariant::UserDefined(connector) = &mut connector.connector { - connector.set_connector_id(key.downcast()); + let new_connector = self.get_mut(&key); + new_connector.context.id = key.downcast(); + + // Transferring ownership of ports (and crashing if there is a + // programmer's mistake in port management) + match &new_connector.connector { + ConnectorVariant::UserDefined(connector) => { + for port_id in &connector.ports.owned_ports { + let mut port = created_by.context.remove_port(*port_id); + port.owning_connector = new_connector.context.id; + new_connector.context.add_port(port); + } + }, + ConnectorVariant::Native(_) => {}, // no initial ports (yet!) } return key; @@ -204,127 +217,6 @@ impl Drop for ConnectorStore { } } -/// The registry of all ports -pub struct PortStore { - inner: RwLock, -} - -struct PortStoreInner { - ports: RawVec, - free: Vec, -} - -impl PortStore { - fn with_capacity(capacity: usize) -> Self { - Self{ - inner: RwLock::new(PortStoreInner{ - ports: RawVec::with_capacity(capacity), - free: Vec::with_capacity(capacity), - }), - } - } - - pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef { - let lock = self.inner.read().unwrap(); - debug_assert!(port_id.is_valid()); - - unsafe { - let port = lock.ports.get_mut(port_id.index as usize); - let port = &mut *port; - debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing - - return PortRef{ lock, port }; - } - } - - pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> Channel { - let mut lock = self.inner.write().unwrap(); - - // Reserves a new port. Doesn't point it to its counterpart - fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: ConnectorId) -> u32 { - let index; - - if lock.free.is_empty() { - index = lock.ports.len() as u32; - lock.ports.push(Port{ - self_id: PortIdLocal::new(index), - peer_id: PortIdLocal::new_invalid(), - kind, - ownership: PortOwnership::Owned, - owning_connector: connector_id, - peer_connector: connector_id - }); - } else { - index = lock.free.pop().unwrap() as u32; - let port = unsafe{ &mut *lock.ports.get_mut(index as usize) }; - - port.peer_id = PortIdLocal::new_invalid(); - port.kind = kind; - port.ownership = PortOwnership::Owned; - port.owning_connector = connector_id; - port.peer_connector = connector_id; - } - - return index; - } - - // Create the ports - let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector); - let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector); - debug_assert_ne!(putter_id, getter_id); - - // Point them to one another - unsafe { - let putter_port = &mut *lock.ports.get_mut(putter_id as usize); - let getter_port = &mut *lock.ports.get_mut(getter_id as usize); - putter_port.peer_id = getter_port.self_id; - getter_port.peer_id = putter_port.self_id; - } - - return Channel{ - putter_id: PortIdLocal::new(putter_id), - getter_id: PortIdLocal::new(getter_id), - } - } -} - -pub struct PortRef<'p> { - lock: RwLockReadGuard<'p, PortStoreInner>, - port: &'static mut Port, -} - -impl<'p> std::ops::Deref for PortRef<'p> { - type Target = Port; - - fn deref(&self) -> &Self::Target { - return self.port; - } -} - -impl<'p> std::ops::DerefMut for PortRef<'p> { - fn deref_mut(&mut self) -> &mut Self::Target { - return self.port; - } -} - -impl Drop for PortStore { - fn drop(&mut self) { - let lock = self.inner.write().unwrap(); - - // Very lazy code - for idx in 0..lock.ports.len() { - if lock.free.contains(&idx) { - continue; - } - - unsafe { - let port = lock.ports.get_mut(idx); - std::ptr::drop_in_place(port); - } - } - } -} - /// Global store of connectors, ports and queues that are used by the sceduler /// threads. The global store has the appearance of a thread-safe datatype, but /// one needs to be careful using it. @@ -335,7 +227,6 @@ impl Drop for PortStore { pub struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, - pub ports: PortStore, pub should_exit: AtomicBool, // signal threads to exit } @@ -344,7 +235,6 @@ impl GlobalStore { Self{ connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), - ports: PortStore::with_capacity(256), should_exit: AtomicBool::new(false), } }