// Structure of module mod runtime; mod messages; mod connector; mod port; mod global_store; mod scheduler; mod inbox; #[cfg(test)] mod tests; // Imports use std::sync::Arc; use std::thread::{self, JoinHandle}; use crate::protocol::eval::*; use crate::{common::Id, PortId, ProtocolDescription}; use global_store::GlobalStore; use scheduler::Scheduler; use crate::protocol::ComponentCreationError; use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group}; // Runtime API pub struct Runtime { global_store: Arc, protocol_description: Arc, schedulers: Vec> } impl Runtime { pub fn new(num_threads: usize, protocol_description: Arc) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let global_store = Arc::new(GlobalStore::new()); // Launch threads let mut schedulers = Vec::with_capacity(num_threads); for _ in 0..num_threads { let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone()); let thread = thread::spawn(move || { scheduler.run(); }); schedulers.push(thread); } // Move innards into runtime struct return Runtime{ global_store, protocol_description, schedulers, } } /// Returns (putter port, getter port) pub fn create_channel(&self) -> (Value, Value) { let channel = self.global_store.ports.create_channel(None); let putter_value = Value::Output(PortId(Id{ connector_id: u32::MAX, u32_suffix: channel.putter_id, })); let getter_value = Value::Input(PortId(Id{ connector_id: u32::MAX, u32_suffix: channel.getter_id, })); return (putter_value, getter_value); } pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> { // TODO: Remove component creation function from PD, should not be concerned with it // Create the connector and mark the ports as now owned by the // connector let mut port_ids = Vec::new(); find_ports_in_value_group(&values, &mut port_ids); let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?; let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone()); let connector_key = self.global_store.connectors.create(connector); for port_id in port_ids { let port = self.global_store.ports.get(&connector_key, port_id); port.owning_connector = connector_key.downcast(); port.peer_connector // TODO: Note that we immediately need to notify the other side of the connector that // the port has moved! } } } impl Drop for Runtime { fn drop(&mut self) { } }