Files
@ 98aadfccbafd
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox.rs
98aadfccbafd
12.2 KiB
application/rls-services+xml
solving problem of connectors shutting down
/**
inbox.rs
Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:
The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.
The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/
use std::collections::VecDeque;
use std::sync::Mutex;
use super::ConnectorId;
use crate::protocol::eval::ValueGroup;
use super::connector::BranchId;
use super::port::PortIdLocal;
/// A message that has been delivered (after being imbued with the receiving
/// port by the scheduler) to a connector.
// TODO: Remove Debug on messages
#[derive(Debug, Clone)]
pub struct DataMessage {
    // Port from which the message was sent.
    pub sending_port: PortIdLocal,
    // Branch the sender speculated from; used together with the current
    // branch ID and receiving port to deduplicate messages (see
    // `PrivateInbox::insert_message`).
    pub sender_prev_branch_id: BranchId,
    // Branch the sender was executing when it sent the message.
    pub sender_cur_branch_id: BranchId,
    // The transmitted message payload.
    pub message: ValueGroup,
}
/// A constraint that a peer connector's local solution must satisfy (checked
/// in `SyncMessage::check_constraint`).
#[derive(Debug, Clone)]
pub enum SyncBranchConstraint {
    /// The given port must be silent (mapped to an invalid branch ID).
    SilentPort(PortIdLocal),
    /// The peer's set of executed branch IDs must contain this branch.
    BranchNumber(BranchId),
    /// The given port must be mapped to exactly this branch ID.
    PortMapping(PortIdLocal, BranchId),
}
/// A single connector's local solution within a sync round.
#[derive(Debug, Clone)]
pub struct SyncConnectorSolution {
    // Connector this local solution belongs to.
    pub connector_id: ConnectorId,
    // Branch in which the connector's sync block terminated.
    pub terminating_branch_id: BranchId,
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
    // Final (port, branch) mapping. A mapped branch ID that is not valid
    // indicates a silent port (see `check_constraint`'s SilentPort handling).
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
}
/// The set of constraints accumulated for one connector that has not yet
/// contributed a local solution to a `SyncMessage`.
#[derive(Debug, Clone)]
pub struct SyncConnectorConstraints {
    // Connector the constraints apply to.
    pub connector_id: ConnectorId,
    // All constraints gathered so far for that connector.
    pub constraints: Vec<SyncBranchConstraint>,
}
/// A message passed between connectors while building a global solution for a
/// sync round: local solutions collected so far, pending constraints for
/// connectors that have not yet provided one, and the connectors left to
/// visit.
#[derive(Debug, Clone)]
pub struct SyncMessage {
    // Local solutions provided by connectors visited so far.
    pub local_solutions: Vec<SyncConnectorSolution>,
    // Constraints for connectors that have not yet provided a solution.
    pub constraints: Vec<SyncConnectorConstraints>,
    // Connectors that still need to be visited by this message.
    pub to_visit: Vec<ConnectorId>,
}
// TODO: Shouldn't really be here, right?
impl SyncMessage {
    /// Creates a new sync message. Assumes that it is created by a connector
    /// that has just encountered a new local solution.
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
        let mut local_solutions = Vec::with_capacity(approximate_peers);
        local_solutions.push(initial_solution);

        return Self{
            local_solutions,
            constraints: Vec::with_capacity(approximate_peers),
            to_visit: Vec::with_capacity(approximate_peers),
        };
    }

    /// Checks if a connector has already provided a local solution.
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
        return self.local_solutions
            .iter()
            .any(|v| v.connector_id == connector_id);
    }

    /// Adds a new constraint. If the connector has already provided a local
    /// solution then the constraint will be checked. Otherwise the constraint
    /// will be added to the solution. If this is the first constraint for a
    /// connector then it will be added to the connectors that still have to be
    /// visited.
    ///
    /// If this returns true then the constraint was added, or the local
    /// solution for the specified connector satisfies the constraint. If this
    /// function returns an error then we're dealing with a nefarious peer.
    pub(crate) fn add_or_check_constraint(
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
    ) -> Result<bool, ()> {
        if self.has_local_solution_for(connector_id) {
            return self.check_constraint(connector_id, constraint);
        } else {
            self.add_constraint(connector_id, constraint);
            return Ok(true);
        }
    }

    /// Pushes a new connector constraint. Caller must ensure that the solution
    /// has not yet arrived at the specified connector (because then it would no
    /// longer have constraints, but a proposed solution instead).
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
        debug_assert!(!self.has_local_solution_for(connector_id));

        let position = self.constraints
            .iter()
            .position(|v| v.connector_id == connector_id);

        match position {
            Some(index) => {
                // Has pre-existing constraints, hence must already be in the
                // to-visit list as well.
                debug_assert!(self.to_visit.contains(&connector_id));
                let entry = &mut self.constraints[index];
                entry.constraints.push(constraint);
            },
            None => {
                // First constraint for this connector: create its constraint
                // entry and mark the connector as still-to-visit.
                debug_assert!(!self.to_visit.contains(&connector_id));
                self.constraints.push(SyncConnectorConstraints{
                    connector_id,
                    constraints: vec![constraint],
                });
                self.to_visit.push(connector_id);
            }
        }
    }

    /// Checks if a constraint is satisfied by a solution. Caller must make sure
    /// that a local solution has already been provided. Will return an error
    /// value only if the provided constraint does not make sense (i.e. a
    /// nefarious peer has supplied a constraint with a port we do not own).
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()> {
        debug_assert!(self.has_local_solution_for(connector_id));
        let entry = self.local_solutions
            .iter()
            .find(|v| v.connector_id == connector_id)
            .unwrap();

        match constraint {
            SyncBranchConstraint::SilentPort(silent_port_id) => {
                for (port_id, mapped_id) in &entry.final_port_mapping {
                    if *port_id == silent_port_id {
                        // If silent, then mapped ID is invalid
                        return Ok(!mapped_id.is_valid())
                    }
                }

                // Constraint mentions a port not in the mapping: nefarious peer.
                return Err(());
            },
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
            },
            SyncBranchConstraint::PortMapping(expected_port_id, expected_branch_id) => {
                // BUGFIX: the constraint's port binding used to be shadowed by
                // the loop variable, making the comparison `port_id == port_id`
                // trivially true — the first mapping entry was always checked
                // instead of the constrained port, and a missing port never
                // produced Err(()).
                for (port_id, mapped_id) in &entry.final_port_mapping {
                    if *port_id == expected_port_id {
                        return Ok(*mapped_id == expected_branch_id);
                    }
                }

                // Constraint mentions a port not in the mapping: nefarious peer.
                return Err(());
            },
        }
    }
}
/// A proposed (or confirmed) global solution being circulated among the
/// participating connectors.
#[derive(Debug, Clone)]
pub struct SolutionMessage {
    // NOTE(review): presumably used to arbitrate between competing solution
    // proposals — confirm against the scheduler's commit handling.
    pub comparison_number: u64,
    // Connector that originated this solution proposal.
    pub connector_origin: ConnectorId,
    // The chosen terminating branch per participating connector.
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
    // Connectors that still need to see this message.
    pub to_visit: Vec<ConnectorId>,
}
/// A control message. These might be sent by the scheduler to notify eachother
/// of asynchronous state changes.
#[derive(Debug, Clone)]
pub struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    pub content: ControlMessageVariant,
}
/// The payload of a `ControlMessage`.
#[derive(Debug, Clone)]
pub enum ControlMessageVariant {
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
}
/// Generic message contents.
#[derive(Debug, Clone)]
pub enum MessageContents {
    Data(DataMessage), // data message, handled by connector
    Sync(SyncMessage), // sync message, handled by both connector/scheduler
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
    Control(ControlMessage), // control message, handled by scheduler
    Ping, // ping message, intentionally waking up a connector (used for native connectors)
}
/// An addressed message as it travels between connectors: the sender, the
/// targeted port (if any), and the payload.
#[derive(Debug)]
pub struct Message {
    // Connector that sent the message.
    pub sending_connector: ConnectorId,
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
    // The actual message payload.
    pub contents: MessageContents,
}
/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieved from it. Non-owning threads may only put new
/// messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // FIFO queue of pending messages; the mutex makes concurrent producers safe.
    messages: Mutex<VecDeque<Message>>,
}
impl PublicInbox {
    /// Constructs an empty inbox.
    pub fn new() -> Self {
        Self{
            messages: Mutex::new(VecDeque::new()),
        }
    }

    /// Enqueues a message at the back of the queue. Safe to call from any
    /// thread.
    pub fn insert_message(&self, message: Message) {
        self.messages.lock().unwrap().push_back(message)
    }

    /// Dequeues the oldest message, if any. Intended for the single thread
    /// that owns the connector.
    pub fn take_message(&self) -> Option<Message> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Returns `true` when no messages are currently queued.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}
/// Per-sync-round storage of received data messages for a single connector.
pub(crate) struct PrivateInbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: Vec<(PortIdLocal, DataMessage)>,
    // Number of messages (a prefix of `messages`) already handed out through
    // `next_message`; `get_messages` only iterates over this read prefix.
    len_read: usize,
}
impl PrivateInbox {
    /// Constructs an empty private inbox.
    pub fn new() -> Self {
        Self{ messages: Vec::new(), len_read: 0 }
    }

    /// Will insert the message into the inbox. Only exception is when the tuple
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
    /// nothing is inserted.
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
        let already_received = self.messages.iter().any(|(stored_port, stored)| {
            *stored_port == target_port &&
                stored.sender_prev_branch_id == message.sender_prev_branch_id &&
                stored.sender_cur_branch_id == message.sender_cur_branch_id
        });

        if !already_received {
            self.messages.push((target_port, message));
        }
    }

    /// Retrieves all previously read messages that satisfy the provided
    /// speculative conditions. Note that the inbox remains read-locked until
    /// the returned iterator is dropped. Should only be called by the
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
    ///
    /// This function should only be used to check if already-received messages
    /// could be received by a newly encountered `get` call in a connector's
    /// PDL code.
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        InboxMessageIter{
            messages: &self.messages,
            next_index: 0,
            max_index: self.len_read, // only the already-read prefix is visible
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        }
    }

    /// Retrieves the next unread message. Should only be called by the
    /// inbox-reader.
    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
        if self.len_read >= self.messages.len() {
            return None;
        }

        let index = self.len_read;
        self.len_read += 1;
        Some(&self.messages[index].1)
    }

    /// Simply empties the inbox.
    pub(crate) fn clear(&mut self) {
        self.len_read = 0;
        self.messages.clear();
    }
}
/// Iterator over previously received messages in the inbox.
pub(crate) struct InboxMessageIter<'i> {
    // Borrowed view of the inbox's (target port, message) pairs.
    messages: &'i Vec<(PortIdLocal, DataMessage)>,
    // Index of the next candidate message to examine.
    next_index: usize,
    // Exclusive upper bound: only the inbox's already-read prefix is visited.
    max_index: usize,
    // Only yield messages targeted at this port...
    match_port_id: PortIdLocal,
    // ...and whose sender speculated from this branch.
    match_prev_branch_id: BranchId,
}
impl<'i> Iterator for InboxMessageIter<'i> {
    type Item = &'i DataMessage;

    /// Yields the next message in the readable window whose target port and
    /// sender's previous branch both match the iterator's filter.
    fn next(&mut self) -> Option<Self::Item> {
        while self.next_index < self.max_index {
            let index = self.next_index;
            self.next_index += 1;

            let (target_port, message) = &self.messages[index];
            if *target_port == self.match_port_id &&
                message.sender_prev_branch_id == self.match_prev_branch_id {
                // Found a match
                return Some(message);
            }
        }

        // Exhausted the readable window without another match
        None
    }
}
|