// Structure of module
mod runtime;
mod messages;
mod connector;
mod native;
mod port;
mod global_store;
mod scheduler;
mod inbox;
#[cfg(test)] mod tests;
// Imports
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
use std::thread::{self, JoinHandle};
use crate::protocol::eval::*;
use crate::{common::Id, PortId, ProtocolDescription};
use global_store::{ConnectorVariant, GlobalStore};
use scheduler::Scheduler;
use crate::protocol::ComponentCreationError;
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
use native::{ConnectorApplication, ApplicationInterface};
// Runtime API
// TODO: Exit condition is very dirty. Take into account:
// - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
// - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
// - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
pub struct Runtime {
inner: Arc<RuntimeInner>,
}
pub(crate) struct RuntimeInner {
pub(crate) global_store: GlobalStore,
pub(crate) protocol_description: ProtocolDescription,
schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
}
impl Runtime {
pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
// Setup global state
assert!(num_threads > 0, "need a thread to run connectors");
let runtime_inner = Arc::new(RuntimeInner{
global_store: GlobalStore::new(),
protocol_description,
schedulers: Mutex::new(Vec::new()),
});
// Launch threads
{
let mut schedulers = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let mut scheduler = Scheduler::new(runtime_inner.clone());
let thread = thread::spawn(move || {
scheduler.run();
});
schedulers.push(thread);
}
let mut lock = runtime_inner.schedulers.lock().unwrap();
*lock = schedulers;
}
// Return runtime
return Runtime{ inner: runtime_inner };
}
/// Returns a new interface through which channels and connectors can be
/// created.
pub fn create_interface(&self) -> ApplicationInterface {
let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
let connector = Box::new(connector);
let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
interface.set_connector_id(connector_key.downcast());
// Note that we're not scheduling. That is done by the interface in case
// it is actually needed.
return interface;
}
}
impl Drop for Runtime {
fn drop(&mut self) {
self.inner.global_store.should_exit.store(true, Ordering::Release);
let mut schedulers = self.inner.schedulers.lock().unwrap();
for scheduler in schedulers.drain(..) {
scheduler.join();
}
}
}