diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index d2d89a42df942e5275d196c058fc5b80cc2ddc00..2d64dd23404f8dea01d3441b5adc99902eaba789 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -1,23 +1,40 @@ +/** +inbox.rs + +Contains various types of inboxes and message types for the connectors. There +are two kinds of inboxes: + +The `PublicInbox` is a simple message queue. Messages are put in by various +threads, and they're taken out by a single thread. These messages may contain +control messages and may be filtered or redirected by the scheduler. + +The `PrivateInbox` is a temporary storage for all messages that are received +within a certain sync-round. +**/ + use std::collections::VecDeque; use std::sync::{RwLock, RwLockReadGuard, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use crate::protocol::eval::ValueGroup; -use crate::runtime2::connector::{BranchId, PortIdLocal}; +use super::connector::{BranchId, PortIdLocal}; +use super::global_store::ConnectorId; /// A message prepared by a connector. Waiting to be picked up by the runtime to /// be sent to another connector. #[derive(Clone)] -pub struct OutboxMessage { +pub struct OutgoingMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } -/// A message inserted into the inbox of a connector by the runtime. +/// A message that has been delivered (after being imbued with the receiving +/// port by the scheduler) to a connector. #[derive(Clone)] -pub struct InboxMessage { +pub struct DataMessage { + pub sending_connector: ConnectorId, pub sending_port: PortIdLocal, pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, @@ -25,48 +42,80 @@ pub struct InboxMessage { pub message: ValueGroup, } -/// A message sent between connectors to communicate something about their -/// scheduling state. -pub enum ControlMessage { - ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID) - Ack(u32), // (control message ID) +/// A control message. These might be sent by the scheduler to notify eachother +/// of asynchronous state changes. +pub struct ControlMessage { + pub id: u32, // generic identifier, used to match request to response + pub sender: ConnectorId, + pub content: ControlMessageVariant, +} + +pub enum ControlMessageVariant { + ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port + Ack, // acknowledgement of previous control message, matching occurs through control message ID. +} + +/// Generic message in the `PublicInbox`, handled by the scheduler (which takes +/// out and handles all control message and potential routing). The correctly +/// addressed `Data` variants will end up at the connector. +pub enum Message { + Data(DataMessage), + Control(ControlMessage), +} + +/// The public inbox of a connector. The thread running the connector that owns +/// this inbox may retrieved from it. Non-owning threads may only put new +/// messages inside of it. +// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. +// Should behave as a MPSC queue. +pub struct PublicInbox { + messages: Mutex>, } -/// The inbox of a connector. The owning connector (i.e. the thread that is -/// executing the connector) should be able to read all messages. Other -/// connectors (potentially executed by different threads) should be able to -/// append messages. -/// -/// If a connector has no more code to run, and its inbox does not contain any -/// new messages, then it may go into sleep mode. -/// -// TODO: @Optimize, this is a temporary lazy implementation -pub struct Inbox { +impl PublicInbox { + pub fn new() -> Self { + Self{ + messages: Mutex::new(VecDeque::new()), + } + } + + pub fn insert_message(&self, message: Message) { + let mut lock = self.messages.lock().unwrap(); + lock.push_back(message); + } + + pub fn take_message(&self) -> Option { + let mut lock = self.messages.lock().unwrap(); + return lock.pop_front(); + } + + pub fn is_empty(&self) -> bool { + let lock = self.messages.lock().unwrap(); + return lock.is_empty(); + } +} + +pub struct PrivateInbox { // "Normal" messages, intended for a PDL protocol. These need to stick // around during an entire sync-block (to handle `put`s for which the // corresponding `get`s have not yet been reached). - messages: RwLock>, - len_read: AtomicUsize, - // System messages. These are handled by the scheduler and only need to be - // handled once. - system_messages: Mutex>, + messages: Vec, + len_read: usize, } -impl Inbox { +impl PrivateInbox { pub fn new() -> Self { Self{ - messages: RwLock::new(Vec::new()), - len_read: AtomicUsize::new(0), - system_messages: Mutex::new(VecDeque::new()), + messages: Vec::new(), + len_read: 0, } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. - pub fn insert_message(&self, message: InboxMessage) { - let mut messages = self.messages.write().unwrap(); - for existing in messages.iter() { + pub fn insert_message(&mut self, message: DataMessage) { + for existing in self.messages.iter() { if existing.sender_prev_branch_id == message.sender_prev_branch_id && existing.sender_cur_branch_id == message.sender_cur_branch_id && existing.receiving_port == message.receiving_port { @@ -74,7 +123,8 @@ impl Inbox { return; } } - messages.push(message); + + self.messages.push(message); } /// Retrieves all previously read messages that satisfy the provided @@ -86,11 +136,10 @@ impl Inbox { /// could be received by a newly encountered `get` call in a connector's /// PDL code. pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { - let lock = self.messages.read().unwrap(); return InboxMessageIter{ - lock, + messages: &self.messages, next_index: 0, - max_index: self.len_read.load(Ordering::Acquire), + max_index: self.len_read, match_port_id: port_id, match_prev_branch_id: prev_branch_id, }; @@ -98,58 +147,26 @@ impl Inbox { /// Retrieves the next unread message. Should only be called by the /// inbox-reader. - pub fn next_message(&self) -> Option { - let lock = self.messages.read().unwrap(); - let cur_index = self.len_read.load(Ordering::Acquire); - if cur_index >= lock.len() { + pub fn next_message(&mut self) -> Option<&DataMessage> { + if self.len_read == self.messages.len() { return None; } - // TODO: Accept the correctness and simply make it an add, or even - // remove the atomic altogether. - if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) { - panic!("multiple readers modifying number of messages read"); - } - - return Some(InboxMessageRef{ - lock, - index: cur_index, - }); + let to_return = &self.messages[self.len_read]; + self.len_read += 1; + return Some(to_return); } /// Simply empties the inbox pub fn clear(&mut self) { self.messages.clear(); - } - - pub fn insert_control_message(&self, message: ControlMessage) { - let mut lock = self.system_messages.lock().unwrap(); - lock.push_back(message); - } - - pub fn take_control_message(&self) -> Option { - let mut lock = self.system_messages.lock().unwrap(); - return lock.pop_front(); - } -} - -/// Reference to a new message -pub struct InboxMessageRef<'i> { - lock: RwLockReadGuard<'i, Vec>, - index: usize, -} - -impl<'i> std::ops::Deref for InboxMessageRef<'i> { - type Target = InboxMessage; - - fn deref(&self) -> &'i Self::Target { - return &self.lock[self.index]; + self.len_read = 0; } } /// Iterator over previously received messages in the inbox. pub struct InboxMessageIter<'i> { - lock: RwLockReadGuard<'i, Vec>, + messages: &'i Vec, next_index: usize, max_index: usize, match_port_id: PortIdLocal, @@ -157,12 +174,12 @@ pub struct InboxMessageIter<'i> { } impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { - type Item = &'m InboxMessage; + type Item = &'m DataMessage; fn next(&'m mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { - let cur_message = &self.lock[self.next_index]; + let cur_message = &self.messages[self.next_index]; if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { // Found a match break; @@ -175,7 +192,7 @@ impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { return None; } - let message = &self.lock[self.next_index]; + let message = &self.messages[self.next_index]; self.next_index += 1; return Some(message); }