diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index a26cf17373de3aab4a0c652c59288d1e4934cab1..808fe06a90cb43fb4205329a7104e634a10e41b5 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -1,211 +1,137 @@ -/** -inbox.rs - -Contains various types of inboxes and message types for the connectors. There -are two kinds of inboxes: - -The `PublicInbox` is a simple message queue. Messages are put in by various -threads, and they're taken out by a single thread. These messages may contain -control messages and may be filtered or redirected by the scheduler. - -The `PrivateInbox` is a temporary storage for all messages that are received -within a certain sync-round. -**/ - -use std::collections::VecDeque; use std::sync::Mutex; +use std::collections::VecDeque; -use super::ConnectorId; use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox2::MessageFancy; -use super::connector::BranchId; + +use super::ConnectorId; +use super::branch::BranchId; +use super::consensus::{GlobalSolution, LocalSolution}; use super::port::PortIdLocal; -/// A message that has been delivered (after being imbued with the receiving -/// port by the scheduler) to a connector. -// TODO: Remove Debug on messages -#[derive(Debug, Clone)] -pub struct DataMessage { - pub sending_port: PortIdLocal, - pub sender_prev_branch_id: BranchId, - pub sender_cur_branch_id: BranchId, - pub message: ValueGroup, -} +// TODO: Remove Debug derive from all types -#[derive(Debug, Clone)] -pub enum SyncBranchConstraint { - SilentPort(PortIdLocal), - BranchNumber(BranchId), - PortMapping(PortIdLocal, BranchId), +#[derive(Debug, Copy, Clone)] +pub(crate) struct PortAnnotation { + pub port_id: PortIdLocal, + pub registered_id: Option<BranchId>, + pub expected_firing: Option<bool>, } +/// The header added by the synchronization algorithm to all messages.
#[derive(Debug, Clone)] -pub struct SyncConnectorSolution { - pub connector_id: ConnectorId, - pub terminating_branch_id: BranchId, - pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced - pub final_port_mapping: Vec<(PortIdLocal, BranchId)> +pub(crate) struct SyncHeader { + pub sending_component_id: ConnectorId, + pub highest_component_id: ConnectorId, } +/// The header added to data messages #[derive(Debug, Clone)] -pub struct SyncConnectorConstraints { - pub connector_id: ConnectorId, - pub constraints: Vec<SyncBranchConstraint>, +pub(crate) struct DataHeader { + pub expected_mapping: Vec<PortAnnotation>, + pub sending_port: PortIdLocal, + pub target_port: PortIdLocal, + pub new_mapping: BranchId, } +// TODO: Very much on the fence about this. On one hand I thought making it a +// data message was neat because "silent port notification" should be rerouted +// like any other data message to determine the component ID of the receiver +// and to make it part of the leader election algorithm for the sync leader. +// However: it complicates logic quite a bit. Really it might be easier to +// create `Message::SyncAtComponent` and `Message::SyncAtPort` messages... #[derive(Debug, Clone)] -pub struct SyncMessage { - pub local_solutions: Vec<SyncConnectorSolution>, - pub constraints: Vec<SyncConnectorConstraints>, - pub to_visit: Vec<ConnectorId>, +pub(crate) enum DataContent { + SilentPortNotification, + Message(ValueGroup), } -// TODO: Shouldn't really be here, right? -impl SyncMessage { - /// Creates a new sync message. Assumes that it is created by a connector - /// that has just encountered a new local solution.
- pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self { - let mut local_solutions = Vec::with_capacity(approximate_peers); - local_solutions.push(initial_solution); - - return Self{ - local_solutions, - constraints: Vec::with_capacity(approximate_peers), - to_visit: Vec::with_capacity(approximate_peers), - }; - } - - /// Checks if a connector has already provided a local solution - pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool { - return self.local_solutions - .iter() - .any(|v| v.connector_id == connector_id); - } - - /// Adds a new constraint. If the connector has already provided a local - /// solution then the constraint will be checked. Otherwise the constraint - /// will be added to the solution. If this is the first constraint for a - /// connector then it will be added to the connectors that still have to be - /// visited. - /// - /// If this returns true then the constraint was added, or the local - /// solution for the specified connector satisfies the constraint. If this - /// function returns an error then we're dealing with a nefarious peer. - pub(crate) fn add_or_check_constraint( - &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint - ) -> Result<bool, ()> { - if self.has_local_solution_for(connector_id) { - return self.check_constraint(connector_id, constraint); - } else { - self.add_constraint(connector_id, constraint); - return Ok(true); +impl DataContent { + pub(crate) fn as_message(&self) -> Option<&ValueGroup> { + match self { + DataContent::SilentPortNotification => None, + DataContent::Message(message) => Some(message), } } +} - /// Pushes a new connector constraint. Caller must ensure that the solution - /// has not yet arrived at the specified connector (because then it would no - /// longer have constraints, but a proposed solution instead).
- pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) { - debug_assert!(!self.has_local_solution_for(connector_id)); - - let position = self.constraints - .iter() - .position(|v| v.connector_id == connector_id); - - match position { - Some(index) => { - // Has pre-existing constraints - debug_assert!(self.to_visit.contains(&connector_id)); - let entry = &mut self.constraints[index]; - entry.constraints.push(constraint); - }, - None => { - debug_assert!(!self.to_visit.contains(&connector_id)); - self.constraints.push(SyncConnectorConstraints{ - connector_id, - constraints: vec![constraint], - }); - self.to_visit.push(connector_id); - } - } - } +/// A data message is a message that is intended for the receiver's PDL code, +/// but will also be handled by the consensus algorithm +#[derive(Debug, Clone)] +pub(crate) struct DataMessage { + pub sync_header: SyncHeader, + pub data_header: DataHeader, + pub content: DataContent, +} - /// Checks if a constraint is satisfied by a solution. Caller must make sure - /// that a local solution has already been provided. Will return an error - /// value only if the provided constraint does not make sense (i.e. a - /// nefarious peer has supplied a constraint with a port we do not own). 
- pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()> { - debug_assert!(self.has_local_solution_for(connector_id)); - - let entry = self.local_solutions - .iter() - .find(|v| v.connector_id == connector_id) - .unwrap(); - - match constraint { - SyncBranchConstraint::SilentPort(silent_port_id) => { - for (port_id, mapped_id) in &entry.final_port_mapping { - if *port_id == silent_port_id { - // If silent, then mapped ID is invalid - return Ok(!mapped_id.is_valid()) - } - } - - return Err(()); - }, - SyncBranchConstraint::BranchNumber(expected_branch_id) => { - return Ok(entry.execution_branch_ids.contains(&expected_branch_id)); - }, - SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { - for (port_id, mapped_id) in &entry.final_port_mapping { - if port_id == port_id { - return Ok(*mapped_id == expected_branch_id); - } - } - - return Err(()); - }, - } - } +#[derive(Debug)] +pub(crate) enum SyncContent { + LocalSolution(LocalSolution), // sending a local solution to the leader + GlobalSolution(GlobalSolution), // broadcasting to everyone + Notification, // just a notification (so purpose of message is to send the SyncHeader) } -#[derive(Debug, Clone)] -pub struct SolutionMessage { - pub comparison_number: u64, - pub connector_origin: ConnectorId, - pub local_solutions: Vec<(ConnectorId, BranchId)>, - pub to_visit: Vec<ConnectorId>, +/// A sync message is a message that is intended only for the consensus +/// algorithm. +#[derive(Debug)] +pub(crate) struct SyncMessage { + pub sync_header: SyncHeader, + pub target_component_id: ConnectorId, + pub content: SyncContent, } -/// A control message. These might be sent by the scheduler to notify eachother -/// of asynchronous state changes. -#[derive(Debug, Clone)] -pub struct ControlMessage {
+#[derive(Debug)] +pub(crate) struct ControlMessage { pub id: u32, // generic identifier, used to match request to response - pub content: ControlMessageVariant, + pub sending_component_id: ConnectorId, + pub content: ControlContent, } -#[derive(Debug, Clone)] -pub enum ControlMessageVariant { - ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port - CloseChannel(PortIdLocal), // close the port associated with this - Ack, // acknowledgement of previous control message, matching occurs through control message ID. +#[derive(Debug)] +pub(crate) enum ControlContent { + PortPeerChanged(PortIdLocal, ConnectorId), + CloseChannel(PortIdLocal), + Ack, + Ping, } -/// Generic message contents. -#[derive(Debug, Clone)] -pub enum MessageContents { - Data(DataMessage), // data message, handled by connector - Sync(SyncMessage), // sync message, handled by both connector/scheduler - RequestCommit(SolutionMessage), // solution message, requesting participants to commit - ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to - Control(ControlMessage), // control message, handled by scheduler - Ping, // ping message, intentionally waking up a connector (used for native connectors) +/// Combination of data message and control messages. +#[derive(Debug)] +pub(crate) enum Message { + Data(DataMessage), + Sync(SyncMessage), + Control(ControlMessage), } -#[derive(Debug)] -pub struct Message { - pub sending_connector: ConnectorId, - pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector) - pub contents: MessageContents, +/// The public inbox of a connector. The thread running the connector that owns +/// this inbox may retrieved from it. Non-owning threads may only put new +/// messages inside of it. +// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. +// Should behave as a MPSC queue. 
+pub struct PublicInbox { + messages: Mutex<VecDeque<Message>>, +} + +impl PublicInbox { + pub fn new() -> Self { + Self{ + messages: Mutex::new(VecDeque::new()), + } + } + + pub(crate) fn insert_message(&self, message: Message) { + let mut lock = self.messages.lock().unwrap(); + lock.push_back(message); + } + + pub(crate) fn take_message(&self) -> Option<Message> { + let mut lock = self.messages.lock().unwrap(); + return lock.pop_front(); + } + + pub fn is_empty(&self) -> bool { + let lock = self.messages.lock().unwrap(); + return lock.is_empty(); + } } \ No newline at end of file