Changeset - cf26538b25dc
MH <contact@maxhenger.nl> - 2021-10-06 18:16:42
architecture for send/recv ports in place
6 files changed with 425 insertions and 128 deletions:
src/runtime2/connector.rs
@@ -3,24 +3,8 @@ use std::collections::HashMap;
 use crate::{PortId, ProtocolDescription};
 use crate::protocol::{ComponentState, RunContext, RunResult};
 use crate::protocol::eval::{Prompt, Value, ValueGroup};
-use crate::runtime2::inbox::{Inbox, Message};
-
-#[derive(Clone, Copy, PartialEq, Eq)]
-pub(crate) struct PortIdLocal {
-    pub id: u32,
-}
-
-impl PortIdLocal {
-    pub fn new(id: u32) -> Self {
-        Self{ id }
-    }
-
-    // TODO: Unsure about this, maybe remove, then also remove all struct
-    //  instances where I call this
-    pub fn new_invalid() -> Self {
-        Self{ id: u32::MAX }
-    }
-}
+use crate::runtime2::inbox::{Inbox, OutboxMessage};
+use crate::runtime2::port::PortIdLocal;
 
 /// Represents the identifier of a branch (the index within its container). An
 /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
@@ -67,7 +51,7 @@ pub(crate) struct Branch {
     sync_state: SpeculativeState,
     next_branch_in_queue: Option<u32>,
     // Message/port state
-    inbox: HashMap<PortIdLocal, Message>, // TODO: @temporary, remove together with fires()
+    inbox: HashMap<PortIdLocal, OutboxMessage>, // TODO: @temporary, remove together with fires()
     ports_delta: Vec<PortOwnershipDelta>,
 }
@@ -261,6 +245,23 @@ impl BranchQueue {
     }
 }
 
+/// Public fields of the connector that can be freely shared between multiple
+/// threads. Note that this is not enforced by the compiler. The global store
+/// allows retrieving the entire `Connector` as a mutable reference by one
+/// thread, and this `ConnectorPublic` by any number of threads.
+pub(crate) struct ConnectorPublic {
+    pub inbox: Inbox,
+}
+
+impl ConnectorPublic {
+    pub fn new() -> Self {
+        ConnectorPublic{
+            inbox: Inbox::new(),
+        }
+    }
+}
+
+// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 pub(crate) struct Connector {
     // State and properties of connector itself
     id: u32,
@@ -271,8 +272,8 @@ pub(crate) struct Connector {
     sync_pending_get: BranchQueue,
     sync_finished: BranchQueue,
     // Port/message management
-    ports: ConnectorPorts,
-    inbox: Inbox,
+    pub ports: ConnectorPorts,
+    pub public: ConnectorPublic,
 }
 
 struct TempCtx {}
@@ -307,7 +308,7 @@ impl Connector {
             sync_pending_get: BranchQueue::new(),
             sync_finished: BranchQueue::new(),
             ports: ConnectorPorts::new(owned_ports),
-            inbox: Inbox::new(),
+            public: ConnectorPublic::new(),
         }
     }
@@ -441,7 +442,7 @@ impl Connector {
             },
             RunResult::BranchPut(port_id, value_group) => {
                 // Branch performed a `put` on a particular port.
-                let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix };
+                let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix };
                 let local_port_index = self.ports.get_port_index(local_port_id);
                 if local_port_index.is_none() {
                     todo!("handle case where port was received before (i.e. in ports_delta)")
@@ -470,9 +471,8 @@ impl Connector {
                     // Put in run results for thread to pick up and transfer to
                     // the correct connector inbox.
                     port_mapping.mark_definitive(branch.index, 1);
-                    let message = Message{
+                    let message = OutboxMessage {
                         sending_port: local_port_id,
-                        receiving_port: PortIdLocal::new_invalid(),
                         sender_prev_branch_id: BranchId::new_invalid(),
                         sender_cur_branch_id: branch.index,
                         message: value_group,
@@ -709,7 +709,7 @@ impl Connector {
 pub(crate) struct RunDeltaState {
     // Variables that allow the thread running the connector to pick up global
     // state changes and try to apply them.
-    pub outbox: Vec<Message>,
+    pub outbox: Vec<OutboxMessage>,
     pub new_connectors: Vec<Connector>,
     // Workspaces
     pub ports: Vec<PortIdLocal>,
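
Taken together, the changes in this file split the old `Message` into an outbox/inbox pair: a branch that `put`s a value only knows the port it sent on, so it queues an `OutboxMessage` in `RunDeltaState.outbox`, and the scheduler later resolves the peer port and peer connector before delivering an `InboxMessage`. A minimal sketch of that hand-off, using stand-in types (a `String` payload instead of `ValueGroup`, and a toy peer lookup; these are not the crate's real definitions):

// Stand-in identifier types mirroring the diff's PortIdLocal/BranchId.
#[derive(Clone, Copy, PartialEq, Eq)]
struct PortIdLocal { index: u32 }
#[derive(Clone, Copy, PartialEq, Eq)]
struct BranchId { index: u32 }

// What a connector produces: it only knows the port it sent on.
struct OutboxMessage {
    sending_port: PortIdLocal,
    sender_prev_branch_id: BranchId,
    sender_cur_branch_id: BranchId,
    message: String, // stand-in for ValueGroup
}

// What the receiver finds in its inbox: the scheduler filled in the peer.
struct InboxMessage {
    sending_port: PortIdLocal,
    receiving_port: PortIdLocal,
    sender_prev_branch_id: BranchId,
    sender_cur_branch_id: BranchId,
    message: String,
}

// Scheduler-side translation: look up the peer of the sending port.
fn route(out: OutboxMessage, peer_of: impl Fn(PortIdLocal) -> PortIdLocal) -> InboxMessage {
    InboxMessage {
        sending_port: out.sending_port,
        receiving_port: peer_of(out.sending_port),
        sender_prev_branch_id: out.sender_prev_branch_id,
        sender_cur_branch_id: out.sender_cur_branch_id,
        message: out.message,
    }
}

fn main() {
    // Toy lookup: pretend port i is peered with port i + 1.
    let peer_of = |p: PortIdLocal| PortIdLocal { index: p.index + 1 };
    let msg = route(
        OutboxMessage {
            sending_port: PortIdLocal { index: 0 },
            sender_prev_branch_id: BranchId { index: 0 },
            sender_cur_branch_id: BranchId { index: 1 },
            message: "hello".to_string(),
        },
        peer_of,
    );
    assert_eq!(msg.receiving_port.index, 1);
}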
src/runtime2/global_store.rs
 use crate::collections::{MpmcQueue, RawVec};
 
-use super::connector::Connector;
+use super::connector::{Connector, ConnectorPublic};
+use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
 use std::ptr;
-use std::sync::RwLock;
+use std::sync::{RwLock, RwLockReadGuard};
 
 /// A kind of token that, once obtained, allows access to a container.
 struct ConnectorKey {
     index: u32, // of connector
 }
 
 /// The registry containing all connectors. The idea here is that when someone
 /// owns a `ConnectorKey`, then one has unique access to that connector.
 /// Otherwise one has shared access.
-///
-/// This datastructure is built to be wrapped in a RwLock.
 struct ConnectorStore {
+    inner: RwLock<ConnectorStoreInner>,
+}
+
+struct ConnectorStoreInner {
     connectors: RawVec<*mut Connector>,
     free: Vec<usize>,
 }
 
 impl ConnectorStore {
     fn with_capacity(capacity: usize) -> Self {
-        Self{
-            connectors: RawVec::with_capacity(capacity),
-            free: Vec::with_capacity(capacity),
-        }
+        return Self{
+            inner: RwLock::new(ConnectorStoreInner {
+                connectors: RawVec::with_capacity(capacity),
+                free: Vec::with_capacity(capacity),
+            }),
+        };
     }
 
+    /// Retrieves the shared members of the connector.
+    pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic {
+        let lock = self.inner.read().unwrap();
+
+        unsafe {
+            let connector = lock.connectors.get(connector_id as usize);
+            debug_assert!(!connector.is_null());
+            return &(**connector).public;
+        }
+    }
+
-    fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
+    /// Retrieves a particular connector. Only the thread that pulled the
+    /// associated key out of the execution queue should (be able to) call this.
+    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
+        let lock = self.inner.read().unwrap();
+
         unsafe {
-            let connector = self.connectors.get_mut(key.index as usize);
+            let connector = lock.connectors.get_mut(key.index as usize);
             debug_assert!(!connector.is_null());
             return &mut **connector;
         }
     }
 
-    fn create(&mut self, connector: Connector) -> ConnectorKey {
+    /// Create a new connector, returning the key that can be used to retrieve
+    /// and/or queue it.
+    pub(crate) fn create(&self, connector: Connector) -> ConnectorKey {
+        let mut lock = self.inner.write().unwrap();
+
         let index;
-        if self.free.is_empty() {
+        if lock.free.is_empty() {
             let connector = Box::into_raw(Box::new(connector));
 
             unsafe {
                 // Cheating a bit here. Anyway, move to heap, store in list
-                index = self.connectors.len();
-                self.connectors.push(connector);
+                index = lock.connectors.len();
+                lock.connectors.push(connector);
             }
         } else {
-            index = self.free.pop().unwrap();
+            index = lock.free.pop().unwrap();
 
             unsafe {
-                let target = self.connectors.get_mut(index);
+                let target = lock.connectors.get_mut(index);
                 debug_assert!(!target.is_null());
                 ptr::write(*target, connector);
             }
@@ -54,79 +85,170 @@ impl ConnectorStore {
         }
 
         return ConnectorKey{ index: index as u32 };
     }
 
-    fn destroy(&mut self, key: ConnectorKey) {
+    pub(crate) fn destroy(&self, key: ConnectorKey) {
+        let mut lock = self.inner.write().unwrap();
+
         unsafe {
-            let connector = self.connectors.get_mut(key.index as usize);
+            let connector = lock.connectors.get_mut(key.index as usize);
             ptr::drop_in_place(*connector);
             // Note: but not deallocating!
         }
 
-        self.free.push(key.index as usize);
+        lock.free.push(key.index as usize);
     }
 }
 
 impl Drop for ConnectorStore {
     fn drop(&mut self) {
-        for idx in 0..self.connectors.len() {
+        let lock = self.inner.write().unwrap();
+
+        for idx in 0..lock.connectors.len() {
             unsafe {
-                let memory = *self.connectors.get_mut(idx);
-                let boxed = Box::from_raw(memory); // takes care of deallocation
+                let memory = *lock.connectors.get_mut(idx);
+                let _ = Box::from_raw(memory); // takes care of deallocation
             }
         }
    }
}
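
`ConnectorStore` now carries its own `RwLock` around a slot array plus free list, with each connector boxed and stored as a raw pointer: handing out access to one slot only takes the read lock, while the write lock is reserved for growing the array or recycling a freed slot. A simplified sketch of the same slab pattern, using `Vec` in place of the crate's `RawVec` and assuming, as the `ConnectorKey` scheme enforces, that each index has exactly one logical owner:

use std::sync::RwLock;

struct Slab<T> {
    inner: RwLock<SlabInner<T>>,
}

struct SlabInner<T> {
    items: Vec<*mut T>, // heap pointers stay stable even when the Vec reallocates
    free: Vec<usize>,
}

impl<T> Slab<T> {
    fn new() -> Self {
        Slab{ inner: RwLock::new(SlabInner{ items: Vec::new(), free: Vec::new() }) }
    }

    fn create(&self, value: T) -> usize {
        let mut lock = self.inner.write().unwrap();
        let ptr = Box::into_raw(Box::new(value));
        match lock.free.pop() {
            Some(index) => { lock.items[index] = ptr; index }
            None => { lock.items.push(ptr); lock.items.len() - 1 }
        }
    }

    fn destroy(&self, index: usize) {
        // Unlike the diff (which keeps the allocation and only drops in
        // place), this sketch deallocates and reallocates on reuse.
        let mut lock = self.inner.write().unwrap();
        unsafe { drop(Box::from_raw(lock.items[index])); }
        lock.free.push(index);
    }

    // Safety: no two threads may request the same index at the same time.
    unsafe fn get_mut(&self, index: usize) -> &mut T {
        let lock = self.inner.read().unwrap(); // a read lock suffices per slot
        &mut *lock.items[index]
    }
}

impl<T> Drop for Slab<T> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut().unwrap();
        for (index, ptr) in inner.items.iter().enumerate() {
            if !inner.free.contains(&index) {
                unsafe { drop(Box::from_raw(*ptr)); }
            }
        }
    }
}

fn main() {
    let slab: Slab<u32> = Slab::new();
    let index = slab.create(42);
    unsafe { assert_eq!(*slab.get_mut(index), 42); }
    slab.destroy(index);
    assert_eq!(slab.create(7), index); // the freed slot is recycled
}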
 

	
 
-/// Global store of connectors, ports and queues that are used by the scheduler
-/// threads. The global store has the appearance of a thread-safe datatype, but
-/// one needs to be careful using it.
-///
-/// The intention of this data structure is to enforce the rules:
-/// TODO: @docs
-pub struct GlobalStore {
-    connector_queue: MpmcQueue<ConnectorKey>,
-    connectors: RwLock<ConnectorStore>,
-}
-
-impl GlobalStore {
-    pub fn new() -> Self {
-        Self{
-            connector_queue: MpmcQueue::with_capacity(256),
-            connectors: RwLock::new(ConnectorStore::with_capacity(256)),
-        }
-    }
-
-    // Taking connectors out of global queue
-    pub fn pop_key(&self) -> Option<ConnectorKey> {
-        return self.connector_queue.pop_front();
-    }
-
-    pub fn push_key(&self, key: ConnectorKey) {
-        self.connector_queue.push_back(key);
-    }
-
-    // Creating, retrieving and destroying connectors
-
-    /// Retrieves a connector using the provided key. Note that the returned
-    /// reference is not truly static, the `GlobalStore` needs to stay alive.
-    pub fn get_connector(&self, key: &ConnectorKey) -> &'static mut Connector {
-        let connectors = self.connectors.read().unwrap();
-        return connectors.get_mut(key);
-    }
-
-    /// Adds a connector to the global system. Will also queue it to run
-    pub fn add_connector(&self, connector: Connector) {
-        let key = {
-            let mut connectors = self.connectors.write().unwrap();
-            connectors.create(connector)
-        };
-
-        self.connector_queue.push_back(key);
-    }
-
-    /// Destroys a connector
-    pub fn destroy_connector(&self, key: ConnectorKey) {
-        let mut connectors = self.connectors.write().unwrap();
-        connectors.destroy(key);
-    }
-}
+/// The registry of all ports
+pub struct PortStore {
+    inner: RwLock<PortStoreInner>,
+}
+
+struct PortStoreInner {
+    ports: RawVec<Port>,
+    free: Vec<usize>,
+}
+
+impl PortStore {
+    fn with_capacity(capacity: usize) -> Self {
+        Self{
+            inner: RwLock::new(PortStoreInner{
+                ports: RawVec::with_capacity(capacity),
+                free: Vec::with_capacity(capacity),
+            }),
+        }
+    }
+
+    pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef {
+        let lock = self.inner.read().unwrap();
+        debug_assert!(port_id.is_valid());
+
+        unsafe {
+            let port = lock.ports.get_mut(port_id.index as usize);
+            let port = &mut *port;
+            debug_assert_eq!(port.owning_connector, key.index); // race condition (if they are not equal, which should never happen), better than nothing
+
+            return PortRef{ lock, port };
+        }
+    }
+
+    pub(crate) fn create_channel(&self, creating_connector: Option<u32>) -> Channel {
+        let mut lock = self.inner.write().unwrap();
+
+        // Reserves a new port. Doesn't point it to its counterpart
+        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<u32>) -> u32 {
+            let index;
+            let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned };
+            let connector_id = creating_connector.unwrap_or(0);
+
+            if lock.free.is_empty() {
+                index = lock.ports.len() as u32;
+                lock.ports.push(Port{
+                    self_id: PortIdLocal::new(index),
+                    peer_id: PortIdLocal::new_invalid(),
+                    kind,
+                    ownership,
+                    owning_connector: connector_id,
+                    peer_connector: connector_id
+                });
+            } else {
+                index = lock.free.pop().unwrap() as u32;
+                let port = unsafe{ &mut *lock.ports.get_mut(index as usize) };
+
+                port.peer_id = PortIdLocal::new_invalid();
+                port.kind = kind;
+                port.ownership = ownership;
+                port.owning_connector = connector_id;
+                port.peer_connector = connector_id;
+            }
+
+            return index;
+        }
+
+        // Create the ports
+        let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector);
+        let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector);
+        debug_assert_ne!(putter_id, getter_id);
+
+        // Point them to one another
+        unsafe {
+            let putter_port = &mut *lock.ports.get_mut(putter_id as usize);
+            let getter_port = &mut *lock.ports.get_mut(getter_id as usize);
+            putter_port.peer_id = getter_port.self_id;
+            getter_port.peer_id = putter_port.self_id;
+        }
+
+        return Channel{ putter_id, getter_id };
+    }
+}
+
+pub struct PortRef<'p> {
+    lock: RwLockReadGuard<'p, PortStoreInner>,
+    port: &'static mut Port,
+}
+
+impl<'p> std::ops::Deref for PortRef<'p> {
+    type Target = Port;
+
+    fn deref(&self) -> &Self::Target {
+        return self.port;
+    }
+}
+
+impl<'p> std::ops::DerefMut for PortRef<'p> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        return self.port;
+    }
+}
+
+impl Drop for PortStore {
+    fn drop(&mut self) {
+        let lock = self.inner.write().unwrap();
+
+        // Very lazy code
+        for idx in 0..lock.ports.len() {
+            if lock.free.contains(&idx) {
+                continue;
+            }
+
+            unsafe {
+                let port = lock.ports.get_mut(idx);
+                std::ptr::drop_in_place(port);
+            }
+        }
+    }
+}
+
+/// Global store of connectors, ports and queues that are used by the scheduler
+/// threads. The global store has the appearance of a thread-safe datatype, but
+/// one needs to be careful using it.
+///
+/// TODO: @docs
+/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
+pub struct GlobalStore {
+    pub connector_queue: MpmcQueue<ConnectorKey>,
+    pub connectors: ConnectorStore,
+    pub ports: PortStore,
+}
+
+impl GlobalStore {
+    pub fn new() -> Self {
+        Self{
+            connector_queue: MpmcQueue::with_capacity(256),
+            connectors: ConnectorStore::with_capacity(256),
+            ports: PortStore::with_capacity(256),
+        }
+    }
+}
 
\ No newline at end of file
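
`PortStore::get` returns a `PortRef` instead of a bare reference: the handle owns the read guard, so the port table cannot be restructured while any caller still holds a port. A safe, simplified sketch of that guard pattern (shared access only; the diff hands out `&'static mut Port` through unsafe code, relying on the one-owner-per-port contract):

use std::sync::{RwLock, RwLockReadGuard};

struct Store {
    inner: RwLock<Vec<String>>, // stand-in for the port table
}

// The handle keeps the read guard alive for as long as the caller uses it.
struct ItemRef<'a> {
    lock: RwLockReadGuard<'a, Vec<String>>,
    index: usize,
}

impl<'a> std::ops::Deref for ItemRef<'a> {
    type Target = String;

    fn deref(&self) -> &String {
        &self.lock[self.index]
    }
}

impl Store {
    fn get(&self, index: usize) -> ItemRef<'_> {
        ItemRef{ lock: self.inner.read().unwrap(), index }
    }
}

fn main() {
    let store = Store{ inner: RwLock::new(vec!["putter".into(), "getter".into()]) };
    let item = store.get(1);
    assert_eq!(item.as_str(), "getter"); // store stays read-locked until `item` drops
}

One consequence worth noting: holding a `PortRef` across a call that takes the write lock, such as `create_channel`, would deadlock on the same `RwLock`.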
src/runtime2/inbox.rs
-use crate::common::Ordering;
+use std::collections::VecDeque;
+use std::sync::{RwLock, RwLockReadGuard, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use crate::protocol::eval::ValueGroup;
 use crate::runtime2::connector::{BranchId, PortIdLocal};
 
-/// A message in transit from one connector to another.
+/// A message prepared by a connector. Waiting to be picked up by the runtime to
+/// be sent to another connector.
 #[derive(Clone)]
-pub struct Message {
+pub struct OutboxMessage {
     pub sending_port: PortIdLocal,
-    pub receiving_port: PortIdLocal,
     pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
     pub sender_cur_branch_id: BranchId, // always valid
    pub message: ValueGroup,
 }
 
+/// A message inserted into the inbox of a connector by the runtime.
+#[derive(Clone)]
+pub struct InboxMessage {
+    pub sending_port: PortIdLocal,
+    pub receiving_port: PortIdLocal,
+    pub sender_prev_branch_id: BranchId,
+    pub sender_cur_branch_id: BranchId,
+    pub message: ValueGroup,
+}
+
+/// A message sent between connectors to communicate something about their
+/// scheduling state.
+pub enum ControlMessage {
+    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
+    Ack(u32), // (control message ID)
+}
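
The diff defines `ControlMessage` but not yet its consumer, so the intended protocol is only sketched by the variant names: a `ChangePortPeer` reroutes a port to a new peer connector, and the matching `Ack` (by control message ID) tells the sender the rerouting is visible. A hypothetical handler, purely illustrative (everything beyond the enum itself is an assumption):

#[derive(Clone, Copy, PartialEq, Eq)]
struct PortIdLocal { index: u32 }

enum ControlMessage {
    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
    Ack(u32),                              // (control message ID)
}

fn handle(msg: ControlMessage) {
    match msg {
        ControlMessage::ChangePortPeer(id, port, new_peer) => {
            // Assumed behaviour: update the port's `peer_connector` field,
            // then answer with Ack(id) so the sender can retire the request.
            println!("msg {}: port {} now peers with connector {}", id, port.index, new_peer);
        }
        ControlMessage::Ack(id) => {
            // Assumed behaviour: mark pending control operation `id` complete.
            println!("msg {} acknowledged", id);
        }
    }
}

fn main() {
    handle(ControlMessage::ChangePortPeer(1, PortIdLocal{ index: 7 }, 3));
    handle(ControlMessage::Ack(1));
}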
 

	
 
 /// The inbox of a connector. The owning connector (i.e. the thread that is
 /// executing the connector) should be able to read all messages. Other
 /// connectors (potentially executed by different threads) should be able to
 /// append messages.
 ///
-/// Note that the logic inside of the inbox is strongly connected to deciding
-/// whether or not a connector has nothing to execute, and is waiting on new
-/// messages in order to continue.
+/// If a connector has no more code to run, and its inbox does not contain any
+/// new messages, then it may go into sleep mode.
 ///
 // TODO: @Optimize, this is a temporary lazy implementation
 pub struct Inbox {
-    messages: Vec<Message>
+    // "Normal" messages, intended for a PDL protocol. These need to stick
+    // around during an entire sync-block (to handle `put`s for which the
+    // corresponding `get`s have not yet been reached).
+    messages: RwLock<Vec<InboxMessage>>,
+    len_read: AtomicUsize,
+    // System messages. These are handled by the scheduler and only need to be
+    // handled once.
+    system_messages: Mutex<VecDeque<ControlMessage>>,
 }
 
 impl Inbox {
     pub fn new() -> Self {
-        Self{ messages: Vec::new() }
+        Self{
+            messages: RwLock::new(Vec::new()),
+            len_read: AtomicUsize::new(0),
+            system_messages: Mutex::new(VecDeque::new()),
+        }
     }
 
     /// Will insert the message into the inbox. Only exception is when the tuple
     /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
     /// nothing is inserted.
-    pub fn insert_message(&mut self, message: Message) {
-        match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
-            Ok(_) => {} // message already exists
-            Err(idx) => self.messages.insert(idx, message)
-        }
-    }
+    pub fn insert_message(&self, message: InboxMessage) {
+        let mut messages = self.messages.write().unwrap();
+        for existing in messages.iter() {
+            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
+                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
+                    existing.receiving_port == message.receiving_port {
+                // Message was already received
+                return;
+            }
+        }
+        messages.push(message);
+    }
 
-    /// Retrieves all messages for the provided conditions
-    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
-        // Seek the first message with the appropriate port ID and branch ID
-        let num_messages = self.messages.len();
-
-        for first_idx in 0..num_messages {
-            let msg = &self.messages[first_idx];
-            if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
-                // Found a match, seek ahead until the condition is no longer true
-                let mut last_idx = first_idx + 1;
-                while last_idx < num_messages {
-                    let msg = &self.messages[last_idx];
-                    if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
-                        // No longer matching
-                        break;
-                    }
-
-                    last_idx += 1;
-                }
-
-                // Return all the matching messages
-                return &self.messages[first_idx..last_idx];
-            } else if msg.receiving_port.id > port_id.id {
-                // Because messages are ordered, this implies we couldn't find
-                // any message
-                break;
-            }
-        }
-
-        return &self.messages[0..0];
-    }
+    /// Retrieves all previously read messages that satisfy the provided
+    /// speculative conditions. Note that the inbox remains read-locked until
+    /// the returned iterator is dropped. Should only be called by the
+    /// inbox-reader (i.e. the thread executing a connector's PDL code).
+    ///
+    /// This function should only be used to check if already-received messages
+    /// could be received by a newly encountered `get` call in a connector's
+    /// PDL code.
+    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
+        let lock = self.messages.read().unwrap();
+        return InboxMessageIter{
+            lock,
+            next_index: 0,
+            max_index: self.len_read.load(Ordering::Acquire),
+            match_port_id: port_id,
+            match_prev_branch_id: prev_branch_id,
+        };
+    }
+
+    /// Retrieves the next unread message. Should only be called by the
+    /// inbox-reader.
+    pub fn next_message(&self) -> Option<InboxMessageRef> {
+        let lock = self.messages.read().unwrap();
+        let cur_index = self.len_read.load(Ordering::Acquire);
+        if cur_index >= lock.len() {
+            return None;
+        }
+
+        // TODO: Accept the correctness and simply make it an add, or even
+        //  remove the atomic altogether.
+        if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) {
+            panic!("multiple readers modifying number of messages read");
+        }
+
+        return Some(InboxMessageRef{
+            lock,
+            index: cur_index,
+        });
+    }
 
     /// Simply empties the inbox
@@ -75,15 +122,61 @@ impl Inbox {
         self.messages.clear();
     }
 
-    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
-    // branch id.
-    fn compare_messages(a: &Message, b: &Message) -> Ordering {
-        let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
-        if ord != Ordering::Equal { return ord; }
-
-        ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
-        if ord != Ordering::Equal { return ord; }
-
-        return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
-    }
+    pub fn insert_control_message(&self, message: ControlMessage) {
+        let mut lock = self.system_messages.lock().unwrap();
+        lock.push_back(message);
+    }
+
+    pub fn take_control_message(&self) -> Option<ControlMessage> {
+        let mut lock = self.system_messages.lock().unwrap();
+        return lock.pop_front();
+    }
 }
+
+/// Reference to a new message
+pub struct InboxMessageRef<'i> {
+    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
+    index: usize,
+}
+
+impl<'i> std::ops::Deref for InboxMessageRef<'i> {
+    type Target = InboxMessage;
+
+    fn deref(&self) -> &Self::Target {
+        return &self.lock[self.index];
+    }
+}
+
+/// Iterator over previously received messages in the inbox.
+pub struct InboxMessageIter<'i> {
+    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
+    next_index: usize,
+    max_index: usize,
+    match_port_id: PortIdLocal,
+    match_prev_branch_id: BranchId,
+}
+
+impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
+    type Item = &'m InboxMessage;
+
+    fn next(&'m mut self) -> Option<Self::Item> {
+        // Loop until match is found or at end of messages
+        while self.next_index < self.max_index {
+            let cur_message = &self.lock[self.next_index];
+            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
+                // Found a match
+                break;
+            }
+
+            self.next_index += 1;
+        }
+
+        if self.next_index == self.max_index {
+            return None;
+        }
+
+        let message = &self.lock[self.next_index];
+        self.next_index += 1;
+        return Some(message);
+    }
+}
 
\ No newline at end of file
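
Two details in the new inbox deserve attention. First, the single-reader design: writers append under the write lock, while `len_read` splits the vector into an already-read prefix (which `get_messages` may re-scan for replay) and an unread tail (which `next_message` consumes). Second, the borrowing `Iterator` impl above does not compile as written: `fn next(&'m mut self)` does not match the `Iterator` trait's signature, and items cannot safely borrow from a guard the iterator itself owns. A compiling cursor-style sketch of the same read/unread split, with `u32` standing in for `InboxMessage`:

use std::sync::RwLock;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Inbox {
    messages: RwLock<Vec<u32>>, // stand-in for Vec<InboxMessage>
    len_read: AtomicUsize,      // boundary between read prefix and unread tail
}

impl Inbox {
    fn new() -> Self {
        Inbox{ messages: RwLock::new(Vec::new()), len_read: AtomicUsize::new(0) }
    }

    // Any thread may append.
    fn insert(&self, message: u32) {
        self.messages.write().unwrap().push(message);
    }

    // Called only by the owning connector's thread: consume one unread message.
    fn next_message(&self) -> Option<u32> {
        let lock = self.messages.read().unwrap();
        let cur = self.len_read.load(Ordering::Acquire);
        if cur >= lock.len() {
            return None;
        }
        self.len_read.store(cur + 1, Ordering::Release); // single reader: a plain store suffices
        Some(lock[cur])
    }

    // Replay the already-read prefix, e.g. for a newly reached `get`.
    fn replay(&self, matches: impl Fn(u32) -> bool) -> Vec<u32> {
        let lock = self.messages.read().unwrap();
        let max = self.len_read.load(Ordering::Acquire);
        lock[..max].iter().copied().filter(|&m| matches(m)).collect()
    }
}

fn main() {
    let inbox = Inbox::new();
    inbox.insert(10);
    inbox.insert(20);
    assert_eq!(inbox.next_message(), Some(10));
    assert_eq!(inbox.replay(|m| m == 10), vec![10]); // 20 is still unread
}

Returning owned copies (or a collected `Vec`) trades a little allocation for not holding the read lock across the caller's control flow, which is likely the simplest fix for the iterator until a lending-iterator design is worked out.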
src/runtime2/mod.rs
 mod runtime;
 mod messages;
 mod connector;
+mod port;
 mod global_store;
 mod scheduler;
 

	
src/runtime2/port.rs
new file 100644
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct PortIdLocal {
    pub index: u32,
}

impl PortIdLocal {
    pub fn new(id: u32) -> Self {
        Self{ index: id }
    }

    // TODO: Unsure about this, maybe remove, then also remove all struct
    //  instances where I call this
    pub fn new_invalid() -> Self {
        Self{ index: u32::MAX }
    }

    pub fn is_valid(&self) -> bool {
        return self.index != u32::MAX;
    }
}

pub enum PortKind {
    Putter,
    Getter,
}

pub enum PortOwnership {
    Unowned, // i.e. held by a native application
    Owned,
    InTransit,
}

/// Represents a port inside of the runtime. May be without owner if it is
/// created by the application interfacing with the runtime, instead of being
/// created by a connector.
pub struct Port {
    // Once created, these values are immutable
    pub self_id: PortIdLocal,
    pub peer_id: PortIdLocal,
    pub kind: PortKind,
    // These may be changed, but only by the connector that owns the port
    pub ownership: PortOwnership,
    pub owning_connector: u32,
    pub peer_connector: u32, // might be temporarily inconsistent while peer port is sent around in non-sync phase
}

// TODO: Turn port ID into its own type
pub struct Channel {
    pub putter_id: u32, // can put on it, so from the connector's point of view, this is an output
    pub getter_id: u32, // vice versa: can get on it, so an input for the connector
}
 
\ No newline at end of file
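
The `u32::MAX` sentinel keeps `PortIdLocal` at four bytes, where an `Option<PortIdLocal>` would occupy eight; `is_valid` then plays the role of `Option::is_some`. A small sketch of that trade-off, adding a hypothetical `as_option` convenience that is not part of the commit:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct PortIdLocal { pub index: u32 }

impl PortIdLocal {
    pub fn new(index: u32) -> Self { Self{ index } }
    pub fn new_invalid() -> Self { Self{ index: u32::MAX } }
    pub fn is_valid(&self) -> bool { self.index != u32::MAX }

    // Hypothetical helper: recover an Option-style view where convenient.
    pub fn as_option(self) -> Option<u32> {
        if self.is_valid() { Some(self.index) } else { None }
    }
}

fn main() {
    assert_eq!(std::mem::size_of::<PortIdLocal>(), 4);
    assert_eq!(std::mem::size_of::<Option<PortIdLocal>>(), 8); // no niche in a plain u32
    assert_eq!(PortIdLocal::new_invalid().as_option(), None);
    assert_eq!(PortIdLocal::new(3).as_option(), Some(3));
}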
src/runtime2/scheduler.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 use std::thread;
 use crate::ProtocolDescription;
 
+use super::inbox::InboxMessage;
 use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 use super::global_store::GlobalStore;
@@ -23,13 +24,13 @@ impl Scheduler {
         // Setup global storage and workspaces that are reused for every
         // connector that we run
         // TODO: @Memory, scheme for reducing allocations if excessive.
-        let mut delta_state = RunDeltaState::new()
+        let mut delta_state = RunDeltaState::new();
 
         loop {
             // TODO: Check if we're supposed to exit
 
             // Retrieve a unit of work
-            let connector_key = self.global.pop_key();
+            let connector_key = self.global.connector_queue.pop_front();
             if connector_key.is_none() {
                 // TODO: @Performance, needs condition variable for waking up
                 thread::sleep(Duration::new(1, 0));
@@ -38,13 +39,15 @@ impl Scheduler {
 
             // We have something to do
             let connector_key = connector_key.unwrap();
-            let connector = self.global.get_connector(&connector_key);
+            let connector = self.global.connectors.get_mut(&connector_key);
 
             let mut cur_schedule = ConnectorScheduling::Immediate;
 
             while cur_schedule == ConnectorScheduling::Immediate {
                 let new_schedule;
 
+                // TODO: Check inbox for new message
+
                 if connector.is_in_sync_mode() {
                     // In synchronous mode, so we can expect messages being sent,
                     // but we never expect the creation of connectors
@@ -52,8 +55,27 @@ impl Scheduler {
                     debug_assert!(delta_state.new_connectors.is_empty());
 
                     if !delta_state.outbox.is_empty() {
                         // There are messages to send
                         for message in delta_state.outbox.drain(..) {
+                            let (inbox_message, target_connector_id) = {
+                                // Note: retrieving a port incurs a read lock
+                                let sending_port = self.global.ports.get(&connector_key, message.sending_port);
+                                (
+                                    InboxMessage {
+                                        sending_port: sending_port.self_id,
+                                        receiving_port: sending_port.peer_id,
+                                        sender_prev_branch_id: message.sender_prev_branch_id,
+                                        sender_cur_branch_id: message.sender_cur_branch_id,
+                                        message: message.message,
+                                    },
+                                    sending_port.peer_connector,
+                                )
+                            };
+
+                            let target_connector = self.global.connectors.get_shared(target_connector_id);
+                            target_connector.inbox.insert_message(inbox_message);
+
+                            // TODO: Check silent state. Queue connector if it was silent
                         }
                     }
                 } else {
@@ -65,7 +87,13 @@ impl Scheduler {
                     if !delta_state.new_connectors.is_empty() {
                         // Push all connectors into the global state and queue them
                         // for execution
                         for connector in delta_state.new_connectors.drain(..) {
+                            // Create connector, modify all of the ports that
+                            // it now owns, then queue it for execution
+                            let connector_key = self.global.connectors.create(connector);
+
+                            self.global.connector_queue.push_back(connector_key);
                         }
                     }
                 }
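
The `thread::sleep(Duration::new(1, 0))` above is a stopgap the TODO already flags: an idle scheduler thread should block until work arrives. A sketch of the condition-variable wakeup the TODO asks for, written as a hypothetical replacement queue rather than a change to the crate's `MpmcQueue`:

use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

struct WorkQueue<T> {
    queue: Mutex<VecDeque<T>>,
    signal: Condvar,
}

impl<T> WorkQueue<T> {
    fn new() -> Self {
        WorkQueue{ queue: Mutex::new(VecDeque::new()), signal: Condvar::new() }
    }

    fn push_back(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.signal.notify_one(); // wake one sleeping scheduler thread
    }

    fn pop_front_blocking(&self) -> T {
        let mut queue = self.queue.lock().unwrap();
        loop {
            if let Some(item) = queue.pop_front() {
                return item;
            }
            // Releases the mutex while sleeping; re-checks on wakeup.
            queue = self.signal.wait(queue).unwrap();
        }
    }
}

fn main() {
    let queue = WorkQueue::new();
    queue.push_back(1u32);
    assert_eq!(queue.pop_front_blocking(), 1);
}

A real version would also need a shutdown flag checked inside the wait loop, since `pop_front_blocking` otherwise never returns on an empty queue.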
 

	