// Structure of module mod runtime; mod messages; mod connector; mod native; mod port; mod global_store; mod scheduler; mod inbox; #[cfg(test)] mod tests; // Imports use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicU32, Ordering}; use std::thread::{self, JoinHandle}; use crate::ProtocolDescription; use global_store::{ConnectorVariant, GlobalStore}; use scheduler::Scheduler; use native::{ConnectorApplication, ApplicationInterface}; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this /// key is the only one that may execute the connector's code. pub(crate) struct ConnectorKey { pub index: u32, // of connector } impl ConnectorKey { /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable /// access, to a "regular ID" which can be used to obtain immutable access. #[inline] pub fn downcast(&self) -> ConnectorId { return ConnectorId(self.index); } /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system #[inline] pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { return ConnectorKey{ index: id.0 }; } } /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct ConnectorId(pub u32); impl ConnectorId { // TODO: Like the other `new_invalid`, maybe remove #[inline] pub fn new_invalid() -> ConnectorId { return ConnectorId(u32::MAX); } #[inline] pub(crate) fn is_valid(&self) -> bool { return self.0 != u32::MAX; } } // TODO: Change this, I hate this. But I also don't want to put `public` and // `router` of `ScheduledConnector` back into `Connector`. The reason I don't // want `Box` everywhere is because of the v-table overhead. But // to truly design this properly I need some benchmarks. pub(crate) enum ConnectorVariant { UserDefined(ConnectorPDL), Native(Box), } impl Connector for ConnectorVariant { fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { match self { ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state), ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state), } } fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { match self { ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), } } } pub(crate) struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: Router, } /// Externally facing runtime. pub struct Runtime { inner: Arc, } pub(crate) struct RuntimeInner { // Protocol pub(crate) protocol_description: ProtocolDescription, // Storage of connectors in a kind of freelist. Note the vector of points to // ensure pointer stability: the vector can be changed but the entries // themselves remain valid. pub connectors_list: RawVec<*mut ScheduledConnector>, pub connectors_free: Vec, pub(crate) global_store: GlobalStore, schedulers: Mutex>>, active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels } impl RuntimeInner { #[inline] pub(crate) fn increment_active_interfaces(&self) { let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); debug_assert_ne!(_old_num, 1); // once it hits 0, it stays zero } pub(crate) fn decrement_active_interfaces(&self) { let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); debug_assert!(old_num > 0); if old_num == 1 { // Became 0 // TODO: Check num connectors, if 0, then set exit flag } } } // TODO: Come back to this at some point unsafe impl Send for RuntimeInner {} unsafe impl Sync for RuntimeInner {} impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ global_store: GlobalStore::new(), protocol_description, schedulers: Mutex::new(Vec::new()), active_interfaces: AtomicU32::new(1), // we are the active interface }); // Launch threads { let mut schedulers = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { let cloned_runtime_inner = runtime_inner.clone(); let thread = thread::Builder::new() .name(format!("thread-{}", thread_index)) .spawn(move || { let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); scheduler.run(); }) .unwrap(); schedulers.push(thread); } let mut lock = runtime_inner.schedulers.lock().unwrap(); *lock = schedulers; } // Return runtime return Runtime{ inner: runtime_inner }; } /// Returns a new interface through which channels and connectors can be /// created. pub fn create_interface(&self) -> ApplicationInterface { let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); let connector_key = self.inner.global_store.connectors.create_interface(connector); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. That is done by the interface in case // it is actually needed. return interface; } } impl Drop for Runtime { fn drop(&mut self) { self.inner.global_store.should_exit.store(true, Ordering::Release); let mut schedulers = self.inner.schedulers.lock().unwrap(); for scheduler in schedulers.drain(..) { scheduler.join().unwrap(); } } }