Changeset - 44f84629849b
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-15 19:20:40
contact@maxhenger.nl
WIP on moving ports around in code
4 files changed with 53 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -222,12 +222,13 @@ impl ConnectorPorts {
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
    }
 

	
 
    #[inline]
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
@@ -338,12 +339,16 @@ impl ConnectorPDL {
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s).
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 

	
src/runtime2/global_store.rs
Show inline comments
 
@@ -5,12 +5,15 @@ use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use std::ptr;
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, SyncMessage};
 
use crate::runtime2::native::Connector;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
@@ -49,22 +52,47 @@ impl ConnectorId {
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`.
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn insert_data_message(&mut self, message: DataMessage) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_data_message(message),
 
            ConnectorVariant::Native(c) => c.insert_data_message(message),
 
        }
 
    }
 

	
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state),
 
            ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant,
 
    pub public: ConnectorPublic,
 
    pub router: Router
 
    pub router: Router,
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
/// owns a `ConnectorKey`, then one has unique access to that connector.
 
/// Otherwise one has shared access.
 
///
src/runtime2/native.rs
Show inline comments
 
@@ -3,24 +3,24 @@ use std::cell::Cell;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 

	
 
pub trait Connector {
 
    fn insert_data_message(&mut self, message: DataMessage);
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>,
 

	
 
enum ApplicationJob {
 
@@ -44,21 +44,21 @@ impl ConnectorApplication {
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) {
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        todo!("handling sync messages in ApplicationConnector");
 
    }
 

	
 
    fn insert_data_message(&mut self, message: DataMessage)  {
 
        todo!("handling messages in ApplicationConnector");
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -3,23 +3,28 @@ use std::sync::Condvar;
 
use std::sync::atomic::Ordering;
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::native::Connector;
 

	
 
use super::RuntimeInner;
 
use super::port::{PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
// by the scheduler (to handle rerouting messages). We could just give a read-
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
    }
 

	
 
    pub fn run(&mut self) {
 
@@ -49,24 +54,26 @@ impl Scheduler {
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // TODO: Put header in front of messages, this is a mess
 
                    match message {
 
                        Message::Data(message) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
 
                            } else {
 
                                scheduled.connector.inbox.insert_message(message);
 
                                scheduled.connector.insert_data_message(message);
 
                            }
 
                        },
 
                        Message::Sync(message) => {
 
                            scheduled.connector
 
                            // TODO: Come back here after rewriting port ownership stuff
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute()
 
                        },
 
                        Message::Solution(solution) => {
 

	
 
                        },
 
                        Message::Control(message) => {
 
                            match message.content {
 
@@ -279,16 +286,16 @@ impl Router {
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == message.sending_connector &&
 
                reroute.port == message.sending_port {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.port == sending_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
 
        }
 

	
 
        return None;
0 comments (0 inline, 0 general)