Files @ cf26538b25dc
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

cf26538b25dc 6.3 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
architecture for send/recv ports in place
use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::protocol::eval::ValueGroup;
use crate::runtime2::connector::{BranchId, PortIdLocal};

/// A message prepared by a connector. Waiting to be picked up by the runtime to
/// be sent to another connector.
///
/// Carries the sending port and the sender's branch IDs next to the payload;
/// unlike `InboxMessage` it has no receiving port field.
#[derive(Clone)]
pub struct OutboxMessage {
    /// Local ID of the port the message is sent from.
    pub sending_port: PortIdLocal,
    /// Previous branch ID of the sender. May be invalid, implying that there
    /// is no previous branch ID.
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
    /// Current branch ID of the sender. Always valid.
    pub sender_cur_branch_id: BranchId, // always valid
    /// The values being sent.
    pub message: ValueGroup,
}

/// A message inserted into the inbox of a connector by the runtime.
///
/// Within an `Inbox` the combination of `sender_prev_branch_id`,
/// `sender_cur_branch_id` and `receiving_port` identifies a message: inserting
/// a second message with the same combination is a no-op.
#[derive(Clone)]
pub struct InboxMessage {
    /// Local ID of the port the message was sent from.
    pub sending_port: PortIdLocal,
    /// Local ID of the port the message arrives at. Used to match stored
    /// messages against the port a reader asks for.
    pub receiving_port: PortIdLocal,
    /// Previous branch ID of the sender. Used, together with the receiving
    /// port, to select the messages a reader is interested in.
    pub sender_prev_branch_id: BranchId,
    /// Current branch ID of the sender.
    pub sender_cur_branch_id: BranchId,
    /// The values that were sent.
    pub message: ValueGroup,
}

/// A message sent between connectors to communicate something about their
/// scheduling state.
///
/// Control messages are queued separately from the data messages of an inbox
/// and are taken out of that queue in insertion order.
pub enum ControlMessage {
    /// Request to change the peer of a port. Fields, in order: the control
    /// message ID, the port whose peer changes, the ID of the new peer
    /// connector.
    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
    /// Acknowledgement carrying a control message ID (presumably the ID of the
    /// control message being acknowledged -- confirm against the scheduler).
    Ack(u32), // (control message ID)
}

/// The inbox of a connector. The owning connector (i.e. the thread that is
/// executing the connector) should be able to read all messages. Other
/// connectors (potentially executed by different threads) should be able to
/// append messages.
///
/// If a connector has no more code to run, and its inbox does not contain any
/// new messages, then it may go into sleep mode.
///
/// Data messages and control messages are stored separately, each behind its
/// own lock.
///
// TODO: @Optimize, this is a temporary lazy implementation
pub struct Inbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    // Appending takes the write lock; reading takes the read lock.
    messages: RwLock<Vec<InboxMessage>>,
    // Number of entries at the front of `messages` that have already been
    // handed out by `next_message`. Entries at or beyond this index are still
    // unread; `get_messages` only looks at entries below it.
    len_read: AtomicUsize,
    // System messages. These are handled by the scheduler and only need to be
    // handled once. Kept as a FIFO queue: pushed at the back, popped from the
    // front.
    system_messages: Mutex<VecDeque<ControlMessage>>,
}

impl Inbox {
    /// Creates an empty inbox: no data messages, nothing marked as read and no
    /// pending control messages.
    pub fn new() -> Self {
        Self {
            messages: RwLock::new(Vec::new()),
            len_read: AtomicUsize::new(0),
            system_messages: Mutex::new(VecDeque::new()),
        }
    }

    /// Will insert the message into the inbox. Only exception is when the tuple
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
    /// nothing is inserted.
    ///
    /// Takes the write lock on the message list for the duration of the call.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned.
    pub fn insert_message(&self, message: InboxMessage) {
        let mut messages = self.messages.write().unwrap();
        let already_received = messages.iter().any(|existing| {
            existing.sender_prev_branch_id == message.sender_prev_branch_id
                && existing.sender_cur_branch_id == message.sender_cur_branch_id
                && existing.receiving_port == message.receiving_port
        });
        if !already_received {
            messages.push(message);
        }
    }

    /// Retrieves all previously read messages that satisfy the provided
    /// speculative conditions. Note that the inbox remains read-locked until
    /// the returned iterator is dropped. Should only be called by the
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
    ///
    /// This function should only be used to check if already-received messages
    /// could be received by a newly encountered `get` call in a connector's
    /// PDL code.
    ///
    /// Only messages that were already handed out by `next_message` are
    /// considered; of those, the iterator selects the ones whose receiving
    /// port is `port_id` and whose sender's previous branch is
    /// `prev_branch_id`.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter<'_> {
        // Take the lock before sampling the read cursor, so the cursor cannot
        // refer to entries that are not visible through this guard.
        let lock = self.messages.read().unwrap();
        InboxMessageIter {
            lock,
            next_index: 0,
            max_index: self.len_read.load(Ordering::Acquire),
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        }
    }

    /// Retrieves the next unread message, or `None` if every stored message
    /// has been read already. Should only be called by the inbox-reader.
    ///
    /// The returned reference keeps the inbox read-locked until it is dropped.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned, or if the read cursor was
    /// modified concurrently (i.e. there is more than one reader).
    pub fn next_message(&self) -> Option<InboxMessageRef<'_>> {
        let lock = self.messages.read().unwrap();
        let cur_index = self.len_read.load(Ordering::Acquire);
        if cur_index >= lock.len() {
            return None;
        }

        // TODO: Accept the correctness and simply make it an add, or even
        //  remove the atomic altogether.
        let advanced = self.len_read.compare_exchange(
            cur_index,
            cur_index + 1,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        if advanced.is_err() {
            panic!("multiple readers modifying number of messages read");
        }

        Some(InboxMessageRef {
            lock,
            index: cur_index,
        })
    }

    /// Simply empties the inbox: removes all data messages and resets the
    /// read cursor, so that messages inserted afterwards are reported as
    /// unread again. Pending control messages are left untouched.
    ///
    /// Requires exclusive access, so no locking is needed.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned.
    pub fn clear(&mut self) {
        self.messages.get_mut().unwrap().clear();
        // The cursor indexes into `messages`; leaving it at its old value
        // would make it point past the end of the now-empty list.
        *self.len_read.get_mut() = 0;
    }

    /// Appends a control message to the back of the control message queue.
    ///
    /// # Panics
    /// Panics if the control message lock is poisoned.
    pub fn insert_control_message(&self, message: ControlMessage) {
        self.system_messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the oldest pending control message, or `None` if
    /// the queue is empty.
    ///
    /// # Panics
    /// Panics if the control message lock is poisoned.
    pub fn take_control_message(&self) -> Option<ControlMessage> {
        self.system_messages.lock().unwrap().pop_front()
    }
}

/// Reference to a new message, as returned by `Inbox::next_message`.
///
/// Holds a read guard on the inbox's message list, so the inbox stays
/// read-locked for as long as this value is alive.
pub struct InboxMessageRef<'i> {
    // Read guard on the message list; keeps the referenced entry in place.
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
    // Position of the referenced message within the guarded list.
    index: usize,
}

/// Gives direct access to the referenced message.
impl<'i> std::ops::Deref for InboxMessageRef<'i> {
    type Target = InboxMessage;

    /// Returns the message at the stored index.
    ///
    /// The reference is borrowed from the read guard owned by `self`, so it
    /// lives only as long as the borrow of `self`, not for the whole `'i`.
    ///
    /// # Panics
    /// Panics if the stored index is out of bounds for the guarded list.
    fn deref(&self) -> &Self::Target {
        &self.lock[self.index]
    }
}

/// Iterator over previously received messages in the inbox, as returned by
/// `Inbox::get_messages`.
///
/// Holds a read guard on the inbox's message list, so the inbox stays
/// read-locked until the iterator is dropped.
pub struct InboxMessageIter<'i> {
    // Read guard on the message list being scanned.
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
    // Index of the next entry to inspect.
    next_index: usize,
    // Exclusive upper bound of the scan: the number of messages that had been
    // read when the iterator was created.
    max_index: usize,
    // Only messages with this receiving port are yielded.
    match_port_id: PortIdLocal,
    // Only messages with this previous sender branch ID are yielded.
    match_prev_branch_id: BranchId,
}

/// Yields, in storage order, the already-read messages whose receiving port
/// and previous sender branch ID match the values the iterator was created
/// with.
///
/// Items are clones of the stored messages: the `Iterator` trait cannot hand
/// out references that borrow from the read guard owned by the iterator
/// itself.
impl<'i> Iterator for InboxMessageIter<'i> {
    type Item = InboxMessage;

    /// Advances to the next matching message and returns a copy of it, or
    /// `None` once the already-read part of the inbox is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Never scan past what is actually stored, even if the recorded bound
        // is stale.
        let end = self.max_index.min(self.lock.len());

        while self.next_index < end {
            let candidate = &self.lock[self.next_index];
            // Step past the candidate right away, so a match is not yielded
            // again on the following call.
            self.next_index += 1;

            if candidate.receiving_port == self.match_port_id
                && candidate.sender_prev_branch_id == self.match_prev_branch_id
            {
                return Some(candidate.clone());
            }
        }

        None
    }
}