/** inbox.rs

Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:

The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.

The `PrivateInbox` is temporary storage for all messages that are received
within a particular sync round.
**/

use std::collections::VecDeque;
use std::sync::Mutex;

use crate::protocol::eval::ValueGroup;
use super::connector::BranchId;
use super::port::PortIdLocal;
use super::global_store::ConnectorId;

/// A message that has been delivered (after being imbued with the receiving
/// port by the scheduler) to a connector.
#[derive(Clone)]
pub struct DataMessage {
    pub sending_port: PortIdLocal,
    pub receiving_port: PortIdLocal, // filled in by the scheduler upon delivery
    pub sender_prev_branch_id: BranchId,
    pub sender_cur_branch_id: BranchId,
    pub message: ValueGroup,
}

#[derive(Clone)]
pub enum SyncBranchConstraint {
    SilentPort(PortIdLocal),
    BranchNumber(BranchId),
    PortMapping(PortIdLocal, BranchId),
}

#[derive(Clone)]
pub struct SyncConnectorSolution {
    pub connector_id: ConnectorId,
    pub terminating_branch_id: BranchId,
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>,
}

#[derive(Clone)]
pub struct SyncConnectorConstraints {
    pub connector_id: ConnectorId,
    pub constraints: Vec<SyncBranchConstraint>,
}

#[derive(Clone)]
pub struct SyncMessage {
    pub local_solutions: Vec<SyncConnectorSolution>,
    pub constraints: Vec<SyncConnectorConstraints>,
    pub to_visit: Vec<ConnectorId>,
}
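// A minimal usage sketch (hypothetical, not part of the API): a connector that
// just found a local solution seeds a `SyncMessage` with it, then records a
// constraint for a peer it has not visited yet. The `solution`, `peer_id` and
// `silent_port` values are taken as parameters because the ID types are
// constructed in other modules; `peer_id` is assumed to differ from the
// solution's own connector ID, and `ConnectorId` is assumed `Copy + PartialEq`
// (as the surrounding code already relies on).
#[allow(dead_code)]
fn sync_message_usage_sketch(
    solution: SyncConnectorSolution,
    peer_id: ConnectorId,
    silent_port: PortIdLocal,
) {
    let mut message = SyncMessage::new(solution, 4);

    // The peer has no local solution yet, so the constraint is stored and the
    // peer is scheduled for a visit.
    let result = message.add_or_check_constraint(
        peer_id, SyncBranchConstraint::SilentPort(silent_port)
    );
    assert_eq!(result, Ok(true));
    assert!(message.to_visit.contains(&peer_id));
}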
// TODO: Shouldn't really be here, right?
impl SyncMessage {
    /// Creates a new sync message. Assumes that it is created by a connector
    /// that has just encountered a new local solution.
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
        let mut local_solutions = Vec::with_capacity(approximate_peers);
        local_solutions.push(initial_solution);

        return Self{
            local_solutions,
            constraints: Vec::with_capacity(approximate_peers),
            to_visit: Vec::with_capacity(approximate_peers),
        };
    }

    /// Checks if a connector has already provided a local solution.
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
        return self.local_solutions
            .iter()
            .any(|v| v.connector_id == connector_id);
    }

    /// Adds a new constraint. If the connector has already provided a local
    /// solution then the constraint will be checked. Otherwise the constraint
    /// will be added to the message. If this is the first constraint for a
    /// connector then that connector will be added to the connectors that
    /// still have to be visited.
    ///
    /// If this returns true then the constraint was added, or the local
    /// solution for the specified connector satisfies the constraint. If this
    /// function returns an error then we're dealing with a nefarious peer.
    pub(crate) fn add_or_check_constraint(
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
    ) -> Result<bool, ()> {
        if self.has_local_solution_for(connector_id) {
            return self.check_constraint(connector_id, constraint);
        } else {
            self.add_constraint(connector_id, constraint);
            return Ok(true);
        }
    }

    /// Pushes a new connector constraint. Caller must ensure that the solution
    /// has not yet arrived at the specified connector (because then it would
    /// no longer have constraints, but a proposed solution instead).
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
        debug_assert!(!self.has_local_solution_for(connector_id));

        let position = self.constraints
            .iter()
            .position(|v| v.connector_id == connector_id);

        match position {
            Some(index) => {
                // Has pre-existing constraints
                debug_assert!(self.to_visit.contains(&connector_id));
                let entry = &mut self.constraints[index];
                entry.constraints.push(constraint);
            },
            None => {
                debug_assert!(!self.to_visit.contains(&connector_id));
                self.constraints.push(SyncConnectorConstraints{
                    connector_id,
                    constraints: vec![constraint],
                });
                self.to_visit.push(connector_id);
            }
        }
    }

    /// Checks if a constraint is satisfied by a solution. Caller must make
    /// sure that a local solution has already been provided. Will return an
    /// error value only if the provided constraint does not make sense (i.e. a
    /// nefarious peer has supplied a constraint with a port we do not own).
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()> {
        debug_assert!(self.has_local_solution_for(connector_id));

        let entry = self.local_solutions
            .iter()
            .find(|v| v.connector_id == connector_id)
            .unwrap();

        match constraint {
            SyncBranchConstraint::SilentPort(silent_port_id) => {
                for (port_id, mapped_id) in &entry.final_port_mapping {
                    if *port_id == silent_port_id {
                        // If silent, then the mapped ID is invalid
                        return Ok(!mapped_id.is_valid());
                    }
                }

                return Err(());
            },
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
            },
            SyncBranchConstraint::PortMapping(expected_port_id, expected_branch_id) => {
                for (port_id, mapped_id) in &entry.final_port_mapping {
                    if *port_id == expected_port_id {
                        return Ok(*mapped_id == expected_branch_id);
                    }
                }

                return Err(());
            },
        }
    }
}

#[derive(Clone)]
pub struct SolutionMessage {
    pub comparison_number: u64,
    pub connector_origin: ConnectorId,
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
    pub to_visit: Vec<ConnectorId>,
}

/// A control message. These are sent between schedulers to notify each other
/// of asynchronous state changes.
#[derive(Clone)]
pub struct ControlMessage {
    pub id: u32, // generic identifier, used to match a request to its response
    pub content: ControlMessageVariant,
}

#[derive(Clone)]
pub enum ControlMessageVariant {
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to the owner of said port
    Ack, // acknowledgement of a previous control message, matched through the control message ID
}

/// Generic message contents.
#[derive(Clone)]
pub enum MessageContents {
    Data(DataMessage),              // data message, handled by the connector
    Sync(SyncMessage),              // sync message, handled by both connector and scheduler
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
    Control(ControlMessage),        // control message, handled by the scheduler
    Ping,                           // ping message, intentionally waking up a connector (used for native connectors)
}

pub struct Message {
    pub sending_connector: ConnectorId,
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector itself)
    pub contents: MessageContents,
}
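// A routing sketch (hypothetical, for illustration): how a scheduler loop
// might dispatch an incoming `Message` on its contents. The comments indicate
// responsibilities only; the actual routing logic lives in the scheduler, not
// in this module.
#[allow(dead_code)]
fn message_routing_sketch(message: Message) {
    match message.contents {
        MessageContents::Data(_data) => {
            // Forward to the receiving connector's PrivateInbox
        },
        MessageContents::Sync(_sync) => {
            // Handled by both the connector and the scheduler
        },
        MessageContents::RequestCommit(_solution) | MessageContents::ConfirmCommit(_solution) => {
            // Part of the commit protocol for a global solution
        },
        MessageContents::Control(_control) => {
            // Handled by the scheduler (e.g. a port peer change, or an Ack)
        },
        MessageContents::Ping => {
            // No payload: merely wakes up the connector
        },
    }
}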
/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages into it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    pub fn new() -> Self {
        Self{
            messages: Mutex::new(VecDeque::new()),
        }
    }

    pub fn insert_message(&self, message: Message) {
        let mut lock = self.messages.lock().unwrap();
        lock.push_back(message);
    }

    pub fn take_message(&self) -> Option<Message> {
        let mut lock = self.messages.lock().unwrap();
        return lock.pop_front();
    }

    pub fn is_empty(&self) -> bool {
        let lock = self.messages.lock().unwrap();
        return lock.is_empty();
    }
}

pub struct PrivateInbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: Vec<DataMessage>,
    len_read: usize,
}

impl PrivateInbox {
    pub fn new() -> Self {
        Self{
            messages: Vec::new(),
            len_read: 0,
        }
    }

    /// Inserts the message into the inbox. The only exception is when the
    /// tuple (sender_prev_branch_id, sender_cur_branch_id, sending_port)
    /// already exists; then nothing is inserted.
    pub fn insert_message(&mut self, message: DataMessage) {
        for existing in self.messages.iter() {
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
                existing.sender_cur_branch_id == message.sender_cur_branch_id &&
                existing.sending_port == message.sending_port {
                // Message was already received
                return;
            }
        }

        self.messages.push(message);
    }

    /// Retrieves all previously read messages that satisfy the provided
    /// speculative conditions. Note that the inbox remains borrowed until the
    /// returned iterator is dropped. Should only be called by the inbox reader
    /// (i.e. the thread executing a connector's PDL code).
    ///
    /// This function should only be used to check if already-received messages
    /// could be received by a newly encountered `get` call in a connector's
    /// PDL code.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        return InboxMessageIter{
            messages: &self.messages,
            next_index: 0,
            max_index: self.len_read,
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        };
    }

    /// Retrieves the next unread message. Should only be called by the
    /// inbox reader.
    pub fn next_message(&mut self) -> Option<&DataMessage> {
        if self.len_read == self.messages.len() {
            return None;
        }

        let to_return = &self.messages[self.len_read];
        self.len_read += 1;
        return Some(to_return);
    }

    /// Simply empties the inbox.
    pub fn clear(&mut self) {
        self.messages.clear();
        self.len_read = 0;
    }
}

/// Iterator over previously received messages in the inbox.
pub struct InboxMessageIter<'i> {
    messages: &'i Vec<DataMessage>,
    next_index: usize,
    max_index: usize,
    match_port_id: PortIdLocal,
    match_prev_branch_id: BranchId,
}

impl<'i> Iterator for InboxMessageIter<'i> {
    type Item = &'i DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop until a match is found or we are at the end of the messages
        while self.next_index < self.max_index {
            let cur_message = &self.messages[self.next_index];
            if cur_message.receiving_port == self.match_port_id &&
                cur_message.sender_prev_branch_id == self.match_prev_branch_id {
                // Found a match
                break;
            }

            self.next_index += 1;
        }

        if self.next_index == self.max_index {
            return None;
        }

        let message = &self.messages[self.next_index];
        self.next_index += 1;
        return Some(message);
    }
}
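// Usage sketches (hypothetical, not part of the API). Both take already-built
// messages as parameters because the ID types (`PortIdLocal`, `BranchId`,
// `ConnectorId`) are constructed in other modules.

// PublicInbox: any thread may insert, only the owning thread takes.
#[allow(dead_code)]
fn public_inbox_sketch(message: Message) {
    let inbox = PublicInbox::new();
    inbox.insert_message(message); // may be called from any thread
    assert!(!inbox.is_empty());
    let _taken = inbox.take_message(); // called by the owning thread only
    assert!(inbox.is_empty());
}

// PrivateInbox: duplicates are dropped, and messages that were already read
// can be revisited for a newly encountered `get` via `get_messages`.
#[allow(dead_code)]
fn private_inbox_sketch(message: DataMessage) {
    let mut inbox = PrivateInbox::new();
    inbox.insert_message(message.clone());
    inbox.insert_message(message.clone()); // same (branch IDs, port): dropped

    assert!(inbox.next_message().is_some()); // the first copy is unread
    assert!(inbox.next_message().is_none()); // the duplicate was never stored

    // A later `get` in the same sync round may re-examine read messages:
    let num_matching = inbox
        .get_messages(message.receiving_port, message.sender_prev_branch_id)
        .count();
    assert_eq!(num_matching, 1);

    inbox.clear(); // end of the sync round
}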