diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs new file mode 100644 index 0000000000000000000000000000000000000000..871ec8720c7256d6479bef3eb8c4bd7eb7838ada --- /dev/null +++ b/src/runtime2/inbox.rs @@ -0,0 +1,89 @@ +use crate::common::Ordering; +use crate::protocol::eval::ValueGroup; +use crate::runtime2::connector::{BranchId, PortIdLocal}; + +/// A message in transit from one connector to another. +#[derive(Clone)] +pub struct Message { + pub sending_port: PortIdLocal, + pub receiving_port: PortIdLocal, + pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id + pub sender_cur_branch_id: BranchId, // always valid + pub message: ValueGroup, +} + +/// The inbox of a connector. The owning connector (i.e. the thread that is +/// executing the connector) should be able to read all messages. Other +/// connectors (potentially executed by different threads) should be able to +/// append messages. +/// +/// Note that the logic inside of the inbox is strongly connected to deciding +/// whether or not a connector has nothing to execute, and is waiting on new +/// messages in order to continue. +pub struct Inbox { + messages: Vec +} + +impl Inbox { + pub fn new() -> Self { + Self{ messages: Vec::new() } + } + + /// Will insert the message into the inbox. Only exception is when the tuple + /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then + /// nothing is inserted.. + pub fn insert_message(&mut self, message: Message) { + match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { + Ok(_) => {} // message already exists + Err(idx) => self.messages.insert(idx, message) + } + } + + /// Retrieves all messages for the provided conditions + pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { + // Seek the first message with the appropriate port ID and branch ID + let num_messages = self.messages.len(); + + for first_idx in 0..num_messages { + let msg = &self.messages[first_idx]; + if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { + // Found a match, seek ahead until the condition is no longer true + let mut last_idx = first_idx + 1; + while last_idx < num_messages { + let msg = &self.messages[last_idx]; + if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { + // No longer matching + break; + } + last_idx += 1; + } + + // Return all the matching messages + return &self.messages[first_idx..last_idx]; + } else if msg.receiving_port.id > port_id.id { + // Because messages are ordered, this implies we couldn't find + // any message + break; + } + } + + return &self.messages[0..0]; + } + + /// Simply empties the inbox + pub fn clear(&mut self) { + self.messages.clear(); + } + + // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur + // branch id. + fn compare_messages(a: &Message, b: &Message) -> Ordering { + let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); + if ord != Ordering::Equal { return ord; } + + ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); + if ord != Ordering::Equal { return ord; } + + return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); + } +}