diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index e199ab5e454d13ea8067111623dfb14e30526cf1..b1acb13a1fd227c022ce0788916b0ed1b7663c23 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -14,7 +14,7 @@ mod inbox;
 
 // Imports
 use std::sync::{Arc, Mutex};
-use std::sync::atomic::Ordering;
+use std::sync::atomic::{AtomicU32, Ordering};
 use std::thread::{self, JoinHandle};
 
 use crate::ProtocolDescription;
@@ -24,19 +24,113 @@ use scheduler::Scheduler;
 use native::{ConnectorApplication, ApplicationInterface};
 
-// Runtime API
-// TODO: Exit condition is very dirty. Take into account:
-//  - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
-//  - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
-//  - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
+/// A kind of token that, once obtained, allows mutable access to a connector.
+/// We're trying to use move semantics as much as possible: the owner of this
+/// key is the only one that may execute the connector's code.
+pub(crate) struct ConnectorKey {
+    pub index: u32, // of connector
+}
+
+impl ConnectorKey {
+    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
+    /// access, to a "regular ID" which can be used to obtain immutable access.
+    #[inline]
+    pub fn downcast(&self) -> ConnectorId {
+        return ConnectorId(self.index);
+    }
+
+    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
+    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system.
+    #[inline]
+    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
+        return ConnectorKey{ index: id.0 };
+    }
+}
+
+/// A kind of token that allows shared access to a connector. Multiple threads
+/// may hold this.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct ConnectorId(pub u32);
+
+impl ConnectorId {
+    // TODO: Like the other `new_invalid`, maybe remove
+    #[inline]
+    pub fn new_invalid() -> ConnectorId {
+        return ConnectorId(u32::MAX);
+    }
+
+    #[inline]
+    pub(crate) fn is_valid(&self) -> bool {
+        return self.0 != u32::MAX;
+    }
+}
+
+// TODO: Change this, I hate this. But I also don't want to put `public` and
+// `router` of `ScheduledConnector` back into `Connector`. The reason I don't
+// want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
+// to truly design this properly I need some benchmarks.
+pub(crate) enum ConnectorVariant {
+    UserDefined(ConnectorPDL),
+    Native(Box<dyn Connector>),
+}
+
+impl Connector for ConnectorVariant {
+    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
+        match self {
+            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
+            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
+        }
+    }
+
+    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
+        match self {
+            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
+            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
+        }
+    }
+}
+
+pub(crate) struct ScheduledConnector {
+    pub connector: ConnectorVariant, // access by connector
+    pub context: ConnectorCtx,       // mutable access by scheduler, immutable by connector
+    pub public: ConnectorPublic,     // accessible by all schedulers and connectors
+    pub router: Router,
+}
+
+/// Externally facing runtime.
 pub struct Runtime {
     inner: Arc<RuntimeInner>,
 }
 
 pub(crate) struct RuntimeInner {
-    pub(crate) global_store: GlobalStore,
+    // Protocol
     pub(crate) protocol_description: ProtocolDescription,
-    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
+    // Storage of connectors in a kind of freelist. Note the vector of pointers to
+    // ensure pointer stability: the vector can be changed but the entries
+    // themselves remain valid.
+    pub connectors_list: RawVec<*mut ScheduledConnector>,
+    pub connectors_free: Vec<u32>,
+
+    pub(crate) global_store: GlobalStore,
+    schedulers: Mutex<Vec<JoinHandle<()>>>,
+    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
+}
+
+impl RuntimeInner {
+    #[inline]
+    pub(crate) fn increment_active_interfaces(&self) {
+        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
+        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
+    }
+
+    pub(crate) fn decrement_active_interfaces(&self) {
+        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
+        debug_assert!(old_num > 0);
+        if old_num == 1 {
+            // Became 0
+            // TODO: Check num connectors, if 0, then set exit flag
+        }
+    }
 }
 
 // TODO: Come back to this at some point
@@ -44,24 +138,28 @@ unsafe impl Send for RuntimeInner {}
 unsafe impl Sync for RuntimeInner {}
 
 impl Runtime {
-    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
+    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
         // Setup global state
         assert!(num_threads > 0, "need a thread to run connectors");
         let runtime_inner = Arc::new(RuntimeInner{
             global_store: GlobalStore::new(),
             protocol_description,
             schedulers: Mutex::new(Vec::new()),
+            active_interfaces: AtomicU32::new(1), // we are the active interface
         });
 
         // Launch threads
         {
-            let mut schedulers = Vec::with_capacity(num_threads);
-            for _ in 0..num_threads {
+            let mut schedulers = Vec::with_capacity(num_threads as usize);
+            for thread_index in 0..num_threads {
                 let cloned_runtime_inner = runtime_inner.clone();
-                let thread = thread::spawn(move || {
-                    let mut scheduler = Scheduler::new(cloned_runtime_inner);
-                    scheduler.run();
-                });
+                let thread = thread::Builder::new()
+                    .name(format!("thread-{}", thread_index))
+                    .spawn(move || {
+                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
+                        scheduler.run();
+                    })
+                    .unwrap();
 
                 schedulers.push(thread);
             }
@@ -92,7 +190,7 @@ impl Drop for Runtime {
         self.inner.global_store.should_exit.store(true, Ordering::Release);
         let mut schedulers = self.inner.schedulers.lock().unwrap();
         for scheduler in schedulers.drain(..) {
-            scheduler.join();
+            scheduler.join().unwrap();
         }
     }
 }
\ No newline at end of file
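
Reviewer note (not part of the patch): a minimal, self-contained sketch of the access convention the new `ConnectorKey`/`ConnectorId` pair encodes. The `Demo*` names below are hypothetical and only illustrate the idea: the move-only key is the single token that grants mutable access to a slot, while the `Copy` id only grants shared access.

```rust
// Hypothetical illustration; not part of src/runtime2.
struct DemoKey { index: u32 }              // like ConnectorKey: not Clone/Copy, so only one holder

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DemoId(u32);                        // like ConnectorId: freely copyable, shared access only

impl DemoKey {
    fn downcast(&self) -> DemoId { DemoId(self.index) }
}

struct DemoStore { entries: Vec<String> }

impl DemoStore {
    fn create(&mut self, text: &str) -> DemoKey {
        self.entries.push(text.to_string());
        DemoKey { index: (self.entries.len() - 1) as u32 }
    }

    // Mutable access requires the unique key ...
    fn get_mut(&mut self, key: &DemoKey) -> &mut String {
        &mut self.entries[key.index as usize]
    }

    // ... shared access only needs the copyable id.
    fn get(&self, id: DemoId) -> &String {
        &self.entries[id.0 as usize]
    }
}

fn main() {
    let mut store = DemoStore { entries: Vec::new() };
    let key = store.create("connector state");
    let id = key.downcast();               // ids can be handed out to other parties
    store.get_mut(&key).push_str(" (mutated by key holder)");
    println!("{}", store.get(id));
}
```

In the runtime itself the scheduler that holds a `ConnectorKey` is the only party allowed to run that connector; `ConnectorKey::from_id` is `unsafe` precisely because it bypasses that single-holder guarantee.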