use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx}; use super::scheduler::SchedulerCtx; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL}; use super::connector::find_ports_in_value_group; use super::inbox::{Message, MessageContents}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { /// Handle a new message (preprocessed by the scheduler). You probably only /// want to handle `Data`, `Sync`, and `Solution` messages. The others are /// intended for the scheduler itself. fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); /// Should run the connector's behaviour up until the next blocking point. fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), NewConnector(ConnectorPDL), Shutdown, } /// The connector which an application can directly interface with. Once may set /// up the next synchronous round, and retrieve the data afterwards. pub struct ConnectorApplication { sync_done: SyncDone, job_queue: JobQueue, } impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); return (connector, interface); } } impl Connector for ConnectorApplication { fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { use MessageContents as MC; match message.contents { MC::Data(_) => unreachable!("data message in API connector"), MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => { // Handling sync in API }, MC::Control(_) => {}, MC::Ping => {}, } } fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { println!("DEBUG: API adopting ports"); delta_state.new_ports.reserve(2); delta_state.new_ports.push(endpoint_a); delta_state.new_ports.push(endpoint_b); } ApplicationJob::NewConnector(connector) => { println!("DEBUG: API creating connector"); delta_state.new_connectors.push(connector); }, ApplicationJob::Shutdown => { debug_assert!(queue.is_empty()); return ConnectorScheduling::Exit; } } } return ConnectorScheduling::NotNow; } } /// The interface to a `ApplicationConnector`. This allows setting up the /// interactions the `ApplicationConnector` performs within a synchronous round. pub struct ApplicationInterface { sync_done: SyncDone, job_queue: JobQueue, runtime: Arc, connector_id: ConnectorId, owned_ports: Vec, } impl ApplicationInterface { fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), owned_ports: Vec::new(), } } /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id); debug_assert_eq!(getter_port.kind, PortKind::Getter); let getter_id = getter_port.self_id; let putter_id = putter_port.self_id; { let mut lock = self.job_queue.lock().unwrap(); lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port))); } // Add to owned ports for error checking while creating a connector self.owned_ports.reserve(2); self.owned_ports.push(putter_id); self.owned_ports.push(getter_id); return Channel{ putter_id, getter_id }; } /// Creates a new connector. Note that it is not scheduled immediately, but /// depends on the `ApplicationConnector` to run, followed by the created /// connector being scheduled. // TODO: Yank out scheduler logic for common use. pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { // Retrieve ports and make sure that we own the ones that are currently // specified. This is also checked by the scheduler, but that is done // asynchronously. let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); for port_to_remove in &initial_ports { match self.owned_ports.iter().position(|v| v == port_to_remove) { Some(index_to_remove) => { // We own the port, so continue self.owned_ports.remove(index_to_remove); }, None => { // We don't own the port return Err(ComponentCreationError::UnownedPort); } } } let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports); // Put on job queue { let mut queue = self.job_queue.lock().unwrap(); queue.push_back(ApplicationJob::NewConnector(connector)); } self.wake_up_connector_with_ping(); return Ok(()); } /// Check if the next sync-round is finished. pub fn try_wait(&self) -> bool { let (is_done, _) = &*self.sync_done; let lock = is_done.lock().unwrap(); return *lock; } /// Wait until the next sync-round is finished pub fn wait(&self) { let (is_done, condition) = &*self.sync_done; let lock = is_done.lock().unwrap(); condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done } /// Called by runtime to set associated connector's ID. pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { self.connector_id = id; } fn wake_up_connector_with_ping(&self) { let connector = self.runtime.get_component_public(self.connector_id); connector.inbox.insert_message(Message{ sending_connector: ConnectorId::new_invalid(), receiving_port: PortIdLocal::new_invalid(), contents: MessageContents::Ping, }); let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_wake_up { println!("DEBUG: Waking up connector"); let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; self.runtime.push_work(key); } else { println!("DEBUG: NOT waking up connector"); } } } impl Drop for ApplicationInterface { fn drop(&mut self) { { let mut lock = self.job_queue.lock().unwrap(); lock.push_back(ApplicationJob::Shutdown); } self.wake_up_connector_with_ping(); self.runtime.decrement_active_interfaces(); } }