diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 39885d6565633073652f17f267168f76323ea3a6..0e877c36449aab3519e0a4aba53bf24894f62b0f 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -5,17 +5,12 @@ use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; -use crate::runtime2::connector::{Branch, find_ports_in_value_group}; -use crate::runtime2::global_store::ConnectorKey; -use crate::runtime2::inbox::MessageContents; -use crate::runtime2::port::{Port, PortKind}; -use crate::runtime2::scheduler::ConnectorCtx; - -use super::RuntimeInner; -use super::global_store::ConnectorId; -use super::port::{Channel, PortIdLocal}; -use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; -use super::inbox::Message; + +use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx}; +use super::port::{Port, PortIdLocal, Channel, PortKind}; +use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL}; +use super::connector::find_ports_in_value_group; +use super::inbox::{Message, MessageContents}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -34,6 +29,7 @@ type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), NewConnector(ConnectorPDL), + Shutdown, } /// The connector which an application can directly interface with. Once may set @@ -69,7 +65,7 @@ impl Connector for ConnectorApplication { } } - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, _protocol_description: &ProtocolDescription, _ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { @@ -82,6 +78,10 @@ impl Connector for ConnectorApplication { ApplicationJob::NewConnector(connector) => { println!("DEBUG: API creating connector"); delta_state.new_connectors.push(connector); + }, + ApplicationJob::Shutdown => { + debug_assert!(queue.is_empty()); + return ConnectorScheduling::Exit; } } } @@ -102,8 +102,6 @@ pub struct ApplicationInterface { impl ApplicationInterface { fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { - runtime.active_interfaces += 1; - return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), @@ -114,7 +112,7 @@ impl ApplicationInterface { /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { // TODO: Duplicated logic in scheduler - let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst); + let getter_id = self.runtime.port_counter.fetch_add(2, Ordering::SeqCst); let putter_id = PortIdLocal::new(getter_id + 1); let getter_id = PortIdLocal::new(getter_id); @@ -179,25 +177,7 @@ impl ApplicationInterface { queue.push_back(ApplicationJob::NewConnector(connector)); } - // Send ping message to wake up connector - let connector = self.runtime.global_store.connectors.get_shared(self.connector_id); - connector.inbox.insert_message(Message{ - sending_connector: ConnectorId::new_invalid(), - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Ping, - }); - - let should_wake_up = connector.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - println!("DEBUG: Waking up connector"); - let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; - self.runtime.global_store.connector_queue.push_back(key); - } else { - println!("DEBUG: NOT waking up connector"); - } + self.wake_up_connector_with_ping(); return Ok(()); } @@ -220,10 +200,37 @@ impl ApplicationInterface { pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { self.connector_id = id; } + + fn wake_up_connector_with_ping(&self) { + let connector = self.runtime.get_component_public(self.connector_id); + connector.inbox.insert_message(Message{ + sending_connector: ConnectorId::new_invalid(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Ping, + }); + + let should_wake_up = connector.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + println!("DEBUG: Waking up connector"); + let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; + self.runtime.push_work(key); + } else { + println!("DEBUG: NOT waking up connector"); + } + } } impl Drop for ApplicationInterface { fn drop(&mut self) { + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::Shutdown); + } + self.wake_up_connector_with_ping(); + self.runtime.decrement_active_interfaces(); } } \ No newline at end of file