Files
@ cf26538b25dc
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox.rs
cf26538b25dc
6.3 KiB
application/rls-services+xml
architecture for send/recv ports in place
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::protocol::eval::ValueGroup;
use crate::runtime2::connector::{BranchId, PortIdLocal};
/// A message prepared by a connector. Waiting to be picked up by the runtime to
/// be sent to another connector.
#[derive(Clone)]
pub struct OutboxMessage {
pub sending_port: PortIdLocal,
pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
pub sender_cur_branch_id: BranchId, // always valid
pub message: ValueGroup,
}
/// A message inserted into the inbox of a connector by the runtime.
#[derive(Clone)]
pub struct InboxMessage {
pub sending_port: PortIdLocal,
pub receiving_port: PortIdLocal,
pub sender_prev_branch_id: BranchId,
pub sender_cur_branch_id: BranchId,
pub message: ValueGroup,
}
/// A message sent between connectors to communicate something about their
/// scheduling state.
pub enum ControlMessage {
ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
Ack(u32), // (control message ID)
}
/// The inbox of a connector. The owning connector (i.e. the thread that is
/// executing the connector) should be able to read all messages. Other
/// connectors (potentially executed by different threads) should be able to
/// append messages.
///
/// If a connector has no more code to run, and its inbox does not contain any
/// new messages, then it may go into sleep mode.
///
// TODO: @Optimize, this is a temporary lazy implementation
pub struct Inbox {
// "Normal" messages, intended for a PDL protocol. These need to stick
// around during an entire sync-block (to handle `put`s for which the
// corresponding `get`s have not yet been reached).
messages: RwLock<Vec<InboxMessage>>,
len_read: AtomicUsize,
// System messages. These are handled by the scheduler and only need to be
// handled once.
system_messages: Mutex<VecDeque<ControlMessage>>,
}
impl Inbox {
pub fn new() -> Self {
Self{
messages: RwLock::new(Vec::new()),
len_read: AtomicUsize::new(0),
system_messages: Mutex::new(VecDeque::new()),
}
}
/// Will insert the message into the inbox. Only exception is when the tuple
/// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
/// nothing is inserted..
pub fn insert_message(&self, message: InboxMessage) {
let mut messages = self.messages.write().unwrap();
for existing in messages.iter() {
if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
existing.sender_cur_branch_id == message.sender_cur_branch_id &&
existing.receiving_port == message.receiving_port {
// Message was already received
return;
}
}
messages.push(message);
}
/// Retrieves all previously read messages that satisfy the provided
/// speculative conditions. Note that the inbox remains read-locked until
/// the returned iterator is dropped. Should only be called by the
/// inbox-reader (i.e. the thread executing a connector's PDL code).
///
/// This function should only be used to check if already-received messages
/// could be received by a newly encountered `get` call in a connector's
/// PDL code.
pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
let lock = self.messages.read().unwrap();
return InboxMessageIter{
lock,
next_index: 0,
max_index: self.len_read.load(Ordering::Acquire),
match_port_id: port_id,
match_prev_branch_id: prev_branch_id,
};
}
/// Retrieves the next unread message. Should only be called by the
/// inbox-reader.
pub fn next_message(&self) -> Option<InboxMessageRef> {
let lock = self.messages.read().unwrap();
let cur_index = self.len_read.load(Ordering::Acquire);
if cur_index >= lock.len() {
return None;
}
// TODO: Accept the correctness and simply make it an add, or even
// remove the atomic altogether.
if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) {
panic!("multiple readers modifying number of messages read");
}
return Some(InboxMessageRef{
lock,
index: cur_index,
});
}
/// Simply empties the inbox
pub fn clear(&mut self) {
self.messages.clear();
}
pub fn insert_control_message(&self, message: ControlMessage) {
let mut lock = self.system_messages.lock().unwrap();
lock.push_back(message);
}
pub fn take_control_message(&self) -> Option<ControlMessage> {
let mut lock = self.system_messages.lock().unwrap();
return lock.pop_front();
}
}
/// Reference to a new message
pub struct InboxMessageRef<'i> {
lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
index: usize,
}
impl<'i> std::ops::Deref for InboxMessageRef<'i> {
type Target = InboxMessage;
fn deref(&self) -> &'i Self::Target {
return &self.lock[self.index];
}
}
/// Iterator over previously received messages in the inbox.
pub struct InboxMessageIter<'i> {
lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
next_index: usize,
max_index: usize,
match_port_id: PortIdLocal,
match_prev_branch_id: BranchId,
}
impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
type Item = &'m InboxMessage;
fn next(&'m mut self) -> Option<Self::Item> {
// Loop until match is found or at end of messages
while self.next_index < self.max_index {
let cur_message = &self.lock[self.next_index];
if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
// Found a match
break;
}
self.next_index += 1;
}
if self.next_index == self.max_index {
return None;
}
let message = &self.lock[self.next_index];
self.next_index += 1;
return Some(message);
}
}
|