diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 61c761bcc1fb52194a9abfe63be452b6aee1837a..b0de3c54ae700703f8812d335c2397c0075f3269 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -7,6 +7,9 @@ use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; +use crate::runtime2::inbox::MessageContents; +use crate::runtime2::port::{Port, PortKind}; +use crate::runtime2::scheduler::ConnectorCtx; use super::RuntimeInner; use super::global_store::{ConnectorVariant, ConnectorId}; @@ -14,14 +17,19 @@ use super::port::{Channel, PortIdLocal}; use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; use super::inbox::{Message, DataMessage, SyncMessage}; +/// Generic connector interface from the scheduler's point of view. pub trait Connector { - fn insert_data_message(&mut self, message: DataMessage); - fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState); - fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + /// Handle a new message (preprocessed by the scheduler). You probably only + /// want to handle `Data`, `Sync`, and `Solution` messages. The others are + /// intended for the scheduler itself. + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); + + /// Should run the connector's behaviour up until the next blocking point. + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; -type JobQueue = Arc>>, +type JobQueue = Arc>>; enum ApplicationJob { NewConnector(ConnectorPDL), @@ -47,15 +55,11 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) { - todo!("handling sync messages in ApplicationConnector"); - } - - fn insert_data_message(&mut self, message: DataMessage) { - todo!("handling messages in ApplicationConnector"); + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + todo!("handling messages in ConnectorApplication (API for runtime)") } - fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop() { match job { @@ -90,9 +94,18 @@ impl ApplicationInterface { /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { - let channel = self.runtime.global_store.ports.create_channel(self.connector_id); - self.owned_ports.push(channel.putter_id); - self.owned_ports.push(channel.getter_id); + // TODO: Duplicated logic in scheduler + let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + self.ports.push(Port{ + self_id: getter_id, + peer_id: putter_id, + kind: PortKind::Getter, + owning_connector: self.connector_id, + peer_connector: self.connector_id, + }); return channel; }