Files @ 7d01f1245b7c
Branch filter:

Location: CSY/reowolf/src/runtime2/mod.rs - annotation

7d01f1245b7c 6.5 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
Everything compiles again, pending restructuring of shared runtime objects
daf15df0f8ca
daf15df0f8ca
0d5a89aea247
ff6ade8b8097
f4f12a71e2e2
58dfabd1be9f
cf26538b25dc
1aef293674a6
1aef293674a6
daf15df0f8ca
ff6ade8b8097
f4f12a71e2e2
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
7d01f1245b7c
daf15df0f8ca
daf15df0f8ca
a43d61913724
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
daf15df0f8ca
daf15df0f8ca
a43d61913724
a43d61913724
a43d61913724
a43d61913724
daf15df0f8ca
7d01f1245b7c
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
7d01f1245b7c
7d01f1245b7c
a43d61913724
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
a43d61913724
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
// Structure of module

mod runtime;
mod messages;
mod connector;
mod native;
mod port;
mod global_store;
mod scheduler;
mod inbox;

#[cfg(test)] mod tests;

// Imports

use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread::{self, JoinHandle};

use crate::ProtocolDescription;

use global_store::{ConnectorVariant, GlobalStore};
use scheduler::Scheduler;
use native::{ConnectorApplication, ApplicationInterface};


/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
    /// access, to a "regular ID" which can be used to obtain immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        return ConnectorId(self.index);
    }

    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        return ConnectorKey{ index: id.0 };
    }
}

/// Handle that grants shared access to a connector. Unlike the key type it is
/// freely copyable, and several threads may hold the same ID at once.
///
/// The value `u32::MAX` is reserved as the "invalid" sentinel.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ConnectorId(pub u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    /// Returns the sentinel ID (wrapping `u32::MAX`) that `is_valid` rejects.
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        ConnectorId(u32::MAX)
    }

    /// Returns `true` for every ID except the sentinel handed out by
    /// `new_invalid`.
    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        *self != Self::new_invalid()
    }
}

// TODO: Change this, I hate this. But I also don't want to put `public` and
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
//  to truly design this properly I need some benchmarks.
// NOTE(review): a `ConnectorVariant` is also imported from `global_store` at
//  the top of this file; the two names would collide. Confirm which of the two
//  definitions is supposed to survive.
/// The two flavours of connector that can be scheduled: a PDL-defined one that
/// is stored inline, or a native one that lives behind a boxed trait object.
pub(crate) enum ConnectorVariant {
    UserDefined(ConnectorPDL),
    Native(Box<dyn Connector>),
}

/// `ConnectorVariant` is itself a `Connector`: every trait method simply
/// forwards its arguments, unchanged, to whichever variant is stored.
impl Connector for ConnectorVariant {
    fn handle_message(
        &mut self,
        message: Message,
        ctx: &ConnectorCtx,
        delta_state: &mut RunDeltaState,
    ) {
        match self {
            Self::UserDefined(pdl) => pdl.handle_message(message, ctx, delta_state),
            Self::Native(native) => native.handle_message(message, ctx, delta_state),
        }
    }

    fn run(
        &mut self,
        protocol_description: &ProtocolDescription,
        ctx: &ConnectorCtx,
        delta_state: &mut RunDeltaState,
    ) -> ConnectorScheduling {
        match self {
            Self::UserDefined(pdl) => pdl.run(protocol_description, ctx, delta_state),
            Self::Native(native) => native.run(protocol_description, ctx, delta_state),
        }
    }
}

/// Everything that is stored for one scheduled connector: the connector
/// itself plus the per-connector data kept alongside it. The trailing comments
/// on the fields describe who is intended to access what.
pub(crate) struct ScheduledConnector {
    pub connector: ConnectorVariant, // access by connector
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
    pub router: Router, // NOTE(review): intended accessor is not stated - confirm
}

/// Externally facing runtime.
///
/// This is a thin handle around the shared `RuntimeInner`. The same `Arc` is
/// cloned into every scheduler thread started by `Runtime::new` and into every
/// interface handed out by `Runtime::create_interface`. Dropping the `Runtime`
/// sets the exit flag and joins the scheduler threads.
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}

/// State shared (through an `Arc`) between the `Runtime` handle, the scheduler
/// threads and the application interfaces.
pub(crate) struct RuntimeInner {
    // Protocol
    pub(crate) protocol_description: ProtocolDescription,
    // Storage of connectors in a kind of freelist. Note the vector of pointers
    // to ensure pointer stability: the vector can be changed but the entries
    // themselves remain valid.
    // NOTE(review): `Runtime::new` in this file does not initialize the two
    // fields below - confirm whether they are still meant to live here.
    pub connectors_list: RawVec<*mut ScheduledConnector>,
    pub connectors_free: Vec<usize>,

    pub(crate) global_store: GlobalStore,
    // Join handles of the scheduler threads: filled in by `Runtime::new`,
    // drained and joined when the `Runtime` is dropped.
    schedulers: Mutex<Vec<JoinHandle<()>>>,
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
}

impl RuntimeInner {
    /// Registers one more active API interface by atomically bumping the
    /// `active_interfaces` counter.
    ///
    /// The counter is not allowed to be revived: once it has dropped to zero
    /// it has to stay zero. Debug builds check this by asserting that the
    /// value observed *before* the increment was non-zero.
    #[inline]
    pub(crate) fn increment_active_interfaces(&self) {
        // `fetch_add` hands back the value from before the addition. The
        // counter starts out at 1, so a previous value of 1 is perfectly
        // normal; only a previous value of 0 violates the invariant.
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
    }

    /// Unregisters one active API interface by atomically decrementing the
    /// `active_interfaces` counter.
    ///
    /// Debug builds assert that the counter was non-zero beforehand (i.e. that
    /// the decrement does not wrap around). Reacting to the counter reaching
    /// zero is not implemented yet.
    pub(crate) fn decrement_active_interfaces(&self) {
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(old_num > 0);
        if old_num == 1 {
            // Became 0
            // TODO: Check num connectors, if 0, then set exit flag
        }
    }
}

// TODO: Come back to this at some point
// `RuntimeInner` stores raw pointers (`RawVec<*mut ScheduledConnector>`), and
// raw pointers are neither `Send` nor `Sync` automatically, hence these manual
// implementations.
// NOTE(review): nothing in this file demonstrates that sharing those pointers
// across threads is actually sound - the invariants that justify these impls
// still need to be written down and verified.
unsafe impl Send for RuntimeInner {}
unsafe impl Sync for RuntimeInner {}

impl Runtime {
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        // Setup global state
        assert!(num_threads > 0, "need a thread to run connectors");
        let runtime_inner = Arc::new(RuntimeInner{
            global_store: GlobalStore::new(),
            protocol_description,
            schedulers: Mutex::new(Vec::new()),
            active_interfaces: AtomicU32::new(1), // we are the active interface
        });

        // Launch threads
        {
            let mut schedulers = Vec::with_capacity(num_threads as usize);
            for thread_index in 0..num_threads {
                let cloned_runtime_inner = runtime_inner.clone();
                let thread = thread::Builder::new()
                    .name(format!("thread-{}", thread_index))
                    .spawn(move || {
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
                        scheduler.run();
                    })
                    .unwrap();

                schedulers.push(thread);
            }

            let mut lock = runtime_inner.schedulers.lock().unwrap();
            *lock = schedulers;
        }

        // Return runtime
        return Runtime{ inner: runtime_inner };
    }

    /// Returns a new interface through which channels and connectors can be
    /// created.
    pub fn create_interface(&self) -> ApplicationInterface {
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
        let connector_key = self.inner.global_store.connectors.create_interface(connector);
        interface.set_connector_id(connector_key.downcast());

        // Note that we're not scheduling. That is done by the interface in case
        // it is actually needed.
        return interface;
    }
}

/// Shuts the runtime down when the handle goes away: raises the global exit
/// flag and then waits for every scheduler thread to finish.
impl Drop for Runtime {
    fn drop(&mut self) {
        let inner = &self.inner;

        // Raise the flag first; only afterwards does it make sense to wait.
        inner.global_store.should_exit.store(true, Ordering::Release);

        // Join in spawn order. A scheduler thread that panicked makes the
        // join fail, which is turned into a panic right here.
        let mut handles = inner.schedulers.lock().unwrap();
        handles.drain(..).for_each(|handle| handle.join().unwrap());
    }
}