/** inbox.rs Contains various types of inboxes and message types for the connectors. There are two kinds of inboxes: The `PublicInbox` is a simple message queue. Messages are put in by various threads, and they're taken out by a single thread. These messages may contain control messages and may be filtered or redirected by the scheduler. The `PrivateInbox` is a temporary storage for all messages that are received within a certain sync-round. **/ use std::collections::VecDeque; use std::sync::Mutex; use super::ConnectorId; use crate::protocol::eval::ValueGroup; use crate::runtime2::inbox2::MessageFancy; use super::connector::BranchId; use super::port::PortIdLocal; /// A message that has been delivered (after being imbued with the receiving /// port by the scheduler) to a connector. // TODO: Remove Debug on messages #[derive(Debug, Clone)] pub struct DataMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, pub sender_cur_branch_id: BranchId, pub message: ValueGroup, } #[derive(Debug, Clone)] pub enum SyncBranchConstraint { SilentPort(PortIdLocal), BranchNumber(BranchId), PortMapping(PortIdLocal, BranchId), } #[derive(Debug, Clone)] pub struct SyncConnectorSolution { pub connector_id: ConnectorId, pub terminating_branch_id: BranchId, pub execution_branch_ids: Vec, // no particular ordering of IDs enforced pub final_port_mapping: Vec<(PortIdLocal, BranchId)> } #[derive(Debug, Clone)] pub struct SyncConnectorConstraints { pub connector_id: ConnectorId, pub constraints: Vec, } #[derive(Debug, Clone)] pub struct SyncMessage { pub local_solutions: Vec, pub constraints: Vec, pub to_visit: Vec, } // TODO: Shouldn't really be here, right? impl SyncMessage { /// Creates a new sync message. Assumes that it is created by a connector /// that has just encountered a new local solution. pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self { let mut local_solutions = Vec::with_capacity(approximate_peers); local_solutions.push(initial_solution); return Self{ local_solutions, constraints: Vec::with_capacity(approximate_peers), to_visit: Vec::with_capacity(approximate_peers), }; } /// Checks if a connector has already provided a local solution pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool { return self.local_solutions .iter() .any(|v| v.connector_id == connector_id); } /// Adds a new constraint. If the connector has already provided a local /// solution then the constraint will be checked. Otherwise the constraint /// will be added to the solution. If this is the first constraint for a /// connector then it will be added to the connectors that still have to be /// visited. /// /// If this returns true then the constraint was added, or the local /// solution for the specified connector satisfies the constraint. If this /// function returns an error then we're dealing with a nefarious peer. pub(crate) fn add_or_check_constraint( &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint ) -> Result { if self.has_local_solution_for(connector_id) { return self.check_constraint(connector_id, constraint); } else { self.add_constraint(connector_id, constraint); return Ok(true); } } /// Pushes a new connector constraint. Caller must ensure that the solution /// has not yet arrived at the specified connector (because then it would no /// longer have constraints, but a proposed solution instead). pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) { debug_assert!(!self.has_local_solution_for(connector_id)); let position = self.constraints .iter() .position(|v| v.connector_id == connector_id); match position { Some(index) => { // Has pre-existing constraints debug_assert!(self.to_visit.contains(&connector_id)); let entry = &mut self.constraints[index]; entry.constraints.push(constraint); }, None => { debug_assert!(!self.to_visit.contains(&connector_id)); self.constraints.push(SyncConnectorConstraints{ connector_id, constraints: vec![constraint], }); self.to_visit.push(connector_id); } } } /// Checks if a constraint is satisfied by a solution. Caller must make sure /// that a local solution has already been provided. Will return an error /// value only if the provided constraint does not make sense (i.e. a /// nefarious peer has supplied a constraint with a port we do not own). pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result { debug_assert!(self.has_local_solution_for(connector_id)); let entry = self.local_solutions .iter() .find(|v| v.connector_id == connector_id) .unwrap(); match constraint { SyncBranchConstraint::SilentPort(silent_port_id) => { for (port_id, mapped_id) in &entry.final_port_mapping { if *port_id == silent_port_id { // If silent, then mapped ID is invalid return Ok(!mapped_id.is_valid()) } } return Err(()); }, SyncBranchConstraint::BranchNumber(expected_branch_id) => { return Ok(entry.execution_branch_ids.contains(&expected_branch_id)); }, SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { for (port_id, mapped_id) in &entry.final_port_mapping { if port_id == port_id { return Ok(*mapped_id == expected_branch_id); } } return Err(()); }, } } } #[derive(Debug, Clone)] pub struct SolutionMessage { pub comparison_number: u64, pub connector_origin: ConnectorId, pub local_solutions: Vec<(ConnectorId, BranchId)>, pub to_visit: Vec, } /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. #[derive(Debug, Clone)] pub struct ControlMessage { pub id: u32, // generic identifier, used to match request to response pub content: ControlMessageVariant, } #[derive(Debug, Clone)] pub enum ControlMessageVariant { ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port CloseChannel(PortIdLocal), // close the port associated with this Ack, // acknowledgement of previous control message, matching occurs through control message ID. } /// Generic message contents. #[derive(Debug, Clone)] pub enum MessageContents { Data(DataMessage), // data message, handled by connector Sync(SyncMessage), // sync message, handled by both connector/scheduler RequestCommit(SolutionMessage), // solution message, requesting participants to commit ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to Control(ControlMessage), // control message, handled by scheduler Ping, // ping message, intentionally waking up a connector (used for native connectors) } #[derive(Debug)] pub struct Message { pub sending_connector: ConnectorId, pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector) pub contents: MessageContents, } /// The public inbox of a connector. The thread running the connector that owns /// this inbox may retrieved from it. Non-owning threads may only put new /// messages inside of it. // TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. // Should behave as a MPSC queue. pub struct PublicInbox { messages: Mutex>, } impl PublicInbox { pub fn new() -> Self { Self{ messages: Mutex::new(VecDeque::new()), } } pub fn insert_message(&self, message: MessageFancy) { let mut lock = self.messages.lock().unwrap(); lock.push_back(message); } pub fn take_message(&self) -> Option { let mut lock = self.messages.lock().unwrap(); return lock.pop_front(); } pub fn is_empty(&self) -> bool { let lock = self.messages.lock().unwrap(); return lock.is_empty(); } }