use std::collections::VecDeque; use std::sync::{RwLock, RwLockReadGuard, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use crate::protocol::eval::ValueGroup; use crate::runtime2::connector::{BranchId, PortIdLocal}; /// A message prepared by a connector. Waiting to be picked up by the runtime to /// be sent to another connector. #[derive(Clone)] pub struct OutboxMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } /// A message inserted into the inbox of a connector by the runtime. #[derive(Clone)] pub struct InboxMessage { pub sending_port: PortIdLocal, pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, pub sender_cur_branch_id: BranchId, pub message: ValueGroup, } /// A message sent between connectors to communicate something about their /// scheduling state. pub enum ControlMessage { ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID) Ack(u32), // (control message ID) } /// The inbox of a connector. The owning connector (i.e. the thread that is /// executing the connector) should be able to read all messages. Other /// connectors (potentially executed by different threads) should be able to /// append messages. /// /// If a connector has no more code to run, and its inbox does not contain any /// new messages, then it may go into sleep mode. /// // TODO: @Optimize, this is a temporary lazy implementation pub struct Inbox { // "Normal" messages, intended for a PDL protocol. These need to stick // around during an entire sync-block (to handle `put`s for which the // corresponding `get`s have not yet been reached). messages: RwLock>, len_read: AtomicUsize, // System messages. These are handled by the scheduler and only need to be // handled once. system_messages: Mutex>, } impl Inbox { pub fn new() -> Self { Self{ messages: RwLock::new(Vec::new()), len_read: AtomicUsize::new(0), system_messages: Mutex::new(VecDeque::new()), } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. pub fn insert_message(&self, message: InboxMessage) { let mut messages = self.messages.write().unwrap(); for existing in messages.iter() { if existing.sender_prev_branch_id == message.sender_prev_branch_id && existing.sender_cur_branch_id == message.sender_cur_branch_id && existing.receiving_port == message.receiving_port { // Message was already received return; } } messages.push(message); } /// Retrieves all previously read messages that satisfy the provided /// speculative conditions. Note that the inbox remains read-locked until /// the returned iterator is dropped. Should only be called by the /// inbox-reader (i.e. the thread executing a connector's PDL code). /// /// This function should only be used to check if already-received messages /// could be received by a newly encountered `get` call in a connector's /// PDL code. pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { let lock = self.messages.read().unwrap(); return InboxMessageIter{ lock, next_index: 0, max_index: self.len_read.load(Ordering::Acquire), match_port_id: port_id, match_prev_branch_id: prev_branch_id, }; } /// Retrieves the next unread message. Should only be called by the /// inbox-reader. pub fn next_message(&self) -> Option { let lock = self.messages.read().unwrap(); let cur_index = self.len_read.load(Ordering::Acquire); if cur_index >= lock.len() { return None; } // TODO: Accept the correctness and simply make it an add, or even // remove the atomic altogether. if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) { panic!("multiple readers modifying number of messages read"); } return Some(InboxMessageRef{ lock, index: cur_index, }); } /// Simply empties the inbox pub fn clear(&mut self) { self.messages.clear(); } pub fn insert_control_message(&self, message: ControlMessage) { let mut lock = self.system_messages.lock().unwrap(); lock.push_back(message); } pub fn take_control_message(&self) -> Option { let mut lock = self.system_messages.lock().unwrap(); return lock.pop_front(); } } /// Reference to a new message pub struct InboxMessageRef<'i> { lock: RwLockReadGuard<'i, Vec>, index: usize, } impl<'i> std::ops::Deref for InboxMessageRef<'i> { type Target = InboxMessage; fn deref(&self) -> &'i Self::Target { return &self.lock[self.index]; } } /// Iterator over previously received messages in the inbox. pub struct InboxMessageIter<'i> { lock: RwLockReadGuard<'i, Vec>, next_index: usize, max_index: usize, match_port_id: PortIdLocal, match_prev_branch_id: BranchId, } impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { type Item = &'m InboxMessage; fn next(&'m mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { let cur_message = &self.lock[self.next_index]; if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { // Found a match break; } self.next_index += 1; } if self.next_index == self.max_index { return None; } let message = &self.lock[self.next_index]; self.next_index += 1; return Some(message); } }