Files
@ 7d01f1245b7c
Branch filter:
Location: CSY/reowolf/src/runtime2/mod.rs
7d01f1245b7c
6.5 KiB
application/rls-services+xml
Everything compiles again, pending restructuring of shared runtime objects
// Structure of module
mod runtime;
mod messages;
mod connector;
mod native;
mod port;
mod global_store;
mod scheduler;
mod inbox;
#[cfg(test)] mod tests;
// Imports
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread::{self, JoinHandle};
use crate::ProtocolDescription;
use global_store::{ConnectorVariant, GlobalStore};
use scheduler::Scheduler;
use native::{ConnectorApplication, ApplicationInterface};
/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
pub index: u32, // of connector
}
impl ConnectorKey {
/// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
/// access, to a "regular ID" which can be used to obtain immutable access.
#[inline]
pub fn downcast(&self) -> ConnectorId {
return ConnectorId(self.index);
}
/// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
/// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
#[inline]
pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
return ConnectorKey{ index: id.0 };
}
}
/// Token granting shared access to a connector. Any number of threads may
/// hold a copy of it.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConnectorId(pub u32);

impl ConnectorId {
    /// Raw value reserved to mean "no connector".
    const INVALID_INDEX: u32 = u32::MAX;

    // TODO: Like the other `new_invalid`, maybe remove
    /// Returns the sentinel ID, i.e. the one wrapping `u32::MAX`.
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        Self(Self::INVALID_INDEX)
    }

    /// Reports whether this ID is anything other than the `u32::MAX` sentinel.
    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        Self::INVALID_INDEX != self.0
    }
}
// TODO: Rework this, the current shape is disliked. Moving `public` and
// `router` of `ScheduledConnector` back into `Connector` is not wanted either.
// `Box<dyn Connector>` everywhere is avoided because of the v-table overhead,
// but a proper design needs benchmarks first.
/// The concrete connector stored inside a `ScheduledConnector`.
pub(crate) enum ConnectorVariant {
    /// A `ConnectorPDL`, stored inline.
    UserDefined(ConnectorPDL),
    /// Any other connector, stored as a boxed `Connector` trait object.
    Native(Box<dyn Connector>),
}
// Pure forwarding: each trait method hands its arguments, unchanged, to the
// connector held by the active variant.
impl Connector for ConnectorVariant {
    /// Passes `message` on to the wrapped connector's `handle_message`.
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        match self {
            Self::Native(boxed) => boxed.handle_message(message, ctx, delta_state),
            Self::UserDefined(pdl) => pdl.handle_message(message, ctx, delta_state),
        }
    }

    /// Runs the wrapped connector and returns whatever scheduling decision it
    /// produced.
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        match self {
            Self::Native(boxed) => boxed.run(protocol_description, ctx, delta_state),
            Self::UserDefined(pdl) => pdl.run(protocol_description, ctx, delta_state),
        }
    }
}
/// A connector bundled with the per-connector state kept next to it.
pub(crate) struct ScheduledConnector {
    /// The connector itself; accessed by the connector.
    pub connector: ConnectorVariant,
    /// Mutable access by the scheduler, immutable access by the connector.
    pub context: ConnectorCtx,
    /// Accessible by all schedulers and connectors.
    pub public: ConnectorPublic,
    /// NOTE(review): role not visible in this file -- presumably message
    /// routing state for this connector; confirm against `Router`.
    pub router: Router,
}
/// The runtime as seen from outside the crate.
///
/// It is a thin handle around the reference-counted `RuntimeInner`.
pub struct Runtime {
    /// State shared with the scheduler threads.
    inner: Arc<RuntimeInner>,
}
/// State behind the `Arc` held by `Runtime` and by every scheduler thread.
pub(crate) struct RuntimeInner {
    /// The protocol the connectors are run against.
    pub(crate) protocol_description: ProtocolDescription,

    /// Connector storage, organised as a kind of freelist. The list holds
    /// pointers rather than values to keep the entries pointer-stable: the
    /// list itself may change, but each entry stays valid.
    pub connectors_list: RawVec<*mut ScheduledConnector>,
    /// NOTE(review): presumably the free slots of `connectors_list` -- confirm.
    pub connectors_free: Vec<usize>,

    /// Store shared by all schedulers and connectors.
    pub(crate) global_store: GlobalStore,
    /// Join handles of the scheduler threads.
    schedulers: Mutex<Vec<JoinHandle<()>>>,
    /// Number of active API interfaces that can add connectors/channels.
    active_interfaces: AtomicU32,
}
impl RuntimeInner {
    /// Registers one more active API interface.
    ///
    /// In debug builds this asserts that the counter was non-zero before the
    /// increment: once the number of active interfaces has dropped to zero it
    /// must stay zero, so incrementing from zero indicates a bug.
    #[inline]
    pub(crate) fn increment_active_interfaces(&self) {
        // `fetch_add` returns the value held *before* the addition, so the
        // forbidden previous value is 0 (not 1, which is the normal state of
        // a freshly created runtime).
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
    }

    /// Unregisters one active API interface.
    ///
    /// In debug builds this asserts that there was at least one interface
    /// left to unregister.
    pub(crate) fn decrement_active_interfaces(&self) {
        // `fetch_sub` likewise returns the previous value.
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(old_num > 0);
        if old_num == 1 {
            // Became 0
            // TODO: Check num connectors, if 0, then set exit flag
        }
    }
}
// TODO: Come back to this at some point
// NOTE(review): these impls manually promise that `RuntimeInner` may be moved
// to and shared between threads; `Runtime::new` relies on that when it hands
// `Arc<RuntimeInner>` clones to the scheduler threads. The struct stores raw
// `*mut ScheduledConnector` pointers in `connectors_list`, which is presumably
// why the compiler does not derive these traits on its own. No argument for
// soundness is given here -- confirm once the shared objects are restructured.
unsafe impl Send for RuntimeInner {}
unsafe impl Sync for RuntimeInner {}
impl Runtime {
pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
// Setup global state
assert!(num_threads > 0, "need a thread to run connectors");
let runtime_inner = Arc::new(RuntimeInner{
global_store: GlobalStore::new(),
protocol_description,
schedulers: Mutex::new(Vec::new()),
active_interfaces: AtomicU32::new(1), // we are the active interface
});
// Launch threads
{
let mut schedulers = Vec::with_capacity(num_threads as usize);
for thread_index in 0..num_threads {
let cloned_runtime_inner = runtime_inner.clone();
let thread = thread::Builder::new()
.name(format!("thread-{}", thread_index))
.spawn(move || {
let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
scheduler.run();
})
.unwrap();
schedulers.push(thread);
}
let mut lock = runtime_inner.schedulers.lock().unwrap();
*lock = schedulers;
}
// Return runtime
return Runtime{ inner: runtime_inner };
}
/// Returns a new interface through which channels and connectors can be
/// created.
pub fn create_interface(&self) -> ApplicationInterface {
let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
let connector_key = self.inner.global_store.connectors.create_interface(connector);
interface.set_connector_id(connector_key.downcast());
// Note that we're not scheduling. That is done by the interface in case
// it is actually needed.
return interface;
}
}
impl Drop for Runtime {
    /// Raises the global `should_exit` flag and then waits for every
    /// scheduler thread to finish.
    ///
    /// The scheduler-handle lock is held for the whole time the threads are
    /// being joined. Panics if that lock is poisoned or if a joined thread
    /// itself panicked.
    fn drop(&mut self) {
        let inner = &self.inner;

        // Presumably polled by the scheduler threads to know when to stop --
        // the scheduler loop is not visible in this file.
        inner.global_store.should_exit.store(true, Ordering::Release);

        let mut handles = inner.schedulers.lock().unwrap();
        handles.drain(..).for_each(|handle| handle.join().unwrap());
    }
}
|