Files
@ 8c5d438b0fa3
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox.rs - annotation
8c5d438b0fa3
3.4 KiB
application/rls-services+xml
rewriting inbox to behave mpsc-like
8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 8c5d438b0fa3 | use crate::common::Ordering;
use crate::protocol::eval::ValueGroup;
use crate::runtime2::connector::{BranchId, PortIdLocal};
/// A message in transit from one connector to another.
///
/// The `Inbox` defined in this file keeps its messages ordered by
/// `receiving_port`, then `sender_prev_branch_id`, then
/// `sender_cur_branch_id`, and treats two messages that agree on all three of
/// those fields as duplicates.
#[derive(Clone)]
pub struct Message {
/// NOTE(review): presumably the port on the sending connector's side; it is
/// not read by the inbox code visible here - confirm against the sender.
pub sending_port: PortIdLocal,
/// Port this message is addressed to. Primary key for ordering and lookup
/// inside the inbox.
pub receiving_port: PortIdLocal,
/// Secondary ordering/lookup key inside the inbox.
pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
/// Final ordering key inside the inbox (used to detect duplicates).
pub sender_cur_branch_id: BranchId, // always valid
/// The values carried by this message.
pub message: ValueGroup,
}
/// The inbox of a connector. The owning connector (i.e. the thread that is
/// executing the connector) should be able to read all messages. Other
/// connectors (potentially executed by different threads) should be able to
/// append messages.
///
/// Note that the logic inside of the inbox is strongly connected to deciding
/// whether or not a connector has nothing to execute, and is waiting on new
/// messages in order to continue.
///
/// NOTE(review): the type as written here contains a plain `Vec` and inserting
/// requires `&mut self`; no synchronization is visible in this file, so any
/// cross-thread appending has to be arranged by whoever owns the inbox -
/// confirm against the callers.
pub struct Inbox {
// Kept sorted by (receiving port, sender prev branch id, sender cur branch
// id) and free of duplicates under that key: `insert_message` places each
// message at the position found by a binary search using
// `compare_messages`, and `get_messages` relies on that ordering.
messages: Vec<Message>
}
impl Inbox {
pub fn new() -> Self {
Self{ messages: Vec::new() }
}
/// Will insert the message into the inbox. Only exception is when the tuple
/// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
/// nothing is inserted..
pub fn insert_message(&mut self, message: Message) {
match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
Ok(_) => {} // message already exists
Err(idx) => self.messages.insert(idx, message)
}
}
/// Retrieves all messages for the provided conditions
pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
// Seek the first message with the appropriate port ID and branch ID
let num_messages = self.messages.len();
for first_idx in 0..num_messages {
let msg = &self.messages[first_idx];
if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
// Found a match, seek ahead until the condition is no longer true
let mut last_idx = first_idx + 1;
while last_idx < num_messages {
let msg = &self.messages[last_idx];
if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
// No longer matching
break;
}
last_idx += 1;
}
// Return all the matching messages
return &self.messages[first_idx..last_idx];
} else if msg.receiving_port.id > port_id.id {
// Because messages are ordered, this implies we couldn't find
// any message
break;
}
}
return &self.messages[0..0];
}
/// Simply empties the inbox
pub fn clear(&mut self) {
self.messages.clear();
}
// Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
// branch id.
fn compare_messages(a: &Message, b: &Message) -> Ordering {
let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
if ord != Ordering::Equal { return ord; }
ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
if ord != Ordering::Equal { return ord; }
return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
}
}
|