Files @ 58dfabd1be9f
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

58dfabd1be9f 7.3 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
moving to laptop
/**
inbox.rs

Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:

The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.

The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/

use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::protocol::eval::ValueGroup;
use super::connector::{BranchId, PortIdLocal};
use super::global_store::ConnectorId;

/// A message prepared by a connector, waiting to be picked up by the runtime
/// and sent to another connector.
///
/// Carries only sender-side information (sending port and branch IDs); the
/// delivered counterpart, `DataMessage`, additionally names the sending
/// connector and the receiving port.
#[derive(Clone)]
pub struct OutgoingMessage {
    pub sending_port: PortIdLocal,
    pub sender_prev_branch_id: BranchId, // may be invalid, implying there is no previous branch
    pub sender_cur_branch_id: BranchId, // always valid
    pub message: ValueGroup,
}

/// A message that has been delivered to a connector (after being imbued with
/// the receiving port by the scheduler).
///
/// Has the same fields as `OutgoingMessage`, plus `sending_connector` and
/// `receiving_port`.
#[derive(Clone)]
pub struct DataMessage {
    pub sending_connector: ConnectorId,
    pub sending_port: PortIdLocal,
    // The three fields below form the key that `PrivateInbox::insert_message`
    // uses to detect a message that was already received.
    pub receiving_port: PortIdLocal,
    pub sender_prev_branch_id: BranchId,
    pub sender_cur_branch_id: BranchId,
    pub message: ValueGroup,
}

/// A single constraint stored in a `SyncConnectorConstraints` entry of a
/// `SyncMessage`.
// NOTE(review): the meaning of the variants is not documented in this file.
// Judging by the names they presumably constrain a port to stay silent, a
// branch to have a given number, and a port to map to a given branch number
// respectively -- TODO confirm and document.
pub enum SyncBranchConstraint {
    SilentPort(PortIdLocal),
    BranchNumber(u32),
    PortMapping(PortIdLocal, u32),
}

/// Per-connector entry in the `connector_solutions` list of a `SyncMessage`.
///
/// All fields are private and no constructor or accessor is defined in this
/// part of the file.
pub struct SyncConnectorSolution {
    connector_id: ConnectorId,
    terminating_branch_id: BranchId,
    execution_branch_ids: Vec<BranchId>, // ends with the terminating branch ID
}

/// Per-connector entry in the `connector_constraints` list of a `SyncMessage`:
/// a connector ID together with the list of `SyncBranchConstraint`s recorded
/// for it.
///
/// All fields are private and no constructor or accessor is defined in this
/// part of the file.
pub struct SyncConnectorConstraints {
    connector_id: ConnectorId,
    constraints: Vec<SyncBranchConstraint>,
}

/// Payload of the `Message::Sync` variant. Bundles per-connector solutions,
/// per-connector constraints and a list of connectors that still have to be
/// visited.
///
/// All fields are private and no constructor or accessor is defined in this
/// part of the file.
pub struct SyncMessage {
    connector_solutions: Vec<SyncConnectorSolution>,
    connector_constraints: Vec<SyncConnectorConstraints>,
    // NOTE(review): stored as plain `u32`s rather than `ConnectorId`s, unlike
    // the other connector references in this file -- presumably raw connector
    // indices; confirm.
    connectors_to_visit: Vec<u32>,
}

/// A control message. These may be sent by schedulers to notify each other of
/// asynchronous state changes.
pub struct ControlMessage {
    pub id: u32, // generic identifier, used to match a request to its response
    pub sender: ConnectorId,
    pub content: ControlMessageVariant,
}

/// The content of a `ControlMessage`.
pub enum ControlMessageVariant {
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer; sent to the owner of said port
    Ack, // acknowledgement of a previous control message; matched through the control message ID (`ControlMessage::id`)
}

/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
/// out and handles all control messages and performs any required routing).
/// The correctly addressed `Data` variants will end up at the connector.
pub enum Message {
    Data(DataMessage),          // data message, handled by connector
    Sync(SyncMessage),          // sync message, handled by both connector/scheduler
    Control(ControlMessage),    // control message, handled by scheduler
    Ping,                       // ping message, intentionally waking up a connector (used for native connectors)
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
///
/// Currently implemented as a `VecDeque` behind a `Mutex`, so every operation
/// takes the lock.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    /// Creates an inbox whose message queue is empty.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self { messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn insert_message(&self, message: Message) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the message at the front of the queue, or `None`
    /// if the queue is empty.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn take_message(&self) -> Option<Message> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Returns `true` if the queue holds no messages at the moment the lock
    /// is held.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}

/// Temporary storage for the data messages a connector has received. Keeps
/// every inserted message until `clear` is called, and tracks how many of
/// them have already been handed out through `next_message`.
pub struct PrivateInbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: Vec<DataMessage>,
    // Number of messages already returned by `next_message`; equivalently the
    // index into `messages` of the next unread message.
    len_read: usize,
}

impl PrivateInbox {
    /// Creates an inbox without any messages, read or unread.
    pub fn new() -> Self {
        Self { messages: Vec::new(), len_read: 0 }
    }

    /// Stores `message`, unless a message with the same
    /// (`sender_prev_branch_id`, `sender_cur_branch_id`, `receiving_port`)
    /// triple is already present, in which case the new message is dropped.
    pub fn insert_message(&mut self, message: DataMessage) {
        let already_received = self.messages.iter().any(|stored| {
            stored.sender_prev_branch_id == message.sender_prev_branch_id
                && stored.sender_cur_branch_id == message.sender_cur_branch_id
                && stored.receiving_port == message.receiving_port
        });

        if !already_received {
            self.messages.push(message);
        }
    }

    /// Returns an iterator over the already-read messages (those previously
    /// handed out by `next_message`) that were received on `port_id` and whose
    /// sender's previous branch ID equals `prev_branch_id`. The inbox stays
    /// immutably borrowed for as long as the returned iterator is alive.
    /// Should only be called by the inbox-reader (i.e. the thread executing a
    /// connector's PDL code).
    ///
    /// Intended for checking whether messages that were received earlier can
    /// be consumed by a newly encountered `get` call in a connector's PDL
    /// code.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        InboxMessageIter {
            messages: &self.messages,
            next_index: 0,
            // Only messages that were already read are visible to the iterator.
            max_index: self.len_read,
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        }
    }

    /// Marks the oldest unread message as read and returns it, or returns
    /// `None` when every stored message has been read already. Should only be
    /// called by the inbox-reader.
    pub fn next_message(&mut self) -> Option<&DataMessage> {
        let unread = self.messages.get(self.len_read)?;
        self.len_read += 1;
        Some(unread)
    }

    /// Removes all messages and resets the read position.
    pub fn clear(&mut self) {
        self.len_read = 0;
        self.messages.clear();
    }
}

/// Iterator over previously received messages in the inbox. Created by
/// `PrivateInbox::get_messages`; it only yields messages whose receiving port
/// and sender's previous branch ID match the values stored here.
pub struct InboxMessageIter<'i> {
    // The inbox's message storage, borrowed for the lifetime of the iterator.
    messages: &'i Vec<DataMessage>,
    // Index of the next message to inspect.
    next_index: usize,
    // Exclusive upper bound on the indices to inspect (`get_messages` sets
    // this to the number of already-read messages).
    max_index: usize,
    // A message is yielded only if its `receiving_port` equals this ...
    match_port_id: PortIdLocal,
    // ... and its `sender_prev_branch_id` equals this.
    match_prev_branch_id: BranchId,
}

impl<'i> Iterator for InboxMessageIter<'i> {
    /// Yielded references borrow from the inbox's message storage (lifetime
    /// `'i`), not from the iterator itself, so they remain usable after the
    /// iterator has advanced or has been dropped.
    type Item = &'i DataMessage;

    /// Returns the next message in `[next_index, max_index)` whose receiving
    /// port and sender's previous branch ID equal the values being matched,
    /// or `None` once that range is exhausted. After returning `None` every
    /// subsequent call returns `None` as well.
    fn next(&mut self) -> Option<Self::Item> {
        // Copy the shared reference out of `self` so that the returned item is
        // tied to `'i` rather than to this `&mut self` borrow.
        let messages = self.messages;

        while self.next_index < self.max_index {
            let candidate = &messages[self.next_index];
            // Always step past the inspected message, whether it matches or not.
            self.next_index += 1;

            if candidate.receiving_port == self.match_port_id
                && candidate.sender_prev_branch_id == self.match_prev_branch_id
            {
                return Some(candidate);
            }
        }

        None
    }
}