diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index b75ec01ec78f0103f2fa427197c12a89cca226f0..8ac3156cbb45147d161cbe4457f58ea181a64fe1 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -3,6 +3,7 @@ mod runtime; mod messages; mod connector; +mod native; mod port; mod global_store; mod scheduler; @@ -12,87 +13,86 @@ mod inbox; // Imports -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::Ordering; use std::thread::{self, JoinHandle}; use crate::protocol::eval::*; use crate::{common::Id, PortId, ProtocolDescription}; -use global_store::GlobalStore; +use global_store::{ConnectorVariant, GlobalStore}; use scheduler::Scheduler; use crate::protocol::ComponentCreationError; -use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group}; +use connector::{Branch, ConnectorPDL, find_ports_in_value_group}; +use native::{ConnectorApplication, ApplicationInterface}; // Runtime API +// TODO: Exit condition is very dirty. Take into account: +// - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working +// - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done +// - User-owned interfaces: As long as these are owned user may still decide to create new connectors. pub struct Runtime { - global_store: Arc, - protocol_description: Arc, - schedulers: Vec> + inner: Arc, +} + +pub(crate) struct RuntimeInner { + pub(crate) global_store: GlobalStore, + pub(crate) protocol_description: ProtocolDescription, + schedulers: Mutex>>, // TODO: Revise, make exit condition something like: all interfaces dropped } impl Runtime { - pub fn new(num_threads: usize, protocol_description: Arc) -> Runtime { + pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); - let global_store = Arc::new(GlobalStore::new()); + let runtime_inner = Arc::new(RuntimeInner{ + global_store: GlobalStore::new(), + protocol_description, + schedulers: Mutex::new(Vec::new()), + }); // Launch threads - let mut schedulers = Vec::with_capacity(num_threads); - for _ in 0..num_threads { - let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone()); - let thread = thread::spawn(move || { - scheduler.run(); - }); - - schedulers.push(thread); + { + let mut schedulers = Vec::with_capacity(num_threads); + for _ in 0..num_threads { + let mut scheduler = Scheduler::new(runtime_inner.clone()); + let thread = thread::spawn(move || { + scheduler.run(); + }); + + schedulers.push(thread); + } + + let mut lock = runtime_inner.schedulers.lock().unwrap(); + *lock = schedulers; } - // Move innards into runtime struct - return Runtime{ - global_store, - protocol_description, - schedulers, - } + // Return runtime + return Runtime{ inner: runtime_inner }; } - /// Returns (putter port, getter port) - pub fn create_channel(&self) -> (Value, Value) { - let channel = self.global_store.ports.create_channel(None); - let putter_value = Value::Output(PortId(Id{ - connector_id: u32::MAX, - u32_suffix: channel.putter_id, - })); - let getter_value = Value::Input(PortId(Id{ - connector_id: u32::MAX, - u32_suffix: channel.getter_id, - })); - return (putter_value, getter_value); - } + /// Returns a new interface through which channels and connectors can be + /// created. + pub fn create_interface(&self) -> ApplicationInterface { + let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); + let connector = Box::new(connector); - pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> { - // TODO: Remove component creation function from PD, should not be concerned with it - // Create the connector and mark the ports as now owned by the - // connector - let mut port_ids = Vec::new(); - find_ports_in_value_group(&values, &mut port_ids); - - let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?; - let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone()); - let connector_key = self.global_store.connectors.create(connector); - - for port_id in port_ids { - let port = self.global_store.ports.get(&connector_key, port_id); - port.owning_connector = connector_key.downcast(); - port.peer_connector - // TODO: Note that we immediately need to notify the other side of the connector that - // the port has moved! - } + let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector)); + interface.set_connector_id(connector_key.downcast()); + + // Note that we're not scheduling. That is done by the interface in case + // it is actually needed. + return interface; } } impl Drop for Runtime { fn drop(&mut self) { - + self.inner.global_store.should_exit.store(true, Ordering::Release); + let mut schedulers = self.inner.schedulers.lock().unwrap(); + for scheduler in schedulers.drain(..) { + scheduler.join(); + } } } \ No newline at end of file