Changeset - 44f84629849b
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-15 19:20:40
contact@maxhenger.nl
WIP on moving ports around in code
4 files changed with 53 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -204,48 +204,49 @@ impl ConnectorPorts {
 
            }
 
        }
 

	
 
        return None
 
    }
 

	
 
    /// Retrieves the ID associated with the port at the provided index
 
    #[inline]
 
    fn get_port_id(&self, port_index: usize) -> PortIdLocal {
 
        return self.owned_ports[port_index];
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
    }
 

	
 
    #[inline]
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
 

	
 
struct BranchQueue {
 
    first: u32,
 
    last: u32,
 
}
 

	
 
@@ -320,48 +321,52 @@ impl ConnectorPDL {
 
    /// receive its ID.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        debug_assert!(!self.id.is_valid()); // ID should only be set once
 
        self.id = id;
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s).
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == self.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 

	
 
            // Note that we only iterate over the solutions we've already
 
            // handled ourselves, not necessarily
 
            let mut branch_index = self.sync_finished.first;
 
            'branch_loop: loop {
 
                // Load solution branch
 
                let branch = &self.branches[branch_index as usize];
src/runtime2/global_store.rs
Show inline comments
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use std::ptr;
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, SyncMessage};
 
use crate::runtime2::native::Connector;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
pub(crate) struct ConnectorId(u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`.
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn insert_data_message(&mut self, message: DataMessage) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_data_message(message),
 
            ConnectorVariant::Native(c) => c.insert_data_message(message),
 
        }
 
    }
 

	
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state),
 
            ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant,
 
    pub public: ConnectorPublic,
 
    pub router: Router
 
    pub router: Router,
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
/// owns a `ConnectorKey`, then one has unique access to that connector.
 
/// Otherwise one has shared access.
 
///
 
/// This datastructure is built to be wrapped in a RwLock.
 
struct ConnectorStore {
 
    inner: RwLock<ConnectorStoreInner>,
 
}
 

	
 
struct ConnectorStoreInner {
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        return Self{
 
            inner: RwLock::new(ConnectorStoreInner {
 
                connectors: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
            }),
 
        };
src/runtime2/native.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::cell::Cell;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 

	
 
pub trait Connector {
 
    fn insert_data_message(&mut self, message: DataMessage);
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>,
 

	
 
enum ApplicationJob {
 
    NewConnector(ConnectorPDL),
 
}
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) {
 
    fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) {
 
        todo!("handling sync messages in ApplicationConnector");
 
    }
 

	
 
    fn insert_data_message(&mut self, message: DataMessage)  {
 
        todo!("handling messages in ApplicationConnector");
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::Ordering;
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::native::Connector;
 

	
 
use super::RuntimeInner;
 
use super::port::{PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
// by the scheduler (to handle rerouting messages). We could just give a read-
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                thread::sleep(Duration::new(1, 0));
 
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    break 'thread_loop;
 
                }
 

	
 
                continue 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // TODO: Put header in front of messages, this is a mess
 
                    match message {
 
                        Message::Data(message) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
 
                            } else {
 
                                scheduled.connector.inbox.insert_message(message);
 
                                scheduled.connector.insert_data_message(message);
 
                            }
 
                        },
 
                        Message::Sync(message) => {
 
                            scheduled.connector
 
                            // TODO: Come back here after rewriting port ownership stuff
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute()
 
                        },
 
                        Message::Solution(solution) => {
 

	
 
                        },
 
                        Message::Control(message) => {
 
                            match message.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = self.runtime.global_store.ports.get(&connector_key, port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                    // And respond with an Ack
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        message.sender,
 
                                        Message::Control(ControlMessage{
 
                                            id: message.id,
 
                                            sender: connector_key.downcast(),
 
                                            content: ControlMessageVariant::Ack,
 
                                        })
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(message.id);
 
@@ -261,47 +268,47 @@ impl Router {
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            sender: self_connector_id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == message.sending_connector &&
 
                reroute.port == message.sending_port {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.port == sending_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message
 
    pub fn handle_ack(&mut self, id: u32) {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
 

	
 
        match index {
 
            Some(index) => { self.active.remove(index); },
 
            None => { todo!("handling of nefarious ACKs"); },
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)