diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs new file mode 100644 index 0000000000000000000000000000000000000000..82b5de08d295ab4ffa43ff00c86d67787eb17d87 --- /dev/null +++ b/src/runtime2/native.rs @@ -0,0 +1,165 @@ +use std::sync::{Arc, Mutex, Condvar}; +use std::cell::Cell; +use std::sync::atomic::Ordering; +use crate::protocol::ComponentCreationError; + +use crate::protocol::eval::ValueGroup; +use crate::ProtocolDescription; +use crate::runtime2::connector::{Branch, find_ports_in_value_group}; +use crate::runtime2::global_store::ConnectorKey; + +use super::RuntimeInner; +use super::global_store::{ConnectorVariant, ConnectorId}; +use super::port::{Channel, PortIdLocal}; +use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; +use super::inbox::{Message, DataMessage, SyncMessage}; + +pub trait Connector { + fn insert_data_message(&mut self, message: DataMessage); + fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState); + fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling; +} + +type SyncDone = Arc<(Mutex, Condvar)>; +type JobQueue = Arc>>, + +enum ApplicationJob { + NewConnector(ConnectorPDL), +} + +/// The connector which an application can directly interface with. Once may set +/// up the next synchronous round, and retrieve the data afterwards. +pub struct ConnectorApplication { + sync_done: SyncDone, + job_queue: JobQueue, +} + +impl ConnectorApplication { + pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { + let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); + let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32))); + + let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() }; + let interface = ApplicationInterface::new(sync_done, job_queue, runtime); + + return (connector, interface); + } +} + +impl Connector for ConnectorApplication { + fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) { + todo!("handling sync messages in ApplicationConnector"); + } + + fn insert_data_message(&mut self, message: DataMessage) { + todo!("handling messages in ApplicationConnector"); + } + + fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + let mut queue = self.job_queue.lock().unwrap(); + while let Some(job) = queue.pop() { + match job { + ApplicationJob::NewConnector(connector) => { + delta_state.new_connectors.push(connector); + } + } + } + + return ConnectorScheduling::NotNow; + } +} + +/// The interface to a `ApplicationConnector`. This allows setting up the +/// interactions the `ApplicationConnector` performs within a synchronous round. +pub struct ApplicationInterface { + sync_done: SyncDone, + job_queue: JobQueue, + runtime: Arc, + connector_id: ConnectorId, + owned_ports: Vec, +} + +impl ApplicationInterface { + pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + return Self{ + sync_done, job_queue, runtime, + connector_id: ConnectorId::new_invalid(), + owned_ports: Vec::new(), + } + } + + /// Creates a new channel. + pub fn create_channel(&mut self) -> Channel { + let channel = self.runtime.global_store.ports.create_channel(self.connector_id); + self.owned_ports.push(channel.putter_id); + self.owned_ports.push(channel.getter_id); + + return channel; + } + + /// Creates a new connector. Note that it is not scheduled immediately, but + /// depends on the `ApplicationConnector` to run, followed by the created + /// connector being scheduled. + // TODO: Optimize by yanking out scheduler logic for common use. + pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { + // Retrieve ports and make sure that we own the ones that are currently + // specified. This is also checked by the scheduler, but that is done + // asynchronously. + let mut initial_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut initial_ports); + for port_to_remove in &initial_ports { + match self.owned_ports.iter().position(|v| v == port_to_remove) { + Some(index_to_remove) => { + // We own the port, so continue + self.owned_ports.remove(index_to_remove) + }, + None => { + // We don't own the port + return Err(ComponentCreationError::UnownedPort); + } + } + } + + let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; + let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports); + + // Put on job queue + { + let mut queue = self.job_queue.lock().unwrap(); + queue.push(ApplicationJob::NewConnector(connector)); + } + + // Send ping message to wake up connector + let connector = self.runtime.global_store.connectors.get_shared(self.connector_id); + connector.inbox.insert_message(Message::Ping); + let should_wake_up = connector.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; + self.runtime.global_store.connector_queue.push_back(key); + } + + return Ok(()); + } + + /// Check if the next sync-round is finished. + pub fn try_wait(&self) -> bool { + let (is_done, _) = &*self.sync_done; + let lock = is_done.lock().unwrap(); + return *lock; + } + + /// Wait until the next sync-round is finished + pub fn wait(&self) { + let (is_done, condition) = &*self.sync_done; + let lock = is_done.lock().unwrap(); + condition.wait_while(lock, |v| !*v); // wait while not done + } + + /// Called by runtime to set associated connector's ID. + pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { + self.connector_id = id; + } +} \ No newline at end of file