From 44f84629849b3154cc692c7c04712def4d4e8b60 2021-10-15 19:20:40 From: mh Date: 2021-10-15 19:20:40 Subject: [PATCH] WIP on moving ports around in code --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 64edbdde1602c20a4130b8dd1fc6d55ca69ca186..0cd20a6adf9fa855d54512e54e2b32c76eb19717 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -225,6 +225,7 @@ impl ConnectorPorts { return &mut self.port_mapping(mapped_idx); } + #[inline] fn num_ports(&self) -> usize { return self.owned_ports.len(); } @@ -341,6 +342,10 @@ impl ConnectorPDL { return self.in_sync; } + pub fn insert_data_message(&mut self, message: DataMessage) { + self.inbox.insert_message(message); + } + /// Accepts a synchronous message and combines it with the locally stored /// solution(s). pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) { diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 4d1727af58fd65e42030dca6e874baab875504bb..191c529907db1526c6cd6337f190724e0830e2d3 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -8,6 +8,9 @@ use super::scheduler::Router; use std::ptr; use std::sync::{Barrier, RwLock, RwLockReadGuard}; use std::sync::atomic::AtomicBool; +use crate::ProtocolDescription; +use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState}; +use crate::runtime2::inbox::{DataMessage, SyncMessage}; use crate::runtime2::native::Connector; /// A kind of token that, once obtained, allows mutable access to a connector. @@ -52,16 +55,41 @@ impl ConnectorId { } // TODO: Change this, I hate this. But I also don't want to put `public` and -// `router` of `ScheduledConnector` back into `Connector`. +// `router` of `ScheduledConnector` back into `Connector`. The reason I don't +// want `Box` everywhere is because of the v-table overhead. But +// to truly design this properly I need some benchmarks. pub enum ConnectorVariant { UserDefined(ConnectorPDL), Native(Box), } +impl Connector for ConnectorVariant { + fn insert_data_message(&mut self, message: DataMessage) { + match self { + ConnectorVariant::UserDefined(c) => c.insert_data_message(message), + ConnectorVariant::Native(c) => c.insert_data_message(message), + } + } + + fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) { + match self { + ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state), + ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state), + } + } + + fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + match self { + ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state), + ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state), + } + } +} + pub struct ScheduledConnector { pub connector: ConnectorVariant, pub public: ConnectorPublic, - pub router: Router + pub router: Router, } /// The registry containing all connectors. The idea here is that when someone diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 7247c23a3b1190755c7c3aa479bf699582c90771..61c761bcc1fb52194a9abfe63be452b6aee1837a 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -6,7 +6,7 @@ use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; -use crate::runtime2::global_store::ConnectorKey; +use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; use super::RuntimeInner; use super::global_store::{ConnectorVariant, ConnectorId}; @@ -16,8 +16,8 @@ use super::inbox::{Message, DataMessage, SyncMessage}; pub trait Connector { fn insert_data_message(&mut self, message: DataMessage); - fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState); - fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState); + fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; @@ -47,7 +47,7 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) { + fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) { todo!("handling sync messages in ApplicationConnector"); } @@ -55,7 +55,7 @@ impl Connector for ConnectorApplication { todo!("handling messages in ApplicationConnector"); } - fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop() { match job { diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index b6706e8be16729c1218fa97812bd3c1916225be2..e1f59e016d3faae5850be6fe58a89953f32971a2 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -6,6 +6,7 @@ use std::thread; use crate::ProtocolDescription; use crate::runtime2::global_store::ConnectorVariant; +use crate::runtime2::native::Connector; use super::RuntimeInner; use super::port::{PortIdLocal}; @@ -17,6 +18,10 @@ pub(crate) struct Scheduler { runtime: Arc, } +// Thinking aloud: actual ports should be accessible by connector, but managed +// by the scheduler (to handle rerouting messages). We could just give a read- +// only context, instead of an extra call on the "Connector" trait. + impl Scheduler { pub fn new(runtime: Arc) -> Self { return Self{ runtime }; @@ -52,18 +57,20 @@ impl Scheduler { while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { + // TODO: Put header in front of messages, this is a mess match message { Message::Data(message) => { // Check if we need to reroute, or can just put it // in the private inbox of the connector - if let Some(other_connector_id) = scheduled.router.should_reroute(&message) { + if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) { self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message)); } else { - scheduled.connector.inbox.insert_message(message); + scheduled.connector.insert_data_message(message); } }, Message::Sync(message) => { - scheduled.connector + // TODO: Come back here after rewriting port ownership stuff + if let Some(other_connector_id) = scheduled.router.should_reroute() }, Message::Solution(solution) => { @@ -282,10 +289,10 @@ impl Router { /// Returns true if the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. - pub fn should_reroute(&self, message: &DataMessage) -> Option { + pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option { for reroute in &self.active { - if reroute.source_connector == message.sending_connector && - reroute.port == message.sending_port { + if reroute.source_connector == sending_connector && + reroute.port == sending_port { // Need to reroute this message return Some(reroute.target_connector); }