// Structure of module mod runtime; mod messages; mod connector; mod native; mod port; mod global_store; mod scheduler; mod inbox; #[cfg(test)] mod tests; // Imports use std::sync::{Arc, Mutex}; use std::sync::atomic::Ordering; use std::thread::{self, JoinHandle}; use crate::ProtocolDescription; use global_store::{ConnectorVariant, GlobalStore}; use scheduler::Scheduler; use native::{ConnectorApplication, ApplicationInterface}; // Runtime API // TODO: Exit condition is very dirty. Take into account: // - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working // - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done // - User-owned interfaces: As long as these are owned user may still decide to create new connectors. pub struct Runtime { inner: Arc, } pub(crate) struct RuntimeInner { pub(crate) global_store: GlobalStore, pub(crate) protocol_description: ProtocolDescription, schedulers: Mutex>>, // TODO: Revise, make exit condition something like: all interfaces dropped } // TODO: Come back to this at some point unsafe impl Send for RuntimeInner {} unsafe impl Sync for RuntimeInner {} impl Runtime { pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ global_store: GlobalStore::new(), protocol_description, schedulers: Mutex::new(Vec::new()), }); // Launch threads { let mut schedulers = Vec::with_capacity(num_threads); for _ in 0..num_threads { let cloned_runtime_inner = runtime_inner.clone(); let thread = thread::spawn(move || { let mut scheduler = Scheduler::new(cloned_runtime_inner); scheduler.run(); }); schedulers.push(thread); } let mut lock = runtime_inner.schedulers.lock().unwrap(); *lock = schedulers; } // Return runtime return Runtime{ inner: runtime_inner }; } /// Returns a new interface through which channels and connectors can be /// created. pub fn create_interface(&self) -> ApplicationInterface { let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); let connector_key = self.inner.global_store.connectors.create_interface(connector); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. That is done by the interface in case // it is actually needed. return interface; } } impl Drop for Runtime { fn drop(&mut self) { self.inner.global_store.should_exit.store(true, Ordering::Release); let mut schedulers = self.inner.schedulers.lock().unwrap(); for scheduler in schedulers.drain(..) { scheduler.join(); } } }