Files
@ daf15df0f8ca
Branch filter:
Location: CSY/reowolf/src/runtime2/mod.rs - annotation
daf15df0f8ca
3.0 KiB
application/rls-services+xml
scaffolding in place for scheduler/runtime
daf15df0f8ca daf15df0f8ca 0d5a89aea247 ff6ade8b8097 f4f12a71e2e2 cf26538b25dc 1aef293674a6 1aef293674a6 daf15df0f8ca ff6ade8b8097 f4f12a71e2e2 daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca | // Structure of module
mod runtime;
mod messages;
mod connector;
mod port;
mod global_store;
mod scheduler;
mod inbox;
#[cfg(test)] mod tests;
// Imports
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use crate::protocol::eval::*;
use crate::{common::Id, PortId, ProtocolDescription};
use global_store::GlobalStore;
use scheduler::Scheduler;
use crate::protocol::ComponentCreationError;
use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group};
// Runtime API
pub struct Runtime {
global_store: Arc<GlobalStore>,
protocol_description: Arc<ProtocolDescription>,
schedulers: Vec<JoinHandle<()>>
}
impl Runtime {
pub fn new(num_threads: usize, protocol_description: Arc<ProtocolDescription>) -> Runtime {
// Setup global state
assert!(num_threads > 0, "need a thread to run connectors");
let global_store = Arc::new(GlobalStore::new());
// Launch threads
let mut schedulers = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone());
let thread = thread::spawn(move || {
scheduler.run();
});
schedulers.push(thread);
}
// Move innards into runtime struct
return Runtime{
global_store,
protocol_description,
schedulers,
}
}
/// Returns (putter port, getter port)
pub fn create_channel(&self) -> (Value, Value) {
let channel = self.global_store.ports.create_channel(None);
let putter_value = Value::Output(PortId(Id{
connector_id: u32::MAX,
u32_suffix: channel.putter_id,
}));
let getter_value = Value::Input(PortId(Id{
connector_id: u32::MAX,
u32_suffix: channel.getter_id,
}));
return (putter_value, getter_value);
}
pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> {
// TODO: Remove component creation function from PD, should not be concerned with it
// Create the connector and mark the ports as now owned by the
// connector
let mut port_ids = Vec::new();
find_ports_in_value_group(&values, &mut port_ids);
let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?;
let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone());
let connector_key = self.global_store.connectors.create(connector);
for port_id in port_ids {
let port = self.global_store.ports.get(&connector_key, port_id);
port.owning_connector = connector_key.downcast();
port.peer_connector
// TODO: Note that we immediately need to notify the other side of the connector that
// the port has moved!
}
}
}
impl Drop for Runtime {
fn drop(&mut self) {
}
}
|