Files
@ 26d47db4f922
Branch filter:
Location: CSY/reowolf/src/runtime2/mod.rs - annotation
26d47db4f922
3.1 KiB
application/rls-services+xml
WIP on second rewrite of port management
daf15df0f8ca daf15df0f8ca 0d5a89aea247 ff6ade8b8097 f4f12a71e2e2 58dfabd1be9f cf26538b25dc 1aef293674a6 1aef293674a6 daf15df0f8ca ff6ade8b8097 f4f12a71e2e2 daf15df0f8ca daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca 58dfabd1be9f daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca daf15df0f8ca 58dfabd1be9f daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca daf15df0f8ca 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f 58dfabd1be9f daf15df0f8ca daf15df0f8ca | // Structure of module
mod runtime;
mod messages;
mod connector;
mod native;
mod port;
mod global_store;
mod scheduler;
mod inbox;
#[cfg(test)] mod tests;
// Imports
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
use std::thread::{self, JoinHandle};
use crate::protocol::eval::*;
use crate::{common::Id, PortId, ProtocolDescription};
use global_store::{ConnectorVariant, GlobalStore};
use scheduler::Scheduler;
use crate::protocol::ComponentCreationError;
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
use native::{ConnectorApplication, ApplicationInterface};
// Runtime API
// TODO: Exit condition is very dirty. Take into account:
// - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
// - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
// - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
pub struct Runtime {
inner: Arc<RuntimeInner>,
}
pub(crate) struct RuntimeInner {
pub(crate) global_store: GlobalStore,
pub(crate) protocol_description: ProtocolDescription,
schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
}
impl Runtime {
pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
// Setup global state
assert!(num_threads > 0, "need a thread to run connectors");
let runtime_inner = Arc::new(RuntimeInner{
global_store: GlobalStore::new(),
protocol_description,
schedulers: Mutex::new(Vec::new()),
});
// Launch threads
{
let mut schedulers = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let mut scheduler = Scheduler::new(runtime_inner.clone());
let thread = thread::spawn(move || {
scheduler.run();
});
schedulers.push(thread);
}
let mut lock = runtime_inner.schedulers.lock().unwrap();
*lock = schedulers;
}
// Return runtime
return Runtime{ inner: runtime_inner };
}
/// Returns a new interface through which channels and connectors can be
/// created.
pub fn create_interface(&self) -> ApplicationInterface {
let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
let connector = Box::new(connector);
let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
interface.set_connector_id(connector_key.downcast());
// Note that we're not scheduling. That is done by the interface in case
// it is actually needed.
return interface;
}
}
impl Drop for Runtime {
fn drop(&mut self) {
self.inner.global_store.should_exit.store(true, Ordering::Release);
let mut schedulers = self.inner.schedulers.lock().unwrap();
for scheduler in schedulers.drain(..) {
scheduler.join();
}
}
}
|