Files @ daf15df0f8ca
Branch filter:

Location: CSY/reowolf/src/runtime2/mod.rs

daf15df0f8ca 3.0 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
scaffolding in place for scheduler/runtime
// Structure of module

mod runtime;
mod messages;
mod connector;
mod port;
mod global_store;
mod scheduler;
mod inbox;

#[cfg(test)] mod tests;

// Imports

use std::sync::Arc;
use std::thread::{self, JoinHandle};

use crate::protocol::eval::*;
use crate::{common::Id, PortId, ProtocolDescription};

use global_store::GlobalStore;
use scheduler::Scheduler;
use crate::protocol::ComponentCreationError;
use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group};


// Runtime API
/// Handle to a running runtime: owns the scheduler threads and the state
/// that is shared between them.
pub struct Runtime {
    // Store holding the ports and connectors; a clone of this `Arc` is handed
    // to every scheduler thread.
    global_store: Arc<GlobalStore>,
    // Protocol description used to instantiate components; also shared with
    // every scheduler thread.
    protocol_description: Arc<ProtocolDescription>,
    // Join handles of the scheduler threads spawned in `Runtime::new`.
    schedulers: Vec<JoinHandle<()>>
}

impl Runtime {
    /// Creates a runtime and spawns `num_threads` scheduler threads.
    ///
    /// Every scheduler receives its own handle to the shared global store and
    /// to `protocol_description`, and starts running immediately on its own
    /// thread.
    ///
    /// # Panics
    ///
    /// Panics if `num_threads` is zero.
    pub fn new(num_threads: usize, protocol_description: Arc<ProtocolDescription>) -> Runtime {
        // Setup global state
        assert!(num_threads > 0, "need a thread to run connectors");
        let global_store = Arc::new(GlobalStore::new());

        // Launch one thread per scheduler and keep the handles around.
        let schedulers = (0..num_threads)
            .map(|_| {
                let mut scheduler = Scheduler::new(
                    Arc::clone(&global_store),
                    Arc::clone(&protocol_description),
                );
                thread::spawn(move || {
                    scheduler.run();
                })
            })
            .collect();

        // Move innards into runtime struct
        Runtime {
            global_store,
            protocol_description,
            schedulers,
        }
    }

    /// Creates a new channel in the global port store.
    ///
    /// Returns `(putter port, getter port)`: the putter end wrapped as a
    /// `Value::Output`, the getter end wrapped as a `Value::Input`.
    pub fn create_channel(&self) -> (Value, Value) {
        let channel = self.global_store.ports.create_channel(None);

        // NOTE(review): `u32::MAX` is used as the connector id for both ends;
        // presumably a sentinel for "not owned by any connector yet" - confirm.
        let putter_id = Id {
            connector_id: u32::MAX,
            u32_suffix: channel.putter_id,
        };
        let getter_id = Id {
            connector_id: u32::MAX,
            u32_suffix: channel.getter_id,
        };

        (Value::Output(PortId(putter_id)), Value::Input(PortId(getter_id)))
    }

    /// Instantiates the component `procedure` from `module` with the given
    /// argument `values`, registers it as a new connector in the global store
    /// and marks every port found in `values` as owned by that connector.
    ///
    /// # Errors
    ///
    /// Returns the `ComponentCreationError` produced by the protocol
    /// description when the component cannot be instantiated. In that case no
    /// connector is created and no port ownership is changed.
    pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> {
        // TODO: Remove component creation function from PD, should not be concerned with it
        // Collect the ports before `values` is moved into the component state.
        let mut port_ids = Vec::new();
        find_ports_in_value_group(&values, &mut port_ids);

        let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?;

        // Create the connector and mark the ports as now owned by the
        // connector. The connector keeps its own copy of the port list; the
        // original is consumed by the ownership-transfer loop below.
        // NOTE(review): the meaning of the leading `0` argument is defined by
        // `Connector::new`, which is not visible here.
        let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone());
        let connector_key = self.global_store.connectors.create(connector);

        for port_id in port_ids {
            let port = self.global_store.ports.get(&connector_key, port_id);
            port.owning_connector = connector_key.downcast();
            // TODO: Note that we immediately need to notify the other side of the connector
            //  (`port.peer_connector`) that the port has moved!
        }

        Ok(())
    }
}

// Teardown hook for the runtime; currently a placeholder that does nothing.
impl Drop for Runtime {
    fn drop(&mut self) {
        // NOTE(review): intentionally empty for now. The `JoinHandle`s in
        // `schedulers` are therefore dropped without being joined, which
        // detaches the scheduler threads. Joining them here would presumably
        // first require a way to tell the schedulers to stop - TODO confirm
        // and implement.

    }
}