diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 0e877c36449aab3519e0a4aba53bf24894f62b0f..5eab60d0def9ecf32d6275a1a1c52568d19bf5cf 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -7,6 +7,7 @@ use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx}; +use super::scheduler::SchedulerCtx; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL}; use super::connector::find_ports_in_value_group; @@ -20,7 +21,7 @@ pub(crate) trait Connector { fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); /// Should run the connector's behaviour up until the next blocking point. - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; @@ -65,7 +66,7 @@ impl Connector for ConnectorApplication { } } - fn run(&mut self, _protocol_description: &ProtocolDescription, _ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, _sched_ctx: &SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { @@ -111,26 +112,10 @@ impl ApplicationInterface { /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { - // TODO: Duplicated logic in scheduler - let getter_id = self.runtime.port_counter.fetch_add(2, Ordering::SeqCst); - let putter_id = PortIdLocal::new(getter_id + 1); - let getter_id = PortIdLocal::new(getter_id); - - // Create ports and add a job such that they are transferred to the - // API component. (note that we do not send a ping, this is only - // necessary once we create a connector) - let getter_port = Port{ - self_id: getter_id, - peer_id: putter_id, - kind: PortKind::Getter, - peer_connector: self.connector_id, - }; - let putter_port = Port{ - self_id: putter_id, - peer_id: getter_id, - kind: PortKind::Putter, - peer_connector: self.connector_id, - }; + let (getter_port, putter_port) = self.runtime.create_channel(); + debug_assert_eq!(getter_port.kind, PortKind::Getter); + let getter_id = getter_port.self_id; + let putter_id = putter_port.self_id; { let mut lock = self.job_queue.lock().unwrap();