Files @ cf26538b25dc
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs - annotation

cf26538b25dc 6.3 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
architecture for send/recv ports in place
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::protocol::eval::ValueGroup;
use crate::runtime2::connector::{BranchId, PortIdLocal};

/// A message prepared by a connector. Waiting to be picked up by the runtime to
/// be sent to another connector.
///
/// Carries the same fields as `InboxMessage`, except for the receiving port.
#[derive(Clone)]
pub struct OutboxMessage {
    /// Local ID of the port the message was sent from.
    pub sending_port: PortIdLocal,
    /// Branch the sender was in before the current one.
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
    /// Branch the sender is in for this message.
    pub sender_cur_branch_id: BranchId, // always valid
    /// The values being transferred.
    pub message: ValueGroup,
}

/// A message inserted into the inbox of a connector by the runtime.
///
/// Within an `Inbox` the triple (`sender_prev_branch_id`,
/// `sender_cur_branch_id`, `receiving_port`) identifies a message: a second
/// message with the same triple is not stored again.
#[derive(Clone)]
pub struct InboxMessage {
    /// Local ID of the port the message was sent from.
    pub sending_port: PortIdLocal,
    /// Local ID of the port the message arrives at; used to match the message
    /// against the port being read from.
    pub receiving_port: PortIdLocal,
    /// Branch the sender was in before the current one; used to match the
    /// message against the reader's speculative branch.
    pub sender_prev_branch_id: BranchId,
    /// Branch the sender is in for this message.
    pub sender_cur_branch_id: BranchId,
    /// The values being transferred.
    pub message: ValueGroup,
}

/// A message sent between connectors to communicate something about their
/// scheduling state.
pub enum ControlMessage {
    /// Fields, in order: control message ID, port to change, new peer
    /// connector ID.
    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
    /// Field: control message ID.
    // NOTE(review): presumably the ID of the control message being
    // acknowledged — confirm against the scheduler.
    Ack(u32), // (control message ID)
}

/// The inbox of a connector. The owning connector (i.e. the thread that is
/// executing the connector) should be able to read all messages. Other
/// connectors (potentially executed by different threads) should be able to
/// append messages.
///
/// If a connector has no more code to run, and its inbox does not contain any
/// new messages, then it may go into sleep mode.
///
// TODO: @Optimize, this is a temporary lazy implementation
pub struct Inbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: RwLock<Vec<InboxMessage>>,
    // Number of leading entries of `messages` that were already handed out by
    // `next_message`. `get_messages` only considers entries below this count;
    // `next_message` hands out the entry at this index and then increments it.
    len_read: AtomicUsize,
    // System messages. These are handled by the scheduler and only need to be
    // handled once. Consumed in FIFO order (pushed at the back, popped from
    // the front).
    system_messages: Mutex<VecDeque<ControlMessage>>,
}

impl Inbox {
    /// Creates an empty inbox: no data messages, nothing marked as read and
    /// no pending control messages.
    pub fn new() -> Self {
        Self{
            messages: RwLock::new(Vec::new()),
            len_read: AtomicUsize::new(0),
            system_messages: Mutex::new(VecDeque::new()),
        }
    }

    /// Will insert the message into the inbox. Only exception is when the tuple
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
    /// nothing is inserted.
    ///
    /// Takes the write lock on the message list for the duration of the call.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned.
    pub fn insert_message(&self, message: InboxMessage) {
        let mut messages = self.messages.write().unwrap();
        let already_received = messages.iter().any(|existing| {
            existing.sender_prev_branch_id == message.sender_prev_branch_id &&
                existing.sender_cur_branch_id == message.sender_cur_branch_id &&
                existing.receiving_port == message.receiving_port
        });

        if !already_received {
            messages.push(message);
        }
    }

    /// Retrieves all previously read messages that satisfy the provided
    /// speculative conditions. Note that the inbox remains read-locked until
    /// the returned iterator is dropped. Should only be called by the
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
    ///
    /// This function should only be used to check if already-received messages
    /// could be received by a newly encountered `get` call in a connector's
    /// PDL code.
    ///
    /// Only messages that were already handed out by `next_message` are
    /// considered; of those, the iterator yields the ones whose receiving port
    /// equals `port_id` and whose sender's previous branch equals
    /// `prev_branch_id`.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter<'_> {
        // Take the lock before sampling `len_read`, so the message list cannot
        // change between the two.
        let lock = self.messages.read().unwrap();
        let max_index = self.len_read.load(Ordering::Acquire);

        InboxMessageIter{
            lock,
            next_index: 0,
            max_index,
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        }
    }

    /// Retrieves the next unread message, marking it as read. Returns `None`
    /// if every stored message was already handed out. The inbox remains
    /// read-locked until the returned reference is dropped. Should only be
    /// called by the inbox-reader.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned, or if another caller modified
    /// the read counter concurrently (i.e. there is more than one reader).
    pub fn next_message(&self) -> Option<InboxMessageRef<'_>> {
        let lock = self.messages.read().unwrap();
        let cur_index = self.len_read.load(Ordering::Acquire);
        if cur_index >= lock.len() {
            return None;
        }

        // TODO: Accept the correctness and simply make it an add, or even
        //  remove the atomic altogether.
        let exchanged = self.len_read.compare_exchange(
            cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire
        );
        if exchanged.is_err() {
            panic!("multiple readers modifying number of messages read");
        }

        Some(InboxMessageRef{
            lock,
            index: cur_index,
        })
    }

    /// Simply empties the inbox: removes all data messages and resets the
    /// number of messages read. Pending control messages are left untouched.
    ///
    /// # Panics
    /// Panics if the message lock is poisoned.
    pub fn clear(&mut self) {
        // `&mut self` guarantees exclusive access, so the contents can be
        // reached without locking.
        self.messages.get_mut().unwrap().clear();
        // The read counter indexes into `messages`; it has to be reset
        // together with it, otherwise it would point past the end of the
        // now-empty list.
        *self.len_read.get_mut() = 0;
    }

    /// Appends a control message to the back of the system message queue.
    ///
    /// # Panics
    /// Panics if the system message lock is poisoned.
    pub fn insert_control_message(&self, message: ControlMessage) {
        self.system_messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the oldest queued control message, or `None` if
    /// the queue is empty.
    ///
    /// # Panics
    /// Panics if the system message lock is poisoned.
    pub fn take_control_message(&self) -> Option<ControlMessage> {
        self.system_messages.lock().unwrap().pop_front()
    }
}

/// Reference to a new message. Holds a read lock on the inbox's message list
/// for as long as it is alive, and dereferences to the referenced
/// `InboxMessage`.
pub struct InboxMessageRef<'i> {
    // Read guard over the inbox's message list.
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
    // Position of the referenced message within the locked list.
    index: usize,
}

impl<'i> std::ops::Deref for InboxMessageRef<'i> {
    type Target = InboxMessage;

    /// Returns the referenced message. The borrow is tied to `self` (and not
    /// to `'i`), because the message is only reachable through the read guard
    /// this value owns.
    fn deref(&self) -> &Self::Target {
        &self.lock[self.index]
    }
}

/// Iterator over previously received messages in the inbox. Holds a read lock
/// on the inbox's message list for as long as it is alive.
pub struct InboxMessageIter<'i> {
    // Read guard over the inbox's message list.
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
    // Index of the next message to inspect.
    next_index: usize,
    // Exclusive upper bound on the indices to inspect.
    max_index: usize,
    // A message only matches if its `receiving_port` equals this port.
    match_port_id: PortIdLocal,
    // A message only matches if its `sender_prev_branch_id` equals this branch.
    match_prev_branch_id: BranchId,
}

impl<'i> Iterator for InboxMessageIter<'i> {
    // The messages are only reachable through the read guard owned by the
    // iterator, so a yielded reference could not be allowed to outlive the
    // borrow of the iterator itself. The `Iterator` trait cannot express such
    // a "lending" item, hence matching messages are yielded as owned clones.
    type Item = InboxMessage;

    /// Returns a clone of the next message, at an index below `max_index`,
    /// whose receiving port and sender's previous branch match the values
    /// this iterator was created with. Returns `None` once all indices below
    /// `max_index` were inspected.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop until match is found or at end of messages
        while self.next_index < self.max_index {
            let index = self.next_index;
            self.next_index += 1;

            let candidate = &self.lock[index];
            if candidate.receiving_port == self.match_port_id &&
                    candidate.sender_prev_branch_id == self.match_prev_branch_id {
                return Some(candidate.clone());
            }
        }

        None
    }
}