Changeset - 44f84629849b
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-15 19:20:40
contact@maxhenger.nl
WIP on moving ports around in code
4 files changed with 53 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -225,6 +225,7 @@ impl ConnectorPorts {
 
        return &mut self.port_mapping(mapped_idx);
 
    }
 

	
 
    #[inline]
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 
@@ -341,6 +342,10 @@ impl ConnectorPDL {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s).
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) {
src/runtime2/global_store.rs
Show inline comments
 
@@ -8,6 +8,9 @@ use super::scheduler::Router;
 
use std::ptr;
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, SyncMessage};
 
use crate::runtime2::native::Connector;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
@@ -52,16 +55,41 @@ impl ConnectorId {
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`.
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn insert_data_message(&mut self, message: DataMessage) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_data_message(message),
 
            ConnectorVariant::Native(c) => c.insert_data_message(message),
 
        }
 
    }
 

	
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state),
 
            ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant,
 
    pub public: ConnectorPublic,
 
    pub router: Router
 
    pub router: Router,
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
src/runtime2/native.rs
Show inline comments
 
@@ -6,7 +6,7 @@ use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
@@ -16,8 +16,8 @@ use super::inbox::{Message, DataMessage, SyncMessage};
 

	
 
pub trait Connector {
 
    fn insert_data_message(&mut self, message: DataMessage);
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
@@ -47,7 +47,7 @@ impl ConnectorApplication {
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) {
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        todo!("handling sync messages in ApplicationConnector");
 
    }
 

	
 
@@ -55,7 +55,7 @@ impl Connector for ConnectorApplication {
 
        todo!("handling messages in ApplicationConnector");
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
src/runtime2/scheduler.rs
Show inline comments
 
@@ -6,6 +6,7 @@ use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::native::Connector;
 

	
 
use super::RuntimeInner;
 
use super::port::{PortIdLocal};
 
@@ -17,6 +18,10 @@ pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
// by the scheduler (to handle rerouting messages). We could just give a read-
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
@@ -52,18 +57,20 @@ impl Scheduler {
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // TODO: Put header in front of messages, this is a mess
 
                    match message {
 
                        Message::Data(message) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
 
                            } else {
 
                                scheduled.connector.inbox.insert_message(message);
 
                                scheduled.connector.insert_data_message(message);
 
                            }
 
                        },
 
                        Message::Sync(message) => {
 
                            scheduled.connector
 
                            // TODO: Come back here after rewriting port ownership stuff
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute()
 
                        },
 
                        Message::Solution(solution) => {
 

	
 
@@ -282,10 +289,10 @@ impl Router {
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == message.sending_connector &&
 
                reroute.port == message.sending_port {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.port == sending_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
0 comments (0 inline, 0 general)