Files @ 8c5d438b0fa3
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

8c5d438b0fa3 3.4 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
rewriting inbox to behave mpsc-like
use crate::common::Ordering;
use crate::protocol::eval::ValueGroup;
use crate::runtime2::connector::{BranchId, PortIdLocal};

/// A message in transit from one connector to another.
///
/// Besides the payload, a message carries the routing information the
/// receiving inbox uses to order, deduplicate and look up messages: the
/// receiving port and the sender's previous/current branch identifiers.
#[derive(Clone)]
pub struct Message {
    /// Port on the sending side (presumably the port the message was put on;
    /// not inspected by the inbox itself).
    pub sending_port: PortIdLocal,
    /// Port on the receiving side. Its `id` is the primary key by which the
    /// inbox sorts and looks up messages.
    pub receiving_port: PortIdLocal,
    /// Branch the sender was in before the current one. Its `index` is the
    /// secondary inbox sort key.
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
    /// Branch the sender was in when sending. Its `index` is the final inbox
    /// sort key and completes the deduplication triple.
    pub sender_cur_branch_id: BranchId, // always valid
    /// The payload carried by this message.
    pub message: ValueGroup,
}

/// The inbox of a connector. The owning connector (i.e. the thread that is
/// executing the connector) should be able to read all messages. Other
/// connectors (potentially executed by different threads) should be able to
/// append messages.
///
/// Note that the logic inside of the inbox is strongly connected to deciding
/// whether or not a connector has nothing to execute, and is waiting on new
/// messages in order to continue.
///
/// The type itself performs no internal synchronization: mutation goes through
/// `&mut self`, so sharing an inbox between threads requires external
/// coordination by the caller.
///
/// `Inbox::default()` yields an empty inbox, equivalent to `Inbox::new()`.
#[derive(Default)]
pub struct Inbox {
    /// Stored messages, kept sorted by (receiving port id, sender's previous
    /// branch index, sender's current branch index) and free of duplicates of
    /// that triple.
    messages: Vec<Message>
}

impl Inbox {
    /// Creates an inbox that holds no messages.
    pub fn new() -> Self {
        Self { messages: Vec::new() }
    }

    /// Inserts `message` at its sorted position (see `compare_messages`).
    ///
    /// The only exception is when a message with the same
    /// (receiving port id, previous branch index, current branch index) triple
    /// is already stored: then nothing is inserted and the stored message is
    /// kept as-is.
    pub fn insert_message(&mut self, message: Message) {
        let search = self
            .messages
            .binary_search_by(|stored| Self::compare_messages(stored, &message));

        // `Ok(_)` means an equal-keyed message already exists: nothing to do.
        // `Err(idx)` is the position that keeps the vector sorted.
        if let Err(insert_idx) = search {
            self.messages.insert(insert_idx, message);
        }
    }

    /// Retrieves all messages received on `port_id` whose sender's previous
    /// branch is `prev_branch_id`.
    ///
    /// The returned slice is ordered by the sender's current branch index and
    /// is empty when no message matches. Because the messages are kept sorted
    /// on (receiving port id, previous branch index, ...), all matches form one
    /// contiguous run whose bounds are located with two binary searches.
    ///
    /// NOTE(review): matching is done on `receiving_port.id` and
    /// `sender_prev_branch_id.index`, i.e. on the same fields the sort order
    /// is defined over. This assumes equality of `PortIdLocal`/`BranchId` is
    /// equivalent to equality of those fields — confirm against their
    /// definitions.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
        // Index of the first message that is not ordered before the key.
        let first_idx = self.messages.partition_point(|msg| {
            Self::compare_to_key(msg, &port_id, &prev_branch_id) == Ordering::Less
        });

        // Length of the run of messages (starting at `first_idx`) equal to the
        // key. Everything after that run compares greater.
        let num_matching = self.messages[first_idx..].partition_point(|msg| {
            Self::compare_to_key(msg, &port_id, &prev_branch_id) == Ordering::Equal
        });

        &self.messages[first_idx..first_idx + num_matching]
    }

    /// Simply empties the inbox.
    pub fn clear(&mut self) {
        self.messages.clear();
    }

    /// Total order used to keep the inbox sorted. Orders by, consecutively,
    /// a) receiving port id, b) previous branch index, c) current branch
    /// index.
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
        a.receiving_port.id.cmp(&b.receiving_port.id)
            .then_with(|| a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index))
            .then_with(|| a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index))
    }

    /// Compares a stored message against a lookup key consisting of the first
    /// two components of the sort order: a) receiving port id, b) previous
    /// branch index. The current branch index is deliberately ignored.
    fn compare_to_key(msg: &Message, port_id: &PortIdLocal, prev_branch_id: &BranchId) -> Ordering {
        msg.receiving_port.id.cmp(&port_id.id)
            .then_with(|| msg.sender_prev_branch_id.index.cmp(&prev_branch_id.index))
    }
}