Files @ 0dea927c969a
Branch filter:

Location: CSY/reowolf/src/runtime2/native.rs - annotation

0dea927c969a 7.8 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Hans-Dieter Hiep
Update .gitlab-ci.yml file
7d01f1245b7c
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
1755ca411ca7
154e5e08b93a
68411f4b8014
1755ca411ca7
edb4c4be7e45
68411f4b8014
68411f4b8014
58dfabd1be9f
b4ac681e0e7f
7d01f1245b7c
b4ac681e0e7f
154e5e08b93a
154e5e08b93a
154e5e08b93a
68411f4b8014
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
58dfabd1be9f
a43d61913724
edb4c4be7e45
1755ca411ca7
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
68411f4b8014
154e5e08b93a
154e5e08b93a
154e5e08b93a
68411f4b8014
68411f4b8014
68411f4b8014
154e5e08b93a
7d01f1245b7c
58dfabd1be9f
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
edb4c4be7e45
edb4c4be7e45
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
154e5e08b93a
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
a43d61913724
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
98aadfccbafd
ce39b1540ff5
ce39b1540ff5
ce39b1540ff5
a43d61913724
a43d61913724
a43d61913724
7d01f1245b7c
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
26d47db4f922
26d47db4f922
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
ceaa946df1eb
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
edb4c4be7e45
edb4c4be7e45
328f04b6612f
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
58dfabd1be9f
edb4c4be7e45
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
edb4c4be7e45
58dfabd1be9f
58dfabd1be9f
1755ca411ca7
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
7d01f1245b7c
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
1755ca411ca7
1755ca411ca7
1755ca411ca7
68411f4b8014
edb4c4be7e45
edb4c4be7e45
32d91577e090
edb4c4be7e45
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
1755ca411ca7
1755ca411ca7
1755ca411ca7
1755ca411ca7
7d01f1245b7c
1755ca411ca7
1755ca411ca7
7d01f1245b7c
58dfabd1be9f
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::Ordering;

use crate::protocol::ComponentCreationError;
use crate::protocol::eval::ValueGroup;

use super::{ConnectorKey, ConnectorId, RuntimeInner};
use super::scheduler::{SchedulerCtx, ComponentCtx};
use super::port::{Port, PortIdLocal, Channel, PortKind};
use super::consensus::find_ports_in_value_group;
use super::connector::{ConnectorScheduling, ConnectorPDL};
use super::inbox::{Message, ControlContent, ControlMessage};

/// Generic connector interface from the scheduler's point of view.
///
/// Anything that implements this trait can be driven through `run`, which
/// receives the scheduler context and a mutable component context.
pub(crate) trait Connector {
    /// Should run the connector's behaviour up until the next blocking point.
    /// One should generally request and handle new messages from the component
    /// context. Then perform any logic the component has to do, and in the
    /// process perhaps queue up some state changes using the same context.
    ///
    /// The returned `ConnectorScheduling` value is handed back to the caller;
    /// the implementation in this file returns `Exit` once it has been asked
    /// to shut down and `NotNow` otherwise.
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
}

/// Shared "done" flag plus the condition variable used to wait on it. The
/// `ApplicationInterface` blocks in `wait` until the flag reads `true`.
// NOTE(review): nothing in the visible part of this file ever sets the flag
// to `true` or notifies the condition variable — confirm where that happens.
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
/// Shared FIFO of jobs: the `ApplicationInterface` pushes to the back and
/// `ConnectorApplication::run` pops from the front.
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;

/// A request queued by the `ApplicationInterface` and executed by
/// `ConnectorApplication::run` the next time that connector runs.
enum ApplicationJob {
    /// Two freshly created ports of one channel. `create_channel` queues them
    /// as `(getter, putter)`; `run` pushes both onto the component context.
    NewChannel((Port, Port)),
    /// A newly created PDL connector together with the ports it was given as
    /// arguments; `run` forwards both to `push_component`.
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
    /// Queued when the `ApplicationInterface` is dropped; makes `run` return
    /// `ConnectorScheduling::Exit`. Expected to be the last job in the queue.
    Shutdown,
}

/// The connector which an application can directly interface with. One may set
/// up the next synchronous round, and retrieve the data afterwards.
///
/// The application does not touch this type directly: it talks to it through
/// the `ApplicationInterface` that `ConnectorApplication::new` returns
/// alongside it, with which it shares both fields below.
pub struct ConnectorApplication {
    // Shared done-flag/condvar pair; the interface side waits on it.
    sync_done: SyncDone,
    // Jobs queued by the interface, drained in `run`.
    job_queue: JobQueue,
}

impl ConnectorApplication {
    /// Builds the application connector and the interface that controls it.
    ///
    /// Both halves share the same done-flag/condvar pair and the same job
    /// queue (pre-sized for 32 jobs); the interface additionally keeps the
    /// handle to the runtime.
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
        let sync_done = Arc::new((Mutex::new(false), Condvar::new()));
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));

        // The interface gets its own handles; the originals move into the
        // connector below.
        let interface = ApplicationInterface::new(
            Arc::clone(&sync_done),
            Arc::clone(&job_queue),
            runtime,
        );
        let connector = ConnectorApplication { sync_done, job_queue };

        (connector, interface)
    }
}

impl Connector for ConnectorApplication {
    /// Drains the inbox, then executes every job the application queued.
    ///
    /// Returns `ConnectorScheduling::Exit` when a `Shutdown` job is reached
    /// and `ConnectorScheduling::NotNow` once the job queue is empty.
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        // Incoming messages are not supported yet: every kind that reaches
        // this point hits a `todo!`.
        while let Some(incoming) = comp_ctx.read_next_message() {
            match incoming {
                Message::Data(_) => todo!("data message in API connector"),
                Message::Sync(_)  => todo!("sync message in API connector"),
                Message::Control(_) => todo!("impossible control message"),
            }
        }

        // Requests coming from the API. The queue stays locked while it is
        // being drained; the guard is released when this function returns.
        let mut pending = self.job_queue.lock().unwrap();
        loop {
            match pending.pop_front() {
                Some(ApplicationJob::NewChannel((first_port, second_port))) => {
                    comp_ctx.push_port(first_port);
                    comp_ctx.push_port(second_port);
                }
                Some(ApplicationJob::NewConnector(connector, initial_ports)) => {
                    comp_ctx.push_component(connector, initial_ports);
                }
                Some(ApplicationJob::Shutdown) => {
                    // Shutdown is queued on drop of the interface, so nothing
                    // should follow it.
                    debug_assert!(pending.is_empty());
                    return ConnectorScheduling::Exit;
                }
                None => return ConnectorScheduling::NotNow,
            }
        }
    }
}

/// The interface to a `ConnectorApplication`. This is the handle the
/// application uses to create channels and connectors and to wait on the
/// shared sync-done flag. Dropping it queues a shutdown job for the connector.
pub struct ApplicationInterface {
    // Shared done-flag/condvar pair, read by `try_wait` and `wait`.
    sync_done: SyncDone,
    // Jobs for the `ConnectorApplication`; this side only pushes.
    job_queue: JobQueue,
    runtime: Arc<RuntimeInner>,
    // Starts out invalid; the runtime assigns it through `set_connector_id`.
    connector_id: ConnectorId,
    // Ports created through `create_channel` that have not yet been passed to
    // a connector by `create_connector`.
    owned_ports: Vec<PortIdLocal>,
}

impl ApplicationInterface {
    /// Builds an interface around the shared state. The connector ID starts
    /// out invalid until `set_connector_id` is called, and no ports are owned.
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
        Self {
            sync_done,
            job_queue,
            runtime,
            connector_id: ConnectorId::new_invalid(),
            owned_ports: Vec::new(),
        }
    }

    /// Creates a new channel and returns the IDs of its two ports.
    ///
    /// The ports are queued for the `ConnectorApplication` and recorded as
    /// owned by this interface. No wake-up is sent from here, so the queued
    /// job is only processed the next time the connector runs.
    pub fn create_channel(&mut self) -> Channel {
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
        let getter_id = getter_port.self_id;
        let putter_id = putter_port.self_id;

        {
            let mut queue = self.job_queue.lock().unwrap();
            queue.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
        }

        // Add to owned ports for error checking while creating a connector
        self.owned_ports.reserve(2);
        self.owned_ports.push(putter_id);
        self.owned_ports.push(getter_id);

        Channel { putter_id, getter_id }
    }

    /// Creates a new connector. Note that it is not scheduled immediately, but
    /// depends on the `ConnectorApplication` to run, followed by the created
    /// connector being scheduled.
    ///
    /// Every port found in `arguments` must currently be owned by this
    /// interface. On success those ports are no longer owned here; on any
    /// error the set of owned ports is left untouched.
    ///
    /// # Errors
    ///
    /// Returns `ComponentCreationError::UnownedPort` if `arguments` contains a
    /// port this interface does not own, and propagates any error returned by
    /// `new_component_v2`.
    // TODO: Yank out scheduler logic for common use.
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
        // Retrieve ports and make sure that we own the ones that are currently
        // specified. This is also checked by the scheduler, but that is done
        // asynchronously.
        let mut initial_ports = Vec::new();
        find_ports_in_value_group(&arguments, &mut initial_ports);
        if initial_ports.iter().any(|port| !self.owned_ports.contains(port)) {
            return Err(ComponentCreationError::UnownedPort);
        }

        // Create the component before giving up any ports: if this fails the
        // ports were never handed over, so they must remain owned here.
        let state = self.runtime.protocol_description
            .new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
        let connector = ConnectorPDL::new(state);

        // We own all ports and creation succeeded, so remove them on this
        // side. `retain` also copes with a port being listed more than once.
        self.owned_ports.retain(|port| !initial_ports.contains(port));

        // Put on job queue
        {
            let mut queue = self.job_queue.lock().unwrap();
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
        }

        self.wake_up_connector_with_ping();

        Ok(())
    }

    /// Check if the next sync-round is finished. Returns the current value of
    /// the shared done-flag without blocking on the condition variable.
    pub fn try_wait(&self) -> bool {
        let (is_done, _) = &*self.sync_done;
        let done = *is_done.lock().unwrap();
        done
    }

    /// Wait until the next sync-round is finished, i.e. block the calling
    /// thread until the shared done-flag reads `true`.
    pub fn wait(&self) {
        let (is_done, condition) = &*self.sync_done;
        let guard = is_done.lock().unwrap();
        // Wait while not done; the returned guard is released right away.
        let _done = condition.wait_while(guard, |done| !*done).unwrap();
    }

    /// Called by runtime to set associated connector's ID.
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
        self.connector_id = id;
    }

    /// Puts a `Ping` control message in the associated connector's inbox and,
    /// if that connector was marked as sleeping, hands it to the runtime as
    /// new work.
    fn wake_up_connector_with_ping(&self) {
        let connector = self.runtime.get_component_public(self.connector_id);
        connector.inbox.insert_message(Message::Control(ControlMessage {
            id: 0,
            sending_component_id: self.connector_id,
            content: ControlContent::Ping,
        }));

        // Only the caller that flips `sleeping` from true to false schedules
        // the connector, so it is pushed as work at most once per sleep.
        let should_wake_up = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();

        if should_wake_up {
            // SAFETY(review): the contract of `ConnectorKey::from_id` is not
            // visible in this file. Presumably a key may only be created by
            // whoever won the `sleeping` exchange above — TODO confirm.
            let key = unsafe { ConnectorKey::from_id(self.connector_id) };
            self.runtime.push_work(key);
        }
    }
}

impl Drop for ApplicationInterface {
    /// Queues a `Shutdown` job, wakes the associated connector so it gets to
    /// process that job, and tells the runtime that one fewer interface is
    /// active.
    fn drop(&mut self) {
        {
            // Never panic in `drop`: if it runs while the thread is already
            // unwinding, a second panic aborts the process. A poisoned queue
            // is still a usable queue, so take the guard out of the error and
            // deliver the shutdown request anyway.
            let mut queue = self.job_queue
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            queue.push_back(ApplicationJob::Shutdown);
        }

        self.wake_up_connector_with_ping();
        self.runtime.decrement_active_interfaces();
    }
}