/** inbox.rs Contains various types of inboxes and message types for the connectors. There are two kinds of inboxes: The `PublicInbox` is a simple message queue. Messages are put in by various threads, and they're taken out by a single thread. These messages may contain control messages and may be filtered or redirected by the scheduler. The `PrivateInbox` is a temporary storage for all messages that are received within a certain sync-round. **/ use std::collections::VecDeque; use std::sync::{RwLock, RwLockReadGuard, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use crate::protocol::eval::ValueGroup; use super::connector::{BranchId, PortIdLocal}; use super::global_store::ConnectorId; /// A message prepared by a connector. Waiting to be picked up by the runtime to /// be sent to another connector. #[derive(Clone)] pub struct OutgoingMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } /// A message that has been delivered (after being imbued with the receiving /// port by the scheduler) to a connector. #[derive(Clone)] pub struct DataMessage { pub sending_connector: ConnectorId, pub sending_port: PortIdLocal, pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, pub sender_cur_branch_id: BranchId, pub message: ValueGroup, } /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. pub struct ControlMessage { pub id: u32, // generic identifier, used to match request to response pub sender: ConnectorId, pub content: ControlMessageVariant, } pub enum ControlMessageVariant { ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port Ack, // acknowledgement of previous control message, matching occurs through control message ID. } /// Generic message in the `PublicInbox`, handled by the scheduler (which takes /// out and handles all control message and potential routing). The correctly /// addressed `Data` variants will end up at the connector. pub enum Message { Data(DataMessage), Control(ControlMessage), } /// The public inbox of a connector. The thread running the connector that owns /// this inbox may retrieved from it. Non-owning threads may only put new /// messages inside of it. // TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. // Should behave as a MPSC queue. pub struct PublicInbox { messages: Mutex>, } impl PublicInbox { pub fn new() -> Self { Self{ messages: Mutex::new(VecDeque::new()), } } pub fn insert_message(&self, message: Message) { let mut lock = self.messages.lock().unwrap(); lock.push_back(message); } pub fn take_message(&self) -> Option { let mut lock = self.messages.lock().unwrap(); return lock.pop_front(); } pub fn is_empty(&self) -> bool { let lock = self.messages.lock().unwrap(); return lock.is_empty(); } } pub struct PrivateInbox { // "Normal" messages, intended for a PDL protocol. These need to stick // around during an entire sync-block (to handle `put`s for which the // corresponding `get`s have not yet been reached). messages: Vec, len_read: usize, } impl PrivateInbox { pub fn new() -> Self { Self{ messages: Vec::new(), len_read: 0, } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. pub fn insert_message(&mut self, message: DataMessage) { for existing in self.messages.iter() { if existing.sender_prev_branch_id == message.sender_prev_branch_id && existing.sender_cur_branch_id == message.sender_cur_branch_id && existing.receiving_port == message.receiving_port { // Message was already received return; } } self.messages.push(message); } /// Retrieves all previously read messages that satisfy the provided /// speculative conditions. Note that the inbox remains read-locked until /// the returned iterator is dropped. Should only be called by the /// inbox-reader (i.e. the thread executing a connector's PDL code). /// /// This function should only be used to check if already-received messages /// could be received by a newly encountered `get` call in a connector's /// PDL code. pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { return InboxMessageIter{ messages: &self.messages, next_index: 0, max_index: self.len_read, match_port_id: port_id, match_prev_branch_id: prev_branch_id, }; } /// Retrieves the next unread message. Should only be called by the /// inbox-reader. pub fn next_message(&mut self) -> Option<&DataMessage> { if self.len_read == self.messages.len() { return None; } let to_return = &self.messages[self.len_read]; self.len_read += 1; return Some(to_return); } /// Simply empties the inbox pub fn clear(&mut self) { self.messages.clear(); self.len_read = 0; } } /// Iterator over previously received messages in the inbox. pub struct InboxMessageIter<'i> { messages: &'i Vec, next_index: usize, max_index: usize, match_port_id: PortIdLocal, match_prev_branch_id: BranchId, } impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { type Item = &'m DataMessage; fn next(&'m mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { let cur_message = &self.messages[self.next_index]; if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { // Found a match break; } self.next_index += 1; } if self.next_index == self.max_index { return None; } let message = &self.messages[self.next_index]; self.next_index += 1; return Some(message); } }