// Structure of module mod runtime; mod messages; mod connector; mod native; mod port; mod global_store; mod scheduler; mod inbox; #[cfg(test)] mod tests; // Imports use std::sync::{Arc, Mutex}; use std::sync::atomic::Ordering; use std::thread::{self, JoinHandle}; use crate::protocol::eval::*; use crate::{common::Id, PortId, ProtocolDescription}; use global_store::{ConnectorVariant, GlobalStore}; use scheduler::Scheduler; use crate::protocol::ComponentCreationError; use connector::{Branch, ConnectorPDL, find_ports_in_value_group}; use native::{ConnectorApplication, ApplicationInterface}; // Runtime API // TODO: Exit condition is very dirty. Take into account: // - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working // - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done // - User-owned interfaces: As long as these are owned user may still decide to create new connectors. pub struct Runtime { inner: Arc, } pub(crate) struct RuntimeInner { pub(crate) global_store: GlobalStore, pub(crate) protocol_description: ProtocolDescription, schedulers: Mutex>>, // TODO: Revise, make exit condition something like: all interfaces dropped } impl Runtime { pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ global_store: GlobalStore::new(), protocol_description, schedulers: Mutex::new(Vec::new()), }); // Launch threads { let mut schedulers = Vec::with_capacity(num_threads); for _ in 0..num_threads { let mut scheduler = Scheduler::new(runtime_inner.clone()); let thread = thread::spawn(move || { scheduler.run(); }); schedulers.push(thread); } let mut lock = runtime_inner.schedulers.lock().unwrap(); *lock = schedulers; } // Return runtime return Runtime{ inner: runtime_inner }; } /// Returns a new interface through which channels and connectors can be /// created. pub fn create_interface(&self) -> ApplicationInterface { let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); let connector = Box::new(connector); let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector)); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. That is done by the interface in case // it is actually needed. return interface; } } impl Drop for Runtime { fn drop(&mut self) { self.inner.global_store.should_exit.store(true, Ordering::Release); let mut schedulers = self.inner.schedulers.lock().unwrap(); for scheduler in schedulers.drain(..) { scheduler.join(); } } }