// Structure of module
mod runtime;
mod messages;
mod connector;
mod port;
mod global_store;
mod scheduler;
mod inbox;
#[cfg(test)] mod tests;
// Imports
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use crate::protocol::eval::*;
use crate::{common::Id, PortId, ProtocolDescription};
use global_store::GlobalStore;
use scheduler::Scheduler;
use crate::protocol::ComponentCreationError;
use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group};
// Runtime API
/// Handle to a running runtime: owns the shared store, the protocol
/// description and the handles of the spawned scheduler threads.
pub struct Runtime {
    /// Store shared (via `Arc`) with every scheduler thread.
    global_store: Arc<GlobalStore>,
    /// Protocol description handed to each scheduler and used to instantiate
    /// new components.
    protocol_description: Arc<ProtocolDescription>,
    /// One join handle per scheduler thread spawned in `Runtime::new`.
    schedulers: Vec<JoinHandle<()>>,
}
impl Runtime {
    /// Creates a runtime and spawns `num_threads` scheduler threads.
    ///
    /// Every scheduler receives its own clone of the `Arc` to the freshly
    /// created global store and of the `Arc` to `protocol_description`.
    ///
    /// # Panics
    ///
    /// Panics if `num_threads` is zero.
    pub fn new(num_threads: usize, protocol_description: Arc<ProtocolDescription>) -> Runtime {
        // Setup global state
        assert!(num_threads > 0, "need a thread to run connectors");
        let global_store = Arc::new(GlobalStore::new());

        // Launch one thread per scheduler; each thread owns its scheduler.
        let schedulers = (0..num_threads)
            .map(|_| {
                let mut scheduler = Scheduler::new(
                    Arc::clone(&global_store),
                    Arc::clone(&protocol_description),
                );
                thread::spawn(move || {
                    scheduler.run();
                })
            })
            .collect::<Vec<_>>();

        // Move innards into runtime struct
        Runtime {
            global_store,
            protocol_description,
            schedulers,
        }
    }

    /// Creates a new channel in the global store and returns its two
    /// endpoints as `(putter port, getter port)`.
    ///
    /// Both port values carry `connector_id == u32::MAX`.
    // NOTE(review): `u32::MAX` presumably marks "not yet owned by a
    // connector" -- confirm against the `Id` definition.
    pub fn create_channel(&self) -> (Value, Value) {
        let channel = self.global_store.ports.create_channel(None);
        let port_id = |u32_suffix| {
            PortId(Id {
                connector_id: u32::MAX,
                u32_suffix,
            })
        };

        (
            Value::Output(port_id(channel.putter_id)),
            Value::Input(port_id(channel.getter_id)),
        )
    }

    /// Instantiates the component `procedure` from `module` with the
    /// arguments in `values`, registers it as a connector in the global
    /// store, and records that connector as the owner of every port found in
    /// `values`.
    ///
    /// # Errors
    ///
    /// Returns the `ComponentCreationError` produced by the protocol
    /// description when the component cannot be created. In that case no
    /// connector is registered and no port is modified.
    pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> {
        // TODO: Remove component creation function from PD, should not be concerned with it

        // The ports must be collected before `values` is consumed by the
        // component creation call below.
        let mut port_ids = Vec::new();
        find_ports_in_value_group(&values, &mut port_ids);

        let component_state = self.protocol_description.new_component_v2(
            module.as_bytes(),
            procedure.as_bytes(),
            values,
        )?;

        // Create the connector and mark the ports as now owned by the
        // connector. The connector takes its own copy of the port list; the
        // original is consumed by the loop below.
        let connector = Connector::new(
            0,
            Branch::new_initial_branch(component_state),
            port_ids.clone(),
        );
        let connector_key = self.global_store.connectors.create(connector);

        for port_id in port_ids {
            let port = self.global_store.ports.get(&connector_key, port_id);
            port.owning_connector = connector_key.downcast();
            // TODO: Note that we immediately need to notify the other side of the connector that
            //  the port has moved! (i.e. the connector in `port.peer_connector`)
        }

        Ok(())
    }
}
impl Drop for Runtime {
    /// Currently performs no work of its own.
    ///
    /// The scheduler threads are neither signalled nor joined here; their
    /// `JoinHandle`s are dropped together with the struct, which (per the
    /// standard library) detaches the threads rather than stopping them.
    fn drop(&mut self) {}
}