Files @ 68411f4b8014
Branch filter:

Location: CSY/reowolf/src/runtime2/native.rs

68411f4b8014 8.0 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
Round of cleanup on temporary type names and old code
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::Ordering;

use crate::protocol::ComponentCreationError;
use crate::protocol::eval::ValueGroup;

use super::{ConnectorKey, ConnectorId, RuntimeInner};
use super::scheduler::{SchedulerCtx, ComponentCtx};
use super::port::{Port, PortIdLocal, Channel, PortKind};
use super::consensus::find_ports_in_value_group;
use super::connector::{ConnectorScheduling, ConnectorPDL};
use super::inbox::{Message, ControlContent, ControlMessage};

/// Generic connector interface from the scheduler's point of view.
pub(crate) trait Connector {
    /// Should run the connector's behaviour up until the next blocking point.
    /// One should generally request and handle new messages from the component
    /// context. Then perform any logic the component has to do, and in the
    /// process perhaps queue up some state changes using the same context.
    ///
    /// The returned `ConnectorScheduling` value is the connector's verdict
    /// for the caller (e.g. `ConnectorApplication` returns `Exit` once it has
    /// been asked to shut down and `NotNow` otherwise).
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
}

/// Shared boolean "done" flag, paired with the condition variable that is
/// used to block on it (see `ApplicationInterface::wait`).
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
/// Shared FIFO of jobs: pushed at the back by the `ApplicationInterface` and
/// popped from the front by `ConnectorApplication::run`.
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;

/// A request queued by the `ApplicationInterface`, to be handled by the
/// `ConnectorApplication` the next time it runs.
enum ApplicationJob {
    /// The two ports of a freshly created channel; the connector adopts both.
    NewChannel((Port, Port)),
    /// A newly instantiated PDL connector, together with the IDs of the ports
    /// that were found in its arguments.
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
    /// Queued when the `ApplicationInterface` is dropped; makes
    /// `ConnectorApplication::run` return `ConnectorScheduling::Exit`.
    Shutdown,
}

/// The connector which an application can directly interface with. One may set
/// up the next synchronous round, and retrieve the data afterwards.
pub struct ConnectorApplication {
    // Shared with the `ApplicationInterface`, which reads it in
    // `try_wait`/`wait`.
    sync_done: SyncDone,
    // Jobs queued by the `ApplicationInterface`; drained in `run`.
    job_queue: JobQueue,
}

impl ConnectorApplication {
    /// Builds the application connector together with the interface handle
    /// that feeds it.
    ///
    /// Both halves share the same "sync done" flag (initially `false`) and the
    /// same job queue; the interface additionally takes ownership of the
    /// `runtime` handle.
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
        let sync_done = Arc::new((Mutex::new(false), Condvar::new()));
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));

        // The interface receives clones of the shared handles, the connector
        // keeps the originals.
        let interface = ApplicationInterface::new(
            Arc::clone(&sync_done),
            Arc::clone(&job_queue),
            runtime,
        );
        let connector = ConnectorApplication { sync_done, job_queue };

        (connector, interface)
    }
}

impl Connector for ConnectorApplication {
    /// Processes pending inbox messages and then drains the job queue filled
    /// by the `ApplicationInterface`.
    ///
    /// Returns `ConnectorScheduling::Exit` once a `Shutdown` job is seen, and
    /// `ConnectorScheduling::NotNow` when the queue has been emptied without
    /// encountering one.
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        // Inbox first. None of the message kinds is handled yet: every arm
        // is still a `todo!`, so any message surfacing here panics.
        while let Some(incoming) = comp_ctx.read_next_message() {
            match incoming {
                Message::Data(_) => todo!("data message in API connector"),
                Message::Sync(_) => todo!("sync message in API connector"),
                Message::Control(_) => todo!("impossible control message"),
            }
        }

        // Then the requests coming from the API. The queue lock is held only
        // for the duration of the inner scope.
        let mut outcome = ConnectorScheduling::NotNow;
        {
            let mut pending_jobs = self.job_queue.lock().unwrap();
            while let Some(job) = pending_jobs.pop_front() {
                match job {
                    ApplicationJob::NewChannel((first_port, second_port)) => {
                        println!("DEBUG: API adopting ports");
                        comp_ctx.push_port(first_port);
                        comp_ctx.push_port(second_port);
                    }
                    ApplicationJob::NewConnector(new_connector, its_ports) => {
                        println!("DEBUG: API creating connector");
                        comp_ctx.push_component(new_connector, its_ports);
                    }
                    ApplicationJob::Shutdown => {
                        // Shutdown is expected to be the last job ever queued.
                        debug_assert!(pending_jobs.is_empty());
                        outcome = ConnectorScheduling::Exit;
                        break;
                    }
                }
            }
        }

        outcome
    }
}

/// The interface to a `ConnectorApplication`. This allows setting up the
/// interactions the `ConnectorApplication` performs within a synchronous round.
pub struct ApplicationInterface {
    // Flag + condition variable inspected by `try_wait` and `wait`.
    sync_done: SyncDone,
    // Queue through which jobs are handed to the `ConnectorApplication`.
    job_queue: JobQueue,
    // Used to create channels and components, and to schedule the connector.
    runtime: Arc<RuntimeInner>,
    // ID of the associated connector; starts out invalid and is assigned
    // through `set_connector_id`.
    connector_id: ConnectorId,
    // Ports created through `create_channel` that have not been passed on to
    // a connector created with `create_connector`.
    owned_ports: Vec<PortIdLocal>,
}

impl ApplicationInterface {
    /// Assembles the interface from the handles it shares with the
    /// `ConnectorApplication`. The connector ID starts out invalid (it is
    /// assigned later through `set_connector_id`) and no ports are owned yet.
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
        let connector_id = ConnectorId::new_invalid();
        let owned_ports = Vec::new();

        Self {
            sync_done,
            job_queue,
            runtime,
            connector_id,
            owned_ports,
        }
    }

    /// Creates a new channel.
    ///
    /// The two ports are created through the runtime, queued as a
    /// `NewChannel` job for the `ConnectorApplication` to adopt, and their IDs
    /// are remembered as owned by this interface. Returns the pair of port IDs
    /// making up the channel.
    pub fn create_channel(&mut self) -> Channel {
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
        debug_assert_eq!(getter_port.kind, PortKind::Getter);

        // Copy the IDs out before the ports themselves move into the job.
        let putter_id = putter_port.self_id;
        let getter_id = getter_port.self_id;

        self.job_queue
            .lock()
            .unwrap()
            .push_back(ApplicationJob::NewChannel((getter_port, putter_port)));

        // Track ownership so `create_connector` can validate its arguments.
        self.owned_ports.extend_from_slice(&[putter_id, getter_id]);

        Channel{ putter_id, getter_id }
    }

    /// Creates a new connector. Note that it is not scheduled immediately, but
    /// depends on the `ConnectorApplication` to run, followed by the created
    /// connector being scheduled.
    ///
    /// `module` and `routine` name the component to instantiate and
    /// `arguments` are the values passed to it. Every port found in
    /// `arguments` must currently be owned by this interface; on success
    /// ownership of those ports is given up on this side.
    ///
    /// # Errors
    ///
    /// Returns `ComponentCreationError::UnownedPort` when `arguments` contains
    /// a port that this interface does not own, or whichever error
    /// `new_component_v2` reports. In both cases the set of owned ports is
    /// left untouched.
    // TODO: Yank out scheduler logic for common use.
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
        // Retrieve ports and make sure that we own the ones that are currently
        // specified. This is also checked by the scheduler, but that is done
        // asynchronously.
        let mut initial_ports = Vec::new();
        find_ports_in_value_group(&arguments, &mut initial_ports);
        if initial_ports.iter().any(|port| !self.owned_ports.contains(port)) {
            return Err(ComponentCreationError::UnownedPort);
        }

        // Instantiate the component *before* touching `owned_ports`: this step
        // can fail, and a failed creation must not cost us ports that were
        // never handed over.
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
        let connector = ConnectorPDL::new(state);

        // Creation succeeded, so give up the ports on this side. `retain`
        // also copes with the same port occurring more than once in the
        // arguments.
        self.owned_ports.retain(|owned| !initial_ports.contains(owned));

        // Put on job queue
        {
            let mut queue = self.job_queue.lock().unwrap();
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
        }

        self.wake_up_connector_with_ping();

        Ok(())
    }

    /// Check if the next sync-round is finished.
    ///
    /// Returns the current value of the shared "done" flag; apart from
    /// acquiring the flag's mutex this does not wait for anything.
    pub fn try_wait(&self) -> bool {
        let done_flag = &self.sync_done.0;
        let finished = *done_flag.lock().unwrap();
        finished
    }

    /// Wait until the next sync-round is finished
    ///
    /// Blocks the calling thread on the condition variable for as long as the
    /// shared "done" flag is `false`.
    pub fn wait(&self) {
        let done_flag = &self.sync_done.0;
        let done_signal = &self.sync_done.1;

        let mut finished = done_flag.lock().unwrap();
        // Re-check after every wake-up: only leave once the flag is set.
        while !*finished {
            finished = done_signal.wait(finished).unwrap();
        }
    }

    /// Called by runtime to set associated connector's ID.
    ///
    /// Until this is called the stored ID is the invalid placeholder assigned
    /// in `new`. The ID is used when creating channels and when waking up the
    /// connector.
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
        self.connector_id = id;
    }

    /// Drops a `Ping` control message into the associated connector's inbox
    /// and, if the connector was marked as sleeping, clears that mark and
    /// pushes the connector onto the runtime's work queue.
    fn wake_up_connector_with_ping(&self) {
        let target_id = self.connector_id;
        let connector = self.runtime.get_component_public(target_id);

        let ping = Message::Control(ControlMessage {
            id: 0,
            sending_component_id: target_id,
            content: ControlContent::Ping,
        });
        connector.inbox.insert_message(ping);

        // Only the caller that flips `sleeping` from true to false goes on to
        // schedule the connector.
        let won_wake_up = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();

        if !won_wake_up {
            println!("DEBUG: NOT waking up connector");
            return;
        }

        println!("DEBUG: Waking up connector");
        // NOTE(review): the safety contract of `ConnectorKey::from_id` is not
        // visible from here; presumably winning the exchange above is what
        // entitles us to create the key. Confirm against its definition.
        let key = unsafe{ ConnectorKey::from_id(target_id) };
        self.runtime.push_work(key);
    }
}

impl Drop for ApplicationInterface {
    /// Queues a `Shutdown` job for the associated connector, wakes the
    /// connector up so that it gets to see the job, and tells the runtime
    /// that there is one active interface fewer.
    fn drop(&mut self) {
        // Never panic on a poisoned queue mutex here: `drop` may be running
        // as part of unwinding, where a second panic aborts the process.
        // Queueing the shutdown request on the recovered guard is harmless.
        self.job_queue
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .push_back(ApplicationJob::Shutdown);

        self.wake_up_connector_with_ping();
        self.runtime.decrement_active_interfaces();
    }
}