use std::collections::HashMap; use std::collections::hash_map::Entry; use std::cmp::Ordering; use super::connector::{PortIdLocal, BranchId}; use crate::PortId; use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; /// A message residing in a connector's inbox (waiting to be put into some kind /// of speculative branch), or a message waiting to be sent. #[derive(Clone)] pub struct BufferedMessage { pub(crate) sending_port: PortId, pub(crate) receiving_port: PortId, pub(crate) peer_prev_branch_id: Option, pub(crate) peer_cur_branch_id: u32, pub(crate) message: ValueGroup, } #[derive(Clone)] pub struct Message { pub sending_port: PortIdLocal, pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } pub struct Inbox { messages: Vec } impl Inbox { pub fn new() -> Self { Self{ messages: Vec::new() } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. pub fn insert_message(&mut self, message: Message) { match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { Ok(_) => {} // message already exists Err(idx) => self.messages.insert(idx, message) } } /// Retrieves all messages for the provided conditions pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { // Seek the first message with the appropriate port ID and branch ID let num_messages = self.messages.len(); for first_idx in 0..num_messages { let msg = &self.messages[first_idx]; if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { // Found a match, seek ahead until the condition is no longer true let mut last_idx = first_idx + 1; while last_idx < num_messages { let msg = &self.messages[last_idx]; if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { // No longer matching break; } last_idx += 1; } // Return all the matching messages return &self.messages[first_idx..last_idx]; } else if msg.receiving_port.id > port_id.id { // Because messages are ordered, this implies we couldn't find // any message break; } } return &self.messages[0..0]; } /// Simply empties the inbox pub fn clear(&mut self) { self.messages.clear(); } // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur // branch id. fn compare_messages(a: &Message, b: &Message) -> Ordering { let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); if ord != Ordering::Equal { return ord; } ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); if ord != Ordering::Equal { return ord; } return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); } } /// A connector's global inbox. Any received message ends up here. This is /// because a message might be received before a branch arrives at the /// corresponding `get()` that is supposed to receive that message. Hence we /// need to store it for all future branches that might be able to receive it. pub struct ConnectorInbox { // TODO: @optimize, HashMap + Vec is a bit stupid. messages: HashMap> } /// An action performed on a port. Unsure about this #[derive(PartialEq, Eq, Hash)] struct PortAction { port_id: u32, prev_branch_id: Option, } // TODO: @remove impl ConnectorInbox { pub fn new() -> Self { Self { messages: HashMap::new(), } } /// Inserts a new message into the inbox. pub fn insert_message(&mut self, message: BufferedMessage) { // TODO: @error - Messages are received from actors we generally cannot // trust, and may be unreliable, so messages may be received multiple // times or have spoofed branch IDs. Debug asserts are present for the // initial implementation. // If it is the first message on the port, then we cannot possible have // a previous port mapping on that port. let port_action = PortAction{ port_id: message.receiving_port.0.u32_suffix, prev_branch_id: message.peer_prev_branch_id, }; match self.messages.entry(port_action) { Entry::Occupied(mut entry) => { let entry = entry.get_mut(); debug_assert!( entry.iter() .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id) .is_none(), "inbox already contains sent message (same new branch ID)" ); entry.push(message); }, Entry::Vacant(entry) => { entry.insert(vec![message]); } } } /// Checks if the provided port (and the branch id mapped to that port) /// correspond to any messages in the inbox. pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option) -> Option<&[BufferedMessage]> { let port_action = PortAction{ port_id, prev_branch_id: prev_branch_id_at_port, }; match self.messages.get(&port_action) { Some(messages) => return Some(messages.as_slice()), None => return None, } } pub fn clear(&mut self) { self.messages.clear(); } } /// A connector's outbox. A temporary storage for messages that are sent by /// branches performing `put`s until we're done running all branches and can /// actually transmit the messages. pub struct ConnectorOutbox { messages: Vec, } impl ConnectorOutbox { pub fn new() -> Self { Self{ messages: Vec::new(), } } pub fn insert_message(&mut self, message: BufferedMessage) { // TODO: @error - Depending on the way we implement the runtime in the // future we might end up not trusting "our own code" (i.e. in case // the connectors we are running are described by foreign code) debug_assert!( self.messages.iter() .find(|v| v.sending_port == message.sending_port && v.peer_prev_branch_id == message.peer_prev_branch_id ) .is_none(), "messages was already registered for sending" ); self.messages.push(message); } pub fn take_next_message_to_send(&mut self) -> Option { self.messages.pop() } pub fn clear(&mut self) { self.messages.clear(); } }