Files @ 7d01f1245b7c
Branch filter:

Location: CSY/reowolf/src/runtime2/mod.rs

7d01f1245b7c 6.5 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
Everything compiles again, pending restructuring of shared runtime objects
// Structure of module

mod runtime;
mod messages;
mod connector;
mod native;
mod port;
mod global_store;
mod scheduler;
mod inbox;

#[cfg(test)] mod tests;

// Imports

use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread::{self, JoinHandle};

use crate::ProtocolDescription;

use global_store::{ConnectorVariant, GlobalStore};
use scheduler::Scheduler;
use native::{ConnectorApplication, ApplicationInterface};


/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
    /// access, to a "regular ID" which can be used to obtain immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        return ConnectorId(self.index);
    }

    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        return ConnectorKey{ index: id.0 };
    }
}

/// A kind of token that allows shared access to a connector. Multiple threads
/// may hold this
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConnectorId(pub u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        return ConnectorId(u32::MAX);
    }

    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        return self.0 != u32::MAX;
    }
}

// TODO: Change this, I hate this. But I also don't want to put `public` and
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
//  to truly design this properly I need some benchmarks.
pub(crate) enum ConnectorVariant {
    UserDefined(ConnectorPDL),
    Native(Box<dyn Connector>),
}

impl Connector for ConnectorVariant {
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        match self {
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
        }
    }

    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        match self {
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
        }
    }
}

pub(crate) struct ScheduledConnector {
    pub connector: ConnectorVariant, // access by connector
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
    pub router: Router,
}

/// Externally facing runtime.
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}

pub(crate) struct RuntimeInner {
    // Protocol
    pub(crate) protocol_description: ProtocolDescription,
    // Storage of connectors in a kind of freelist. Note the vector of points to
    // ensure pointer stability: the vector can be changed but the entries
    // themselves remain valid.
    pub connectors_list: RawVec<*mut ScheduledConnector>,
    pub connectors_free: Vec<usize>,

    pub(crate) global_store: GlobalStore,
    schedulers: Mutex<Vec<JoinHandle<()>>>,
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
}

impl RuntimeInner {
    #[inline]
    pub(crate) fn increment_active_interfaces(&self) {
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
        debug_assert_ne!(_old_num, 1); // once it hits 0, it stays zero
    }

    pub(crate) fn decrement_active_interfaces(&self) {
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(old_num > 0);
        if old_num == 1 {
            // Became 0
            // TODO: Check num connectors, if 0, then set exit flag
        }
    }
}

// TODO: Come back to this at some point
unsafe impl Send for RuntimeInner {}
unsafe impl Sync for RuntimeInner {}

impl Runtime {
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        // Setup global state
        assert!(num_threads > 0, "need a thread to run connectors");
        let runtime_inner = Arc::new(RuntimeInner{
            global_store: GlobalStore::new(),
            protocol_description,
            schedulers: Mutex::new(Vec::new()),
            active_interfaces: AtomicU32::new(1), // we are the active interface
        });

        // Launch threads
        {
            let mut schedulers = Vec::with_capacity(num_threads as usize);
            for thread_index in 0..num_threads {
                let cloned_runtime_inner = runtime_inner.clone();
                let thread = thread::Builder::new()
                    .name(format!("thread-{}", thread_index))
                    .spawn(move || {
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
                        scheduler.run();
                    })
                    .unwrap();

                schedulers.push(thread);
            }

            let mut lock = runtime_inner.schedulers.lock().unwrap();
            *lock = schedulers;
        }

        // Return runtime
        return Runtime{ inner: runtime_inner };
    }

    /// Returns a new interface through which channels and connectors can be
    /// created.
    pub fn create_interface(&self) -> ApplicationInterface {
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
        let connector_key = self.inner.global_store.connectors.create_interface(connector);
        interface.set_connector_id(connector_key.downcast());

        // Note that we're not scheduling. That is done by the interface in case
        // it is actually needed.
        return interface;
    }
}

impl Drop for Runtime {
    fn drop(&mut self) {
        self.inner.global_store.should_exit.store(true, Ordering::Release);
        let mut schedulers = self.inner.schedulers.lock().unwrap();
        for scheduler in schedulers.drain(..) {
            scheduler.join().unwrap();
        }
    }
}