diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 871ec8720c7256d6479bef3eb8c4bd7eb7838ada..d2d89a42df942e5275d196c058fc5b80cc2ddc00 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -1,73 +1,120 @@ -use crate::common::Ordering; +use std::collections::VecDeque; +use std::sync::{RwLock, RwLockReadGuard, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; + use crate::protocol::eval::ValueGroup; use crate::runtime2::connector::{BranchId, PortIdLocal}; -/// A message in transit from one connector to another. +/// A message prepared by a connector. Waiting to be picked up by the runtime to +/// be sent to another connector. #[derive(Clone)] -pub struct Message { +pub struct OutboxMessage { pub sending_port: PortIdLocal, - pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } +/// A message inserted into the inbox of a connector by the runtime. +#[derive(Clone)] +pub struct InboxMessage { + pub sending_port: PortIdLocal, + pub receiving_port: PortIdLocal, + pub sender_prev_branch_id: BranchId, + pub sender_cur_branch_id: BranchId, + pub message: ValueGroup, +} + +/// A message sent between connectors to communicate something about their +/// scheduling state. +pub enum ControlMessage { + ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID) + Ack(u32), // (control message ID) +} + /// The inbox of a connector. The owning connector (i.e. the thread that is /// executing the connector) should be able to read all messages. Other /// connectors (potentially executed by different threads) should be able to /// append messages. /// -/// Note that the logic inside of the inbox is strongly connected to deciding -/// whether or not a connector has nothing to execute, and is waiting on new -/// messages in order to continue. +/// If a connector has no more code to run, and its inbox does not contain any +/// new messages, then it may go into sleep mode. +/// +// TODO: @Optimize, this is a temporary lazy implementation pub struct Inbox { - messages: Vec + // "Normal" messages, intended for a PDL protocol. These need to stick + // around during an entire sync-block (to handle `put`s for which the + // corresponding `get`s have not yet been reached). + messages: RwLock>, + len_read: AtomicUsize, + // System messages. These are handled by the scheduler and only need to be + // handled once. + system_messages: Mutex>, } impl Inbox { pub fn new() -> Self { - Self{ messages: Vec::new() } + Self{ + messages: RwLock::new(Vec::new()), + len_read: AtomicUsize::new(0), + system_messages: Mutex::new(VecDeque::new()), + } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. - pub fn insert_message(&mut self, message: Message) { - match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { - Ok(_) => {} // message already exists - Err(idx) => self.messages.insert(idx, message) + pub fn insert_message(&self, message: InboxMessage) { + let mut messages = self.messages.write().unwrap(); + for existing in messages.iter() { + if existing.sender_prev_branch_id == message.sender_prev_branch_id && + existing.sender_cur_branch_id == message.sender_cur_branch_id && + existing.receiving_port == message.receiving_port { + // Message was already received + return; + } } + messages.push(message); } - /// Retrieves all messages for the provided conditions - pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { - // Seek the first message with the appropriate port ID and branch ID - let num_messages = self.messages.len(); - - for first_idx in 0..num_messages { - let msg = &self.messages[first_idx]; - if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { - // Found a match, seek ahead until the condition is no longer true - let mut last_idx = first_idx + 1; - while last_idx < num_messages { - let msg = &self.messages[last_idx]; - if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { - // No longer matching - break; - } - last_idx += 1; - } - - // Return all the matching messages - return &self.messages[first_idx..last_idx]; - } else if msg.receiving_port.id > port_id.id { - // Because messages are ordered, this implies we couldn't find - // any message - break; - } + /// Retrieves all previously read messages that satisfy the provided + /// speculative conditions. Note that the inbox remains read-locked until + /// the returned iterator is dropped. Should only be called by the + /// inbox-reader (i.e. the thread executing a connector's PDL code). + /// + /// This function should only be used to check if already-received messages + /// could be received by a newly encountered `get` call in a connector's + /// PDL code. + pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { + let lock = self.messages.read().unwrap(); + return InboxMessageIter{ + lock, + next_index: 0, + max_index: self.len_read.load(Ordering::Acquire), + match_port_id: port_id, + match_prev_branch_id: prev_branch_id, + }; + } + + /// Retrieves the next unread message. Should only be called by the + /// inbox-reader. + pub fn next_message(&self) -> Option { + let lock = self.messages.read().unwrap(); + let cur_index = self.len_read.load(Ordering::Acquire); + if cur_index >= lock.len() { + return None; + } + + // TODO: Accept the correctness and simply make it an add, or even + // remove the atomic altogether. + if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) { + panic!("multiple readers modifying number of messages read"); } - return &self.messages[0..0]; + return Some(InboxMessageRef{ + lock, + index: cur_index, + }); } /// Simply empties the inbox @@ -75,15 +122,61 @@ impl Inbox { self.messages.clear(); } - // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur - // branch id. - fn compare_messages(a: &Message, b: &Message) -> Ordering { - let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); - if ord != Ordering::Equal { return ord; } + pub fn insert_control_message(&self, message: ControlMessage) { + let mut lock = self.system_messages.lock().unwrap(); + lock.push_back(message); + } - ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); - if ord != Ordering::Equal { return ord; } + pub fn take_control_message(&self) -> Option { + let mut lock = self.system_messages.lock().unwrap(); + return lock.pop_front(); + } +} + +/// Reference to a new message +pub struct InboxMessageRef<'i> { + lock: RwLockReadGuard<'i, Vec>, + index: usize, +} - return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); +impl<'i> std::ops::Deref for InboxMessageRef<'i> { + type Target = InboxMessage; + + fn deref(&self) -> &'i Self::Target { + return &self.lock[self.index]; } } + +/// Iterator over previously received messages in the inbox. +pub struct InboxMessageIter<'i> { + lock: RwLockReadGuard<'i, Vec>, + next_index: usize, + max_index: usize, + match_port_id: PortIdLocal, + match_prev_branch_id: BranchId, +} + +impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { + type Item = &'m InboxMessage; + + fn next(&'m mut self) -> Option { + // Loop until match is found or at end of messages + while self.next_index < self.max_index { + let cur_message = &self.lock[self.next_index]; + if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { + // Found a match + break; + } + + self.next_index += 1; + } + + if self.next_index == self.max_index { + return None; + } + + let message = &self.lock[self.next_index]; + self.next_index += 1; + return Some(message); + } +} \ No newline at end of file