Files @ 58dfabd1be9f
Branch filter:

Location: CSY/reowolf/src/runtime2/native.rs - annotation

58dfabd1be9f 6.1 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
moving to laptop
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
use std::sync::{Arc, Mutex, Condvar};
use std::cell::Cell;
use std::sync::atomic::Ordering;
use crate::protocol::ComponentCreationError;

use crate::protocol::eval::ValueGroup;
use crate::ProtocolDescription;
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
use crate::runtime2::global_store::ConnectorKey;

use super::RuntimeInner;
use super::global_store::{ConnectorVariant, ConnectorId};
use super::port::{Channel, PortIdLocal};
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
use super::inbox::{Message, DataMessage, SyncMessage};

/// Interface for connectors; implemented in this file by
/// `ConnectorApplication`.
pub trait Connector {
    /// Hands a data message to the connector.
    fn insert_data_message(&mut self, message: DataMessage);
    /// Hands a sync message to the connector, together with mutable access to
    /// the `RunDeltaState`.
    // NOTE(review): presumably `delta_state` collects changes caused by
    // handling the message - confirm against the other implementors.
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState);
    /// Runs the connector and returns a `ConnectorScheduling` value.
    // NOTE(review): presumably the returned value tells the scheduler when to
    // run this connector again - confirm against the scheduler.
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
}

type SyncDone = Arc<(Mutex<bool>, Condvar)>;
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>,

/// Work item handed from the `ApplicationInterface` to the
/// `ConnectorApplication` through the shared job queue.
enum ApplicationJob {
    /// A freshly created connector; `ConnectorApplication::run` moves it into
    /// `delta_state.new_connectors`.
    NewConnector(ConnectorPDL),
}

/// The connector which an application can directly interface with. One may set
/// up the next synchronous round, and retrieve the data afterwards.
pub struct ConnectorApplication {
    // Flag + condition variable shared with the `ApplicationInterface`.
    sync_done: SyncDone,
    // Jobs pushed by the `ApplicationInterface`, consumed in `run`.
    job_queue: JobQueue,
}

impl ConnectorApplication {
    /// Builds a connector together with the interface that controls it.
    ///
    /// Both halves share the same "sync done" flag/condvar pair and the same
    /// job queue (pre-allocated for 32 jobs); the interface additionally
    /// receives the `runtime` handle.
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
        let done_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let pending_jobs = Arc::new(Mutex::new(Vec::with_capacity(32)));

        // The interface gets its own handles; the connector keeps the originals.
        let interface = ApplicationInterface::new(
            Arc::clone(&done_signal),
            Arc::clone(&pending_jobs),
            runtime,
        );
        let connector = Self {
            sync_done: done_signal,
            job_queue: pending_jobs,
        };

        (connector, interface)
    }
}

impl Connector for ConnectorApplication {
    /// Not implemented yet: panics through `todo!`.
    fn insert_sync_message(&mut self, _message: SyncMessage, _delta_state: &mut RunDeltaState) {
        todo!("handling sync messages in ApplicationConnector");
    }

    /// Not implemented yet: panics through `todo!`.
    fn insert_data_message(&mut self, _message: DataMessage) {
        todo!("handling messages in ApplicationConnector");
    }

    /// Empties the shared job queue and applies every job to `delta_state`.
    ///
    /// Jobs are handled in the order in which they were submitted, so
    /// connectors end up in `delta_state.new_connectors` in creation order.
    /// Always returns `ConnectorScheduling::NotNow`.
    ///
    /// # Panics
    ///
    /// Panics if the job queue mutex is poisoned.
    fn run(&mut self, _protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        let mut queue = self.job_queue.lock().unwrap();
        // `drain(..)` yields front-to-back (FIFO) and keeps the allocation
        // around for the next batch of jobs.
        for job in queue.drain(..) {
            match job {
                ApplicationJob::NewConnector(connector) => {
                    delta_state.new_connectors.push(connector);
                }
            }
        }

        ConnectorScheduling::NotNow
    }
}

/// The interface to a `ConnectorApplication`. This allows setting up the
/// interactions the `ConnectorApplication` performs within a synchronous round.
pub struct ApplicationInterface {
    // Flag + condition variable shared with the `ConnectorApplication`.
    sync_done: SyncDone,
    // Jobs for the `ConnectorApplication`; pushed to in `create_connector`.
    job_queue: JobQueue,
    // Handle to the runtime (global store and protocol description).
    runtime: Arc<RuntimeInner>,
    // ID of the associated connector; starts as `ConnectorId::new_invalid()`
    // and is replaced through `set_connector_id`.
    connector_id: ConnectorId,
    // Ports created through this interface that have not yet been handed to a
    // created connector.
    owned_ports: Vec<PortIdLocal>,
}

impl ApplicationInterface {
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
        return Self{
            sync_done, job_queue, runtime,
            connector_id: ConnectorId::new_invalid(),
            owned_ports: Vec::new(),
        }
    }

    /// Creates a new channel.
    pub fn create_channel(&mut self) -> Channel {
        let channel = self.runtime.global_store.ports.create_channel(self.connector_id);
        self.owned_ports.push(channel.putter_id);
        self.owned_ports.push(channel.getter_id);

        return channel;
    }

    /// Creates a new connector. Note that it is not scheduled immediately, but
    /// depends on the `ApplicationConnector` to run, followed by the created
    /// connector being scheduled.
    // TODO: Optimize by yanking out scheduler logic for common use.
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
        // Retrieve ports and make sure that we own the ones that are currently
        // specified. This is also checked by the scheduler, but that is done
        // asynchronously.
        let mut initial_ports = Vec::new();
        find_ports_in_value_group(&arguments, &mut initial_ports);
        for port_to_remove in &initial_ports {
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
                Some(index_to_remove) => {
                    // We own the port, so continue
                    self.owned_ports.remove(index_to_remove)
                },
                None => {
                    // We don't own the port
                    return Err(ComponentCreationError::UnownedPort);
                }
            }
        }

        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
        let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports);

        // Put on job queue
        {
            let mut queue = self.job_queue.lock().unwrap();
            queue.push(ApplicationJob::NewConnector(connector));
        }

        // Send ping message to wake up connector
        let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
        connector.inbox.insert_message(Message::Ping);
        let should_wake_up = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();

        if should_wake_up {
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
            self.runtime.global_store.connector_queue.push_back(key);
        }

        return Ok(());
    }

    /// Check if the next sync-round is finished.
    pub fn try_wait(&self) -> bool {
        let (is_done, _) = &*self.sync_done;
        let lock = is_done.lock().unwrap();
        return *lock;
    }

    /// Wait until the next sync-round is finished
    pub fn wait(&self) {
        let (is_done, condition) = &*self.sync_done;
        let lock = is_done.lock().unwrap();
        condition.wait_while(lock, |v| !*v); // wait while not done
    }

    /// Called by runtime to set associated connector's ID.
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
        self.connector_id = id;
    }
}