diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs index 05713bbcd5939ef446291eecd6e54aa05c1130dc..fde4cd812a4004c39cc59875c754e1ceb15a3a6c 100644 --- a/src/runtime2/messages.rs +++ b/src/runtime2/messages.rs @@ -1,13 +1,14 @@ -use std::collections::HashMap; -use std::collections::hash_map::Entry; use std::cmp::Ordering; +use std::collections::hash_map::Entry; +use std::collections::HashMap; -use super::connector::{PortIdLocal, BranchId}; -use crate::PortId; use crate::common::Id; +use crate::PortId; use crate::protocol::*; use crate::protocol::eval::*; +use super::connector::{BranchId, PortIdLocal}; + /// A message residing in a connector's inbox (waiting to be put into some kind /// of speculative branch), or a message waiting to be sent. #[derive(Clone)] @@ -19,83 +20,6 @@ pub struct BufferedMessage { pub(crate) message: ValueGroup, } -#[derive(Clone)] -pub struct Message { - pub sending_port: PortIdLocal, - pub receiving_port: PortIdLocal, - pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id - pub sender_cur_branch_id: BranchId, // always valid - pub message: ValueGroup, -} - -pub struct Inbox { - messages: Vec -} - -impl Inbox { - pub fn new() -> Self { - Self{ messages: Vec::new() } - } - - /// Will insert the message into the inbox. Only exception is when the tuple - /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then - /// nothing is inserted.. - pub fn insert_message(&mut self, message: Message) { - match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { - Ok(_) => {} // message already exists - Err(idx) => self.messages.insert(idx, message) - } - } - - /// Retrieves all messages for the provided conditions - pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { - // Seek the first message with the appropriate port ID and branch ID - let num_messages = self.messages.len(); - - for first_idx in 0..num_messages { - let msg = &self.messages[first_idx]; - if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { - // Found a match, seek ahead until the condition is no longer true - let mut last_idx = first_idx + 1; - while last_idx < num_messages { - let msg = &self.messages[last_idx]; - if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { - // No longer matching - break; - } - last_idx += 1; - } - - // Return all the matching messages - return &self.messages[first_idx..last_idx]; - } else if msg.receiving_port.id > port_id.id { - // Because messages are ordered, this implies we couldn't find - // any message - break; - } - } - - return &self.messages[0..0]; - } - - /// Simply empties the inbox - pub fn clear(&mut self) { - self.messages.clear(); - } - - // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur - // branch id. - fn compare_messages(a: &Message, b: &Message) -> Ordering { - let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); - if ord != Ordering::Equal { return ord; } - - ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); - if ord != Ordering::Equal { return ord; } - - return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); - } -} - /// A connector's global inbox. Any received message ends up here. This is /// because a message might be received before a branch arrives at the /// corresponding `get()` that is supposed to receive that message. Hence we