Files
@ daf15df0f8ca
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox.rs
daf15df0f8ca
6.5 KiB
application/rls-services+xml
scaffolding in place for scheduler/runtime
/**
inbox.rs
Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:
The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.
The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/
use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::protocol::eval::ValueGroup;
use super::connector::{BranchId, PortIdLocal};
use super::global_store::ConnectorId;
/// A message prepared by a connector, waiting to be picked up by the runtime
/// and sent to another connector.
///
/// Compared to `DataMessage` it carries no sending connector or receiving
/// port.
#[derive(Clone)]
pub struct OutgoingMessage {
/// Port (local ID) through which the message is sent.
pub sending_port: PortIdLocal,
/// Sender's previous branch ID.
pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
/// Sender's current branch ID.
pub sender_cur_branch_id: BranchId, // always valid
/// The values that make up the message payload.
pub message: ValueGroup,
}
/// A message that has been delivered to a connector, after the scheduler has
/// imbued it with the receiving port.
#[derive(Clone)]
pub struct DataMessage {
/// Connector that sent the message.
pub sending_connector: ConnectorId,
/// Port (local ID) through which the message was sent.
pub sending_port: PortIdLocal,
/// Port (local ID) on which the message is received; filled in by the
/// scheduler.
pub receiving_port: PortIdLocal,
/// Sender's previous branch ID. `PrivateInbox` uses it, together with the
/// current branch ID and the receiving port, to detect duplicates.
pub sender_prev_branch_id: BranchId,
/// Sender's current branch ID.
pub sender_cur_branch_id: BranchId,
/// The values that make up the message payload.
pub message: ValueGroup,
}
/// A control message. These might be sent by schedulers to notify each other
/// of asynchronous state changes.
pub struct ControlMessage {
/// Identifier shared by a request and its response.
pub id: u32, // generic identifier, used to match request to response
/// Connector the control message originates from.
pub sender: ConnectorId,
/// What the control message actually conveys.
pub content: ControlMessageVariant,
}
/// The payload carried in the `content` field of a `ControlMessage`.
pub enum ControlMessageVariant {
ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
Ack, // acknowledgement of previous control message, matching occurs through control message ID.
}
/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
/// out and handles all control messages and any potential routing). The
/// correctly addressed `Data` variants will end up at the connector.
pub enum Message {
/// A data message addressed to a connector.
Data(DataMessage),
/// A control message, dealt with by the scheduler.
Control(ControlMessage),
}
/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
// Should behave as a MPSC queue.
pub struct PublicInbox {
// FIFO queue of pending messages, guarded by a mutex so that it can be
// accessed through a shared reference.
messages: Mutex<VecDeque<Message>>,
}
impl PublicInbox {
    /// Creates an inbox whose message queue is empty.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self { messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// # Panics
    /// Panics if the queue's mutex has been poisoned.
    pub fn insert_message(&self, message: Message) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the message at the front of the queue, or `None`
    /// if the queue holds no messages.
    ///
    /// # Panics
    /// Panics if the queue's mutex has been poisoned.
    pub fn take_message(&self) -> Option<Message> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Reports whether the queue currently holds no messages.
    ///
    /// # Panics
    /// Panics if the queue's mutex has been poisoned.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}
/// Storage for the data messages a connector has received. Tracks how many of
/// the stored messages have already been handed out through `next_message`.
pub struct PrivateInbox {
// "Normal" messages, intended for a PDL protocol. These need to stick
// around during an entire sync-block (to handle `put`s for which the
// corresponding `get`s have not yet been reached).
messages: Vec<DataMessage>,
// Number of messages already returned by `next_message`; messages at
// indices below this value count as "read".
len_read: usize,
}
impl PrivateInbox {
    /// Creates an inbox without any messages and with the read cursor at the
    /// start.
    pub fn new() -> Self {
        Self {
            messages: Vec::new(),
            len_read: 0,
        }
    }

    /// Stores `message`, unless a message with the same
    /// (prev_branch_id, cur_branch_id, receiving_port) tuple is already
    /// present, in which case the new message is dropped.
    pub fn insert_message(&mut self, message: DataMessage) {
        let already_received = self.messages.iter().any(|stored| {
            stored.sender_prev_branch_id == message.sender_prev_branch_id
                && stored.sender_cur_branch_id == message.sender_cur_branch_id
                && stored.receiving_port == message.receiving_port
        });

        if !already_received {
            self.messages.push(message);
        }
    }

    /// Returns an iterator over the messages that were already handed out by
    /// `next_message` and that match the given receiving port and sender
    /// previous-branch ID. The iterator borrows the inbox for as long as it
    /// is alive.
    ///
    /// Meant for checking whether messages received earlier can satisfy a
    /// `get` call that a connector's PDL code has only just reached. Should
    /// only be called by the inbox-reader (i.e. the thread executing the
    /// connector's PDL code).
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        InboxMessageIter {
            messages: &self.messages,
            next_index: 0,
            // Only messages below the read cursor are visited.
            max_index: self.len_read,
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        }
    }

    /// Hands out the oldest message that has not been read yet and advances
    /// the read cursor, or returns `None` when every stored message has
    /// already been read. Should only be called by the inbox-reader.
    pub fn next_message(&mut self) -> Option<&DataMessage> {
        let cursor = self.len_read;
        if cursor == self.messages.len() {
            return None;
        }

        self.len_read = cursor + 1;
        Some(&self.messages[cursor])
    }

    /// Removes every stored message and resets the read cursor.
    pub fn clear(&mut self) {
        self.len_read = 0;
        self.messages.clear();
    }
}
/// Iterator over previously received messages in the inbox. Only yields the
/// messages whose receiving port and sender previous-branch ID match the
/// values stored in the iterator.
pub struct InboxMessageIter<'i> {
// Borrowed message storage of the inbox.
messages: &'i Vec<DataMessage>,
// Index of the next message to examine.
next_index: usize,
// Exclusive upper bound on the indices that are examined.
max_index: usize,
// A message matches only if its `receiving_port` equals this port.
match_port_id: PortIdLocal,
// A message matches only if its `sender_prev_branch_id` equals this ID.
match_prev_branch_id: BranchId,
}
/// Yields, in storage order, every message below `max_index` whose receiving
/// port and sender previous-branch ID match the iterator's filter values.
///
/// The yielded references borrow from the inbox's message storage (lifetime
/// `'i`), not from the iterator itself, so they stay usable while the
/// iterator is advanced further.
impl<'i> Iterator for InboxMessageIter<'i> {
    type Item = &'i DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Copy the shared reference out of `self` so that the returned
        // borrow is tied to `'i` instead of to this `&mut self` call.
        let messages: &'i Vec<DataMessage> = self.messages;

        // Scan forward until a match is found or the bound is reached. The
        // cursor always moves past the examined message, so a match is never
        // yielded twice.
        while self.next_index < self.max_index {
            let candidate = &messages[self.next_index];
            self.next_index += 1;

            if candidate.receiving_port == self.match_port_id
                && candidate.sender_prev_branch_id == self.match_prev_branch_id
            {
                return Some(candidate);
            }
        }

        None
    }
}
|