Changeset - ab8066658334
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-25 11:39:31
contact@maxhenger.nl
WIP on shutting down connectors
4 files changed with 110 insertions and 26 deletions:
0 comments (0 inline, 0 general)
src/runtime2/inbox.rs
Show inline comments
 
/**
 
inbox.rs
 

	
 
Contains various types of inboxes and message types for the connectors. There
 
are two kinds of inboxes:
 

	
 
The `PublicInbox` is a simple message queue. Messages are put in by various
 
threads, and they're taken out by a single thread. These messages may contain
 
control messages and may be filtered or redirected by the scheduler.
 

	
 
The `PrivateInbox` is a temporary storage for all messages that are received
 
within a certain sync-round.
 
**/
 

	
 
use std::collections::VecDeque;
 
use std::sync::Mutex;
 

	
 
use super::ConnectorId;
 
use crate::protocol::eval::ValueGroup;
 
use super::connector::BranchId;
 
use super::port::PortIdLocal;
 

	
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
// TODO: Remove Debug on messages
 
#[derive(Debug, Clone)]
 
pub struct DataMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId,
 
    pub sender_cur_branch_id: BranchId,
 
    pub message: ValueGroup,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum SyncBranchConstraint {
 
    SilentPort(PortIdLocal),
 
    BranchNumber(BranchId),
 
    PortMapping(PortIdLocal, BranchId),
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SyncConnectorSolution {
 
    pub connector_id: ConnectorId,
 
    pub terminating_branch_id: BranchId,
 
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
 
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SyncConnectorConstraints {
 
    pub connector_id: ConnectorId,
 
    pub constraints: Vec<SyncBranchConstraint>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SyncMessage {
 
    pub local_solutions: Vec<SyncConnectorSolution>,
 
    pub constraints: Vec<SyncConnectorConstraints>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
// TODO: Shouldn't really be here, right?
 
impl SyncMessage {
 
    /// Creates a new sync message. Assumes that it is created by a connector
 
    /// that has just encountered a new local solution.
 
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
 
        let mut local_solutions = Vec::with_capacity(approximate_peers);
 
        local_solutions.push(initial_solution);
 

	
 
        return Self{
 
            local_solutions,
 
            constraints: Vec::with_capacity(approximate_peers),
 
            to_visit: Vec::with_capacity(approximate_peers),
 
        };
 
    }
 

	
 
    /// Checks if a connector has already provided a local solution
 
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
 
        return self.local_solutions
 
            .iter()
 
            .any(|v| v.connector_id == connector_id);
 
    }
 

	
 
    /// Adds a new constraint. If the connector has already provided a local
 
    /// solution then the constraint will be checked. Otherwise the constraint
 
    /// will be added to the solution. If this is the first constraint for a
 
    /// connector then it will be added to the connectors that still have to be
 
    /// visited.
 
    ///
 
    /// If this returns true then the constraint was added, or the local
 
    /// solution for the specified connector satisfies the constraint. If this
 
    /// function returns an error then we're dealing with a nefarious peer.
 
    pub(crate) fn add_or_check_constraint(
 
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
 
    ) -> Result<bool, ()> {
 
        if self.has_local_solution_for(connector_id) {
 
            return self.check_constraint(connector_id, constraint);
 
        } else {
 
            self.add_constraint(connector_id, constraint);
 
            return Ok(true);
 
        }
 
    }
 

	
 
    /// Pushes a new connector constraint. Caller must ensure that the solution
 
    /// has not yet arrived at the specified connector (because then it would no
 
    /// longer have constraints, but a proposed solution instead).
 
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
 
        debug_assert!(!self.has_local_solution_for(connector_id));
 

	
 
        let position = self.constraints
 
            .iter()
 
            .position(|v| v.connector_id == connector_id);
 

	
 
        match position {
 
            Some(index) => {
 
                // Has pre-existing constraints
 
                debug_assert!(self.to_visit.contains(&connector_id));
 
                let entry = &mut self.constraints[index];
 
                entry.constraints.push(constraint);
 
            },
 
            None => {
 
                debug_assert!(!self.to_visit.contains(&connector_id));
 
                self.constraints.push(SyncConnectorConstraints{
 
                    connector_id,
 
                    constraints: vec![constraint],
 
                });
 
                self.to_visit.push(connector_id);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if a constraint is satisfied by a solution. Caller must make sure
 
    /// that a local solution has already been provided. Will return an error
 
    /// value only if the provided constraint does not make sense (i.e. a
 
    /// nefarious peer has supplied a constraint with a port we do not own).
 
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
 
        debug_assert!(self.has_local_solution_for(connector_id));
 

	
 
        let entry = self.local_solutions
 
            .iter()
 
            .find(|v| v.connector_id == connector_id)
 
            .unwrap();
 

	
 
        match constraint {
 
            SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if *port_id == silent_port_id {
 
                        // If silent, then mapped ID is invalid
 
                        return Ok(!mapped_id.is_valid())
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
 
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
 
            },
 
            SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == port_id {
 
                        return Ok(*mapped_id == expected_branch_id);
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SolutionMessage {
 
    pub comparison_number: u64,
 
    pub connector_origin: ConnectorId,
 
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
#[derive(Debug, Clone)]
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub content: ControlMessageVariant,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    CloseChannel(PortIdLocal), // close the port associated with this
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
 
/// Generic message contents.
 
#[derive(Debug, Clone)]
 
pub enum MessageContents {
 
    Data(DataMessage),              // data message, handled by connector
 
    Sync(SyncMessage),              // sync message, handled by both connector/scheduler
 
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
 
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
 
    Control(ControlMessage),        // control message, handled by scheduler
 
    Ping,                           // ping message, intentionally waking up a connector (used for native connectors)
 
}
 

	
 
#[derive(Debug)]
 
pub struct Message {
 
    pub sending_connector: ConnectorId,
 
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
 
    pub contents: MessageContents,
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 

	
 
pub(crate) struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: Vec<(PortIdLocal, DataMessage)>,
 
    len_read: usize,
 
}
 

	
 
impl PrivateInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        for (existing_target_port, existing) in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    *existing_target_port == target_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 

	
 
        self.messages.push((target_port, message));
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
    /// speculative conditions. Note that the inbox remains read-locked until
 
    /// the returned iterator is dropped. Should only be called by the
 
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
 
    ///
 
    /// This function should only be used to check if already-received messages
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        return InboxMessageIter {
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read,
 
            match_port_id: port_id,
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let (_, to_return) = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub(crate) fn clear(&mut self) {
 
        self.messages.clear();
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub(crate) struct InboxMessageIter<'i> {
 
    messages: &'i Vec<(PortIdLocal, DataMessage)>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'i DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let (target_port, cur_message) = &self.messages[self.next_index];
 
            if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let (_, message) = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod native;
 
mod port;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 

	
 
// Imports
 

	
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Condvar, Mutex, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::port::Port;
 
use crate::runtime2::scheduler::SchedulerCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub(crate) enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
    pub pending_acks: u32,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
/// Externally facing runtime.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            protocol_description,
 
            port_counter: AtomicU32::new(0),
 
            connectors: RwLock::new(ConnectorStore::with_capacity(32)),
 
            connector_queue: Mutex::new(VecDeque::with_capacity(32)),
 
            schedulers: Mutex::new(Vec::new()),
 
            scheduler_notifier: Condvar::new(),
 
            active_connectors: AtomicU32::new(0),
 
            active_interfaces: AtomicU32::new(1), // this `Runtime` instance
 
            should_exit: AtomicBool::new(false),
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads as usize);
 
            for thread_index in 0..num_threads {
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::Builder::new()
 
                    .name(format!("thread-{}", thread_index))
 
                    .spawn(move || {
 
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
 
                        scheduler.run();
 
                    })
 
                    .unwrap();
 

	
 
                schedulers.push(thread);
 
            }
 

	
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        self.inner.increment_active_interfaces();
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector_key = self.inner.create_interface_component(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_interfaces();
 
        let mut lock = self.inner.schedulers.lock().unwrap();
 
        for handle in lock.drain(..) {
 
            handle.join().unwrap();
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// RuntimeInner
 
// -----------------------------------------------------------------------------
 

	
 
pub(crate) struct RuntimeInner {
 
    // Protocol
 
    pub(crate) protocol_description: ProtocolDescription,
 
    // Regular counter for port IDs
 
    port_counter: AtomicU32,
 
    // Storage of connectors and the work queue
 
    connectors: RwLock<ConnectorStore>,
 
    connector_queue: Mutex<VecDeque<ConnectorKey>>,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>,
 
    // Conditions to determine whether the runtime can exit
 
    scheduler_notifier: Condvar,  // coupled to mutex on `connector_queue`.
 
    // TODO: Figure out if we can simply merge the counters?
 
    active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active)
 
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
 
    should_exit: AtomicBool,
 
}
 

	
 
impl RuntimeInner {
 
    // --- Managing the components queued for execution
 

	
 
    /// Wait until there is a connector to run. If there is one, then `Some`
 
    /// will be returned. If there is no more work, then `None` will be
 
    /// returned.
 
    pub(crate) fn wait_for_work(&self) -> Option<ConnectorKey> {
 
        let mut lock = self.connector_queue.lock().unwrap();
 
        while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) {
 
            lock = self.scheduler_notifier.wait(lock).unwrap();
 
        }
 

	
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn push_work(&self, key: ConnectorKey) {
 
        let mut lock = self.connector_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.scheduler_notifier.notify_one();
 
    }
 

	
 
    // --- Creating/using ports
 

	
 
    /// Creates a new port pair. Note that these are stored globally like the
 
    /// connectors are. Ports stored by components belong to those components.
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) {
 
        use port::{PortIdLocal, PortKind};
 

	
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
    /// Sends a message to a particular connector. If the connector happened to
 
    /// be sleeping then it will be scheduled for execution.
 
    pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) {
 
        let target = self.get_component_public(target_id);
 
        target.inbox.insert_message(message);
 

	
 
        let should_wake_up = target.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(target_id) };
 
            self.push_work(key);
 
        }
 
    }
 

	
 
    // --- Creating/retrieving/destroying components
 

	
 
    pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
        // Initialize as sleeping, as it will be scheduled by the programmer.
 
        let mut lock = self.connectors.write().unwrap();
 
        let key = lock.create(ConnectorVariant::Native(Box::new(component)), true);
 

	
 
        self.increment_active_components();
 
        return key;
 
    }
 

	
 
    /// Creates a new PDL component. The caller MUST make sure to schedule the
 
    /// connector.
 
    // TODO: Nicer code, not forcing the caller to schedule, perhaps?
 
    pub(crate) fn create_pdl_component(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
        // Create as not sleeping, as we'll schedule it immediately
 
        let key = {
 
            let mut lock = self.connectors.write().unwrap();
 
            lock.create(ConnectorVariant::UserDefined(connector), false)
 
        };
 

	
 
        // Transfer the ports
 
        {
 
            let lock = self.connectors.read().unwrap();
 
            let created = lock.get_private(&key);
 

	
 
            match &created.connector {
 
                ConnectorVariant::UserDefined(connector) => {
 
                    for port_id in connector.ports.owned_ports.iter().copied() {
 
                        println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
 
                        let mut port = created_by.context.remove_port(port_id);
 
                        created.context.add_port(port);
src/runtime2/port.rs
Show inline comments
 
use super::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ index: id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortState {
 
    Open,
 
    Closed,
 
}
 

	
 
/// Represents a port inside of the runtime. This is generally the local view of
 
/// a connector on its port, which may not be consistent with the rest of the
 
/// global system (e.g. its peer was moved to a new connector, or the peer might
 
/// have died in the meantime, so it is no longer usable).
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 

	
 
use super::{RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortIdLocal};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new() -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
    }
 
}
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let scheduler_id = self.scheduler_id;
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            println!("DEBUG [{}]: Waiting for work", scheduler_id);
 
            let connector_key = self.runtime.wait_for_work();
 
            if connector_key.is_none() {
 
                // We should exit
 
                println!("DEBUG [{}]: ... No more work, quitting", scheduler_id);
 
                break 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index);
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // Check for rerouting
 
                    println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message);
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                // Note: after this code has been reached, we may not have any
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                self.send_message_and_wake_up_if_sleeping(
 
                                // TODO: Question from Max from the past: what the hell did you mean?
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::CloseChannel(port_id) => {
 
                                // Mark the port as being closed
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.state = PortState::Closed;
 

	
 
                                // Send an Ack
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_key.downcast(),
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 

	
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(
 
                        scheduler_ctx, &scheduled.context, &mut delta_state
 
                    );
 
                    println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
 
                    scheduled.public.sleeping.store(true, Ordering::Release);
 

	
 
                    // We might have received a message in the meantime from a
 
                    // thread that did not see the sleeping flag set to `true`,
 
                    // so:
 
                    if !scheduled.public.inbox.is_empty() {
 
                        let should_reschedule_self = scheduled.public.sleeping
 
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                            .is_ok();
 

	
 
                        if should_reschedule_self {
 
                            self.runtime.push_work(connector_key);
 
                        }
 
                    }
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.context.ports {
 
                        self.runtime.send_message(port.peer_connector, Message{
 
                            sending_connector: connector_key.downcast(),
 
                            receiving_port: port.peer_id,
 
                            contents: MessageContents::Control(ControlMessage{
 
                                id: 0,
 
                                content: ControlMessageVariant::Ack
 
                            })
 
                        })
 
                        let message = scheduled.router.prepare_closing_channel(
 
                            port.self_id, port.peer_id,
 
                            connector_id
 
                        );
 
                        self.runtime.send_message(port.peer_connector, message);
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        // Handling any messages that were sent
 
        let connector_id = connector_key.downcast();
 

	
 
        if !delta_state.outbox.is_empty() {
 
            for mut message in delta_state.outbox.drain(..) {
 
                // Based on the message contents, decide where the message
 
                // should be sent to. This might end up modifying the message.
 
                let (peer_connector, peer_port) = match &mut message {
 
                    MessageContents::Data(contents) => {
 
                        let port = context.get_port(contents.sending_port);
 
                        (port.peer_connector, port.peer_id)
 
                    },
 
                    MessageContents::Sync(contents) => {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::RequestCommit(contents)=> {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::ConfirmCommit(contents) => {
 
                        for to_visit in &contents.to_visit {
 
                            let message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                            };
 
                            self.runtime.send_message(*to_visit, message);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::Control(_) | MessageContents::Ping => {
 
                        // Never generated by the user's code
 
                        unreachable!();
 
                    }
 
                };
 

	
 
                // TODO: Maybe clean this up, perhaps special case for
 
                //  ConfirmCommit can be handled differently.
 
                if peer_connector.is_valid() {
 
                    let message = Message {
 
                        sending_connector: connector_id,
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.runtime.send_message(peer_connector, message);
 
                }
 
            }
 
        }
 

	
 
        if !delta_state.new_ports.is_empty() {
 
            for port in delta_state.new_ports.drain(..) {
 
                context.ports.push(port);
 
            }
 
        }
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            let cur_connector = self.runtime.get_component_private(connector_key);
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
 
                let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                // Call above changed ownership of ports, but we still have to
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    cur_connector.pending_acks += 1;
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.runtime.send_message(port.peer_connector, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
                self.runtime.push_work(new_key);
 
            }
 
        }
 

	
 
        debug_assert!(delta_state.outbox.is_empty());
 
        debug_assert!(delta_state.new_ports.is_empty());
 
        debug_assert!(delta_state.new_connectors.is_empty());
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
}
 

	
 
struct ControlClosedChannel {
 

	
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
 

	
 
pub(crate) struct ControlMessageHandler {
 
    id_counter: u32,
 
    active: Vec<ControlEntry>,
 
}
 

	
 
impl ControlMessageHandler {
 
    pub fn new() -> Self {
 
        ControlMessageHandler {
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares a message indicating that a channel has closed, we keep a local
 
    /// entry to match against the (hopefully) returned `Ack` message.
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: connectorId
 
    ) -> Message {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
 
                source_port: self_port_id,
 
                target_port: peer_port_id,
 
            }),
 
        });
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: peer_port_id,
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::CloseChannel(peer_port_id),
 
            }),
 
        };
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        let (new_id_counter, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id_counter;
 
        let id = self.take_id();
 

	
 
        self.active.push(ReroutedTraffic{
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ChangedPort(ControlChangedPort{
 
                target_port: port_id,
 
                source_connector: peer_connector_id,
 
                target_connector: new_owner_connector_id,
 
            }),
 
        });
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: peer_port_id,
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
 
            })
 
        };
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.target_port == target_port {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = entry {
 
                if entry.source_connector == sending_connector &&
 
                    entry.target_port == target_port {
 
                    // Need to reroute this message
 
                return Some(reroute.target_connector);
 
                    return Some(entry.target_connector);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message
 
    pub fn handle_ack(&mut self, id: u32) {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
 

	
 
        match index {
 
            Some(index) => { self.active.remove(index); },
 
            None => { todo!("handling of nefarious ACKs"); },
 
        }
 
    }
 

	
 
    /// Retrieves the number of responses we still expect to receive from our
 
    /// peers
 
    #[inline]
 
    pub fn num_pending_acks(&self) -> usize {
 
        return self.active.len();
 
    }
 

	
 
    fn take_id(&mut self) -> u32 {
 
        let generated_id = self.id_counter;
 
        let (new_id, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id;
 

	
 
        return generated_id;
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)