diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index a436d529b9cbca6154208bfe01de9df3dcbd857b..39885d6565633073652f17f267168f76323ea3a6 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,7 +1,8 @@ +use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::Ordering; -use crate::protocol::ComponentCreationError; +use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; @@ -17,18 +18,18 @@ use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; use super::inbox::Message; /// Generic connector interface from the scheduler's point of view. -pub trait Connector { +pub(crate) trait Connector { /// Handle a new message (preprocessed by the scheduler). You probably only /// want to handle `Data`, `Sync`, and `Solution` messages. The others are /// intended for the scheduler itself. - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); + fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); /// Should run the connector's behaviour up until the next blocking point. fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; -type JobQueue = Arc>>; +type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), @@ -45,7 +46,7 @@ pub struct ConnectorApplication { impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); - let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32))); + let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); @@ -55,20 +56,31 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { - todo!("handling messages in ConnectorApplication (API for runtime)") + fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { + use MessageContents as MC; + + match message.contents { + MC::Data(_) => unreachable!("data message in API connector"), + MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => { + // Handling sync in API + }, + MC::Control(_) => {}, + MC::Ping => {}, + } } fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); - while let Some(job) = queue.pop() { + while let Some(job) = queue.pop_front() { match job { ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + println!("DEBUG: API adopting ports"); delta_state.new_ports.reserve(2); delta_state.new_ports.push(endpoint_a); delta_state.new_ports.push(endpoint_b); } ApplicationJob::NewConnector(connector) => { + println!("DEBUG: API creating connector"); delta_state.new_connectors.push(connector); } } @@ -89,7 +101,9 @@ pub struct ApplicationInterface { } impl ApplicationInterface { - pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + runtime.active_interfaces += 1; + return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), @@ -122,7 +136,7 @@ impl ApplicationInterface { { let mut lock = self.job_queue.lock().unwrap(); - lock.push(ApplicationJob::NewChannel((getter_port, putter_port))); + lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port))); } // Add to owned ports for error checking while creating a connector @@ -162,7 +176,7 @@ impl ApplicationInterface { // Put on job queue { let mut queue = self.job_queue.lock().unwrap(); - queue.push(ApplicationJob::NewConnector(connector)); + queue.push_back(ApplicationJob::NewConnector(connector)); } // Send ping message to wake up connector @@ -178,8 +192,11 @@ impl ApplicationInterface { .is_ok(); if should_wake_up { + println!("DEBUG: Waking up connector"); let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; self.runtime.global_store.connector_queue.push_back(key); + } else { + println!("DEBUG: NOT waking up connector"); } return Ok(()); @@ -196,11 +213,17 @@ impl ApplicationInterface { pub fn wait(&self) { let (is_done, condition) = &*self.sync_done; let lock = is_done.lock().unwrap(); - condition.wait_while(lock, |v| !*v); // wait while not done + condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done } /// Called by runtime to set associated connector's ID. pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { self.connector_id = id; } +} + +impl Drop for ApplicationInterface { + fn drop(&mut self) { + + } } \ No newline at end of file