use std::sync::Mutex; use std::collections::VecDeque; use crate::protocol::eval::ValueGroup; use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner}; use crate::runtime2::port::ChannelId; use super::ConnectorId; use super::consensus::{GlobalSolution, LocalSolution}; use super::port::PortIdLocal; // TODO: Remove Debug derive from all types #[derive(Debug, Copy, Clone)] pub(crate) struct ChannelAnnotation { pub channel_id: ChannelId, pub registered_id: Option, pub expected_firing: Option, } /// Marker for a branch in a port mapping. A marker is, like a branch ID, a /// unique identifier for a branch, but differs in that a branch only has one /// branch ID, but might have multiple associated markers (i.e. one branch /// performing a `put` three times will generate three markers. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct BranchMarker{ marker: u32, } impl BranchMarker { #[inline] pub(crate) fn new(marker: u32) -> Self { debug_assert!(marker != 0); return Self{ marker }; } #[inline] pub(crate) fn new_invalid() -> Self { return Self{ marker: 0 } } } /// The header added by the synchronization algorithm to all. #[derive(Debug, Clone, Copy)] pub(crate) struct SyncHeader { pub sending_component_id: ConnectorId, pub highest_component_id: ConnectorId, pub sync_round: u32, } /// The header added to data messages #[derive(Debug, Clone)] pub(crate) struct DataHeader { pub expected_mapping: Vec, pub sending_port: PortIdLocal, pub target_port: PortIdLocal, pub new_mapping: BranchMarker, } /// A data message is a message that is intended for the receiver's PDL code, /// but will also be handled by the consensus algorithm #[derive(Debug, Clone)] pub(crate) struct DataMessage { pub sync_header: SyncHeader, pub data_header: DataHeader, pub content: ValueGroup, } #[derive(Debug)] pub(crate) enum SyncCompContent { LocalFailure, // notifying leader that component has failed (e.g. timeout, whatever) LocalSolution(LocalSolution), // sending a local solution to the leader PartialSolution(SolutionCombiner), // when new leader is detected, forward all local results GlobalSolution(GlobalSolution), // broadcasting to everyone GlobalFailure, // broadcasting to everyone AckFailure, // acknowledgement of failure to leader Notification, // just a notification (so purpose of message is to send the SyncHeader) Presence(ComponentPresence), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round) } /// A sync message is a message that is intended only for the consensus /// algorithm. The message goes directly to a component. #[derive(Debug)] pub(crate) struct SyncCompMessage { pub sync_header: SyncHeader, pub target_component_id: ConnectorId, pub content: SyncCompContent, } #[derive(Debug)] pub(crate) enum SyncPortContent { SilentPortNotification, NotificationWave, } /// A sync message intended for the consensus algorithm. This message does not /// go to a component, but through a channel (and results in potential /// rerouting) because we're not sure about the ID of the component that holds /// the other end of the channel. #[derive(Debug)] pub(crate) struct SyncPortMessage { pub sync_header: SyncHeader, pub source_port: PortIdLocal, pub target_port: PortIdLocal, pub content: SyncPortContent, } #[derive(Debug)] pub(crate) enum SyncControlContent { ChannelIsClosed(PortIdLocal), // contains port that is owned by the recipient of the message } /// A sync control message: originating from the scheduler, but intended for the /// current sync round of the recipient. Every kind of consensus algorithm must /// be able to handle such a message. #[derive(Debug)] pub(crate) struct SyncControlMessage { // For now these control messages are only aimed at components. Might change // in the future. But for now we respond to messages from components that // have, because of that message, published their ID. pub in_response_to_sync_round: u32, pub target_component_id: ConnectorId, pub content: SyncControlContent, } /// A control message is a message intended for the scheduler that is executing /// a component. #[derive(Debug)] pub(crate) struct ControlMessage { pub id: u32, // generic identifier, used to match request to response pub sending_component_id: ConnectorId, pub content: ControlContent, } #[derive(Debug)] pub(crate) enum ControlContent { PortPeerChanged(PortIdLocal, ConnectorId), CloseChannel(PortIdLocal), Ack, Ping, } /// Combination of data message and control messages. #[derive(Debug)] pub(crate) enum Message { Data(DataMessage), SyncComp(SyncCompMessage), SyncPort(SyncPortMessage), SyncControl(SyncControlMessage), Control(ControlMessage), } impl Message { /// If the message is sent through a particular channel, then this function /// returns the port through which the message was sent. pub(crate) fn source_port(&self) -> Option { // Currently only data messages have a source port match self { Message::Data(message) => return Some(message.data_header.sending_port), Message::SyncPort(message) => return Some(message.source_port), Message::SyncComp(_) => return None, Message::SyncControl(_) => return None, Message::Control(_) => return None, } } /// If the message is sent through a particular channel, then this function /// returns the target port through which the message was sent. pub(crate) fn target_port(&self) -> Option { match self { Message::Data(message) => return Some(message.data_header.target_port), Message::SyncPort(message) => return Some(message.target_port), Message::SyncComp(_) => return None, Message::SyncControl(_) => return None, Message::Control(message) => { match &message.content { ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), ControlContent::CloseChannel(port_id) => return Some(*port_id), ControlContent::Ping => return None, ControlContent::Ack => return None, } } } } pub(crate) fn source_component(&self) -> Option { match self { Message::Data(message) => Some(message.sync_header.sending_component_id), Message::SyncPort(message) => Some(message.sync_header.sending_component_id), Message::SyncComp(message) => Some(message.sync_header.sending_component_id), Message::SyncControl(_) => None, Message::Control(message) => Some(message.sending_component_id) } } pub(crate) fn as_data(&self) -> &DataMessage { match self { Message::Data(v) => v, _ => unreachable!(), } } } /// The public inbox of a connector. The thread running the connector that owns /// this inbox may retrieved from it. Non-owning threads may only put new /// messages inside of it. // TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. // Should behave as a MPSC queue. pub struct PublicInbox { messages: Mutex>, } impl PublicInbox { pub fn new() -> Self { Self{ messages: Mutex::new(VecDeque::new()), } } pub(crate) fn insert_message(&self, message: Message) { let mut lock = self.messages.lock().unwrap(); lock.push_back(message); } pub(crate) fn take_message(&self) -> Option { let mut lock = self.messages.lock().unwrap(); return lock.pop_front(); } pub fn is_empty(&self) -> bool { let lock = self.messages.lock().unwrap(); return lock.is_empty(); } pub fn clear(&self) { let mut lock = self.messages.lock().unwrap(); lock.clear(); } }