diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs index 191b5ebb98e9f84142b405cec1ae18d2c3f88615..05713bbcd5939ef446291eecd6e54aa05c1130dc 100644 --- a/src/runtime2/messages.rs +++ b/src/runtime2/messages.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::cmp::Ordering; +use super::connector::{PortIdLocal, BranchId}; use crate::PortId; use crate::common::Id; use crate::protocol::*; @@ -17,11 +19,81 @@ pub struct BufferedMessage { pub(crate) message: ValueGroup, } -/// An action performed on a port. Unsure about this -#[derive(PartialEq, Eq, Hash)] -struct PortAction { - port_id: u32, - prev_branch_id: Option, +#[derive(Clone)] +pub struct Message { + pub sending_port: PortIdLocal, + pub receiving_port: PortIdLocal, + pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id + pub sender_cur_branch_id: BranchId, // always valid + pub message: ValueGroup, +} + +pub struct Inbox { + messages: Vec +} + +impl Inbox { + pub fn new() -> Self { + Self{ messages: Vec::new() } + } + + /// Will insert the message into the inbox. Only exception is when the tuple + /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then + /// nothing is inserted.. + pub fn insert_message(&mut self, message: Message) { + match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { + Ok(_) => {} // message already exists + Err(idx) => self.messages.insert(idx, message) + } + } + + /// Retrieves all messages for the provided conditions + pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { + // Seek the first message with the appropriate port ID and branch ID + let num_messages = self.messages.len(); + + for first_idx in 0..num_messages { + let msg = &self.messages[first_idx]; + if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { + // Found a match, seek ahead until the condition is no longer true + let mut last_idx = first_idx + 1; + while last_idx < num_messages { + let msg = &self.messages[last_idx]; + if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { + // No longer matching + break; + } + last_idx += 1; + } + + // Return all the matching messages + return &self.messages[first_idx..last_idx]; + } else if msg.receiving_port.id > port_id.id { + // Because messages are ordered, this implies we couldn't find + // any message + break; + } + } + + return &self.messages[0..0]; + } + + /// Simply empties the inbox + pub fn clear(&mut self) { + self.messages.clear(); + } + + // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur + // branch id. + fn compare_messages(a: &Message, b: &Message) -> Ordering { + let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); + if ord != Ordering::Equal { return ord; } + + ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); + if ord != Ordering::Equal { return ord; } + + return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); + } } /// A connector's global inbox. Any received message ends up here. This is @@ -33,6 +105,15 @@ pub struct ConnectorInbox { messages: HashMap> } + +/// An action performed on a port. Unsure about this +#[derive(PartialEq, Eq, Hash)] +struct PortAction { + port_id: u32, + prev_branch_id: Option, +} + +// TODO: @remove impl ConnectorInbox { pub fn new() -> Self { Self {