use crate::common::Ordering; use crate::protocol::eval::ValueGroup; use crate::runtime2::connector::{BranchId, PortIdLocal}; /// A message in transit from one connector to another. #[derive(Clone)] pub struct Message { pub sending_port: PortIdLocal, pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } /// The inbox of a connector. The owning connector (i.e. the thread that is /// executing the connector) should be able to read all messages. Other /// connectors (potentially executed by different threads) should be able to /// append messages. /// /// Note that the logic inside of the inbox is strongly connected to deciding /// whether or not a connector has nothing to execute, and is waiting on new /// messages in order to continue. pub struct Inbox { messages: Vec } impl Inbox { pub fn new() -> Self { Self{ messages: Vec::new() } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. pub fn insert_message(&mut self, message: Message) { match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { Ok(_) => {} // message already exists Err(idx) => self.messages.insert(idx, message) } } /// Retrieves all messages for the provided conditions pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { // Seek the first message with the appropriate port ID and branch ID let num_messages = self.messages.len(); for first_idx in 0..num_messages { let msg = &self.messages[first_idx]; if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { // Found a match, seek ahead until the condition is no longer true let mut last_idx = first_idx + 1; while last_idx < num_messages { let msg = &self.messages[last_idx]; if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { // No longer matching break; } last_idx += 1; } // Return all the matching messages return &self.messages[first_idx..last_idx]; } else if msg.receiving_port.id > port_id.id { // Because messages are ordered, this implies we couldn't find // any message break; } } return &self.messages[0..0]; } /// Simply empties the inbox pub fn clear(&mut self) { self.messages.clear(); } // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur // branch id. fn compare_messages(a: &Message, b: &Message) -> Ordering { let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); if ord != Ordering::Equal { return ord; } ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); if ord != Ordering::Equal { return ord; } return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); } }