Files @ 58dfabd1be9f
Branch filter:

Location: CSY/reowolf/src/runtime2/mod.rs

58dfabd1be9f 3.1 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
moving to laptop
// Structure of module

mod runtime;
mod messages;
mod connector;
mod native;
mod port;
mod global_store;
mod scheduler;
mod inbox;

#[cfg(test)] mod tests;

// Imports

use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
use std::thread::{self, JoinHandle};

use crate::protocol::eval::*;
use crate::{common::Id, PortId, ProtocolDescription};

use global_store::{ConnectorVariant, GlobalStore};
use scheduler::Scheduler;
use crate::protocol::ComponentCreationError;
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
use native::{ConnectorApplication, ApplicationInterface};


// Runtime API
// TODO: The exit condition is very dirty. Things to take into account:
//  - Connector hack with &'static references: the runtime may only be destroyed (unforced) once all connectors are done working.
//  - Running schedulers: schedulers need to be signaled that they should exit, after which we wait until all of them are done.
//  - User-owned interfaces: as long as the user owns these, they may still decide to create new connectors.
/// User-facing handle to the runtime.
///
/// Constructed through `Runtime::new`, which spawns the scheduler threads.
/// Dropping the handle sets the global store's `should_exit` flag and joins
/// every scheduler thread.
pub struct Runtime {
    // Shared state. Clones of this `Arc` are handed to every `Scheduler` and
    // to `ConnectorApplication::new` when an interface is created.
    inner: Arc<RuntimeInner>,
}

/// State shared (behind an `Arc`) between the `Runtime` handle, the scheduler
/// threads and the application interfaces.
pub(crate) struct RuntimeInner {
    /// Global store; this module uses its `connectors` registry to register
    /// new connectors and its `should_exit` flag to request shutdown.
    pub(crate) global_store: GlobalStore,
    /// Protocol description that was passed to `Runtime::new`.
    pub(crate) protocol_description: ProtocolDescription,
    // Join handles of the scheduler threads: filled in by `Runtime::new` after
    // spawning, drained and joined when the `Runtime` is dropped.
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
}

impl Runtime {
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
        // Setup global state
        assert!(num_threads > 0, "need a thread to run connectors");
        let runtime_inner = Arc::new(RuntimeInner{
            global_store: GlobalStore::new(),
            protocol_description,
            schedulers: Mutex::new(Vec::new()),
        });

        // Launch threads
        {
            let mut schedulers = Vec::with_capacity(num_threads);
            for _ in 0..num_threads {
                let mut scheduler = Scheduler::new(runtime_inner.clone());
                let thread = thread::spawn(move || {
                    scheduler.run();
                });

                schedulers.push(thread);
            }

            let mut lock = runtime_inner.schedulers.lock().unwrap();
            *lock = schedulers;
        }

        // Return runtime
        return Runtime{ inner: runtime_inner };
    }

    /// Returns a new interface through which channels and connectors can be
    /// created.
    pub fn create_interface(&self) -> ApplicationInterface {
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
        let connector = Box::new(connector);

        let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
        interface.set_connector_id(connector_key.downcast());

        // Note that we're not scheduling. That is done by the interface in case
        // it is actually needed.
        return interface;
    }
}

impl Drop for Runtime {
    /// Requests shutdown and waits for all scheduler threads to finish.
    ///
    /// Sets the global store's `should_exit` flag (presumably polled by the
    /// schedulers; confirm in `scheduler.rs`) and then joins every stored
    /// scheduler thread.
    fn drop(&mut self) {
        self.inner.global_store.should_exit.store(true, Ordering::Release);

        // Move the handles out so the mutex is not held while blocking on the
        // joins. A poisoned mutex is recovered instead of unwrapped: panicking
        // inside `drop` while already unwinding would abort the process.
        let handles = {
            let mut guard = self
                .inner
                .schedulers
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            std::mem::take(&mut *guard)
        };

        for handle in handles {
            // A scheduler thread that panicked yields `Err` here. There is
            // nothing useful to do with it during teardown, so it is
            // discarded explicitly rather than re-raised from a destructor.
            let _ = handle.join();
        }
    }
}