diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 04285976db5961cf363f5ff5ca65a4907151da43..a436d529b9cbca6154208bfe01de9df3dcbd857b 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,21 +1,20 @@ use std::sync::{Arc, Mutex, Condvar}; -use std::cell::Cell; use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; -use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; +use crate::runtime2::global_store::ConnectorKey; use crate::runtime2::inbox::MessageContents; use crate::runtime2::port::{Port, PortKind}; use crate::runtime2::scheduler::ConnectorCtx; use super::RuntimeInner; -use super::global_store::{ConnectorVariant, ConnectorId}; +use super::global_store::ConnectorId; use super::port::{Channel, PortIdLocal}; use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; -use super::inbox::{Message, DataMessage, SyncMessage}; +use super::inbox::Message; /// Generic connector interface from the scheduler's point of view. pub trait Connector { @@ -32,6 +31,7 @@ type SyncDone = Arc<(Mutex, Condvar)>; type JobQueue = Arc>>; enum ApplicationJob { + NewChannel((Port, Port)), NewConnector(ConnectorPDL), } @@ -63,6 +63,11 @@ impl Connector for ConnectorApplication { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop() { match job { + ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + delta_state.new_ports.reserve(2); + delta_state.new_ports.push(endpoint_a); + delta_state.new_ports.push(endpoint_b); + } ApplicationJob::NewConnector(connector) => { delta_state.new_connectors.push(connector); } @@ -80,11 +85,11 @@ pub struct ApplicationInterface { job_queue: JobQueue, runtime: Arc, connector_id: ConnectorId, - owned_ports: Vec, + owned_ports: Vec, } impl ApplicationInterface { - pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), @@ -99,19 +104,31 @@ impl ApplicationInterface { let putter_id = PortIdLocal::new(getter_id + 1); let getter_id = PortIdLocal::new(getter_id); - self.owned_ports.push(Port{ + // Create ports and add a job such that they are transferred to the + // API component. (note that we do not send a ping, this is only + // necessary once we create a connector) + let getter_port = Port{ self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, peer_connector: self.connector_id, - }); - - self.owned_ports.push(Port{ + }; + let putter_port = Port{ self_id: putter_id, peer_id: getter_id, kind: PortKind::Putter, peer_connector: self.connector_id, - }); + }; + + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push(ApplicationJob::NewChannel((getter_port, putter_port))); + } + + // Add to owned ports for error checking while creating a connector + self.owned_ports.reserve(2); + self.owned_ports.push(putter_id); + self.owned_ports.push(getter_id); return Channel{ putter_id, getter_id }; } @@ -130,7 +147,7 @@ impl ApplicationInterface { match self.owned_ports.iter().position(|v| v == port_to_remove) { Some(index_to_remove) => { // We own the port, so continue - self.owned_ports.remove(index_to_remove) + self.owned_ports.remove(index_to_remove); }, None => { // We don't own the port @@ -150,7 +167,12 @@ impl ApplicationInterface { // Send ping message to wake up connector let connector = self.runtime.global_store.connectors.get_shared(self.connector_id); - connector.inbox.insert_message(Message::Ping); + connector.inbox.insert_message(Message{ + sending_connector: ConnectorId::new_invalid(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Ping, + }); + let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok();