use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;
use crate::runtime2::branch::BranchId;
use crate::runtime2::ConnectorId;
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
use crate::runtime2::port::PortIdLocal;

// TODO: Remove Debug derive from all types

/// Annotation attached to a single port.
///
/// NOTE(review): the generic arguments of both `Option` fields were missing in
/// the reviewed text; they are reconstructed from the field names and the
/// imported types. Confirm against version control.
#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
    /// The port this annotation refers to.
    pub port_id: PortIdLocal,
    /// Presumably the branch registered for this port, if any (TODO confirm).
    pub registered_id: Option<BranchId>,
    /// Presumably whether the port is expected to fire, if already decided
    /// (TODO confirm).
    pub expected_firing: Option<bool>,
}

/// The header added by the synchronization algorithm to all data and sync
/// messages.
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
    /// Component that sent the message carrying this header.
    pub sending_component_id: ConnectorId,
    /// Presumably the highest component ID the sender knows of, as used for
    /// electing the sync leader (TODO confirm).
    pub highest_component_id: ConnectorId,
}

/// The header added to data messages
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    // NOTE(review): element type reconstructed (the reviewed text had a bare
    // `Vec`); `PortAnnotation` is the only candidate declared in this file.
    pub expected_mapping: Vec<PortAnnotation>,
    /// Port the message was sent from.
    pub sending_port: PortIdLocal,
    /// Port the message is addressed to.
    pub target_port: PortIdLocal,
    pub new_mapping: BranchId,
}

// TODO: Very much on the fence about this. On one hand I thought making it a
//  data message was neat because "silent port notification" should be rerouted
//  like any other data message to determine the component ID of the receiver
//  and to make it part of the leader election algorithm for the sync leader.
//  However: it complicates logic quite a bit. Really it might be easier to
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
/// Payload of a data message: either an actual value group or a notification
/// that carries no value.
#[derive(Debug, Clone)]
pub(crate) enum DataContent {
    SilentPortNotification,
    Message(ValueGroup),
}

impl DataContent {
    /// Returns the carried value group, or `None` when this is a
    /// `SilentPortNotification`.
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
        match self {
            DataContent::SilentPortNotification => None,
            DataContent::Message(message) => Some(message),
        }
    }
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm
#[derive(Debug, Clone)]
pub(crate) struct DataMessageFancy {
    pub sync_header: SyncHeader,
    pub data_header: DataHeader,
    pub content: DataContent,
}

/// Payload of a sync message.
#[derive(Debug)]
pub(crate) enum SyncContent {
    LocalSolution(LocalSolution), // sending a local solution to the leader
    GlobalSolution(GlobalSolution), // broadcasting to everyone
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
}

/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessageFancy {
    pub sync_header: SyncHeader,
    pub target_component_id: ConnectorId,
    pub content: SyncContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessageFancy {
    pub id: u32, // generic identifier, used to match request to response
    pub sending_component_id: ConnectorId,
    pub content: ControlContent,
}

/// Payload of a control message.
#[derive(Debug)]
pub(crate) enum ControlContent {
    PortPeerChanged(PortIdLocal, ConnectorId),
    CloseChannel(PortIdLocal),
    Ack,
    Ping,
}

/// Combination of data, sync and control messages.
#[derive(Debug)]
pub(crate) enum MessageFancy {
    Data(DataMessageFancy),
    Sync(SyncMessageFancy),
    Control(ControlMessageFancy),
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox { messages: Mutex>, } impl PublicInbox { pub fn new() -> Self { Self{ messages: Mutex::new(VecDeque::new()), } } pub(crate) fn insert_message(&self, message: MessageFancy) { let mut lock = self.messages.lock().unwrap(); lock.push_back(message); } pub(crate) fn take_message(&self) -> Option { let mut lock = self.messages.lock().unwrap(); return lock.pop_front(); } pub fn is_empty(&self) -> bool { let lock = self.messages.lock().unwrap(); return lock.is_empty(); } }