diff --git a/src/runtime/inbox.rs b/src/runtime/inbox.rs new file mode 100644 index 0000000000000000000000000000000000000000..154bb40d1bce3f6d734dd3f22a3cc1396e8d9a4c --- /dev/null +++ b/src/runtime/inbox.rs @@ -0,0 +1,237 @@ +use std::sync::Mutex; +use std::collections::VecDeque; + +use crate::protocol::eval::ValueGroup; +use crate::runtime::consensus::{ComponentPresence, SolutionCombiner}; +use crate::runtime::port::ChannelId; + +use super::ConnectorId; +use super::consensus::{GlobalSolution, LocalSolution}; +use super::port::PortIdLocal; + +#[derive(Debug, Copy, Clone)] +pub(crate) struct ChannelAnnotation { + pub channel_id: ChannelId, + pub registered_id: Option, + pub expected_firing: Option, +} + +/// Marker for a branch in a port mapping. A marker is, like a branch ID, a +/// unique identifier for a branch, but differs in that a branch only has one +/// branch ID, but might have multiple associated markers (i.e. one branch +/// performing a `put` three times will generate three markers. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct BranchMarker{ + marker: u32, +} + +impl BranchMarker { + #[inline] + pub(crate) fn new(marker: u32) -> Self { + debug_assert!(marker != 0); + return Self{ marker }; + } + + #[inline] + pub(crate) fn new_invalid() -> Self { + return Self{ marker: 0 } + } +} + +/// The header added by the synchronization algorithm to all. +#[derive(Debug, Clone, Copy)] +pub(crate) struct SyncHeader { + pub sending_component_id: ConnectorId, + pub highest_component_id: ConnectorId, + pub sync_round: u32, +} + +/// The header added to data messages +#[derive(Debug, Clone)] +pub(crate) struct DataHeader { + pub expected_mapping: Vec, + pub sending_port: PortIdLocal, + pub target_port: PortIdLocal, + pub new_mapping: BranchMarker, +} + +/// A data message is a message that is intended for the receiver's PDL code, +/// but will also be handled by the consensus algorithm +#[derive(Debug, Clone)] +pub(crate) struct DataMessage { + pub sync_header: SyncHeader, + pub data_header: DataHeader, + pub content: ValueGroup, +} + +#[derive(Debug)] +pub(crate) enum SyncCompContent { + LocalFailure, // notifying leader that component has failed (e.g. timeout, whatever) + LocalSolution(LocalSolution), // sending a local solution to the leader + PartialSolution(SolutionCombiner), // when new leader is detected, forward all local results + GlobalSolution(GlobalSolution), // broadcasting to everyone + GlobalFailure, // broadcasting to everyone + AckFailure, // acknowledgement of failure to leader + Notification, // just a notification (so purpose of message is to send the SyncHeader) + Presence(ComponentPresence), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round) +} + +/// A sync message is a message that is intended only for the consensus +/// algorithm. The message goes directly to a component. +#[derive(Debug)] +pub(crate) struct SyncCompMessage { + pub sync_header: SyncHeader, + pub target_component_id: ConnectorId, + pub content: SyncCompContent, +} + +#[derive(Debug)] +pub(crate) enum SyncPortContent { + SilentPortNotification, + NotificationWave, +} + +/// A sync message intended for the consensus algorithm. This message does not +/// go to a component, but through a channel (and results in potential +/// rerouting) because we're not sure about the ID of the component that holds +/// the other end of the channel. +#[derive(Debug)] +pub(crate) struct SyncPortMessage { + pub sync_header: SyncHeader, + pub source_port: PortIdLocal, + pub target_port: PortIdLocal, + pub content: SyncPortContent, +} + +#[derive(Debug)] +pub(crate) enum SyncControlContent { + ChannelIsClosed(PortIdLocal), // contains port that is owned by the recipient of the message +} + +/// A sync control message: originating from the scheduler, but intended for the +/// current sync round of the recipient. Every kind of consensus algorithm must +/// be able to handle such a message. +#[derive(Debug)] +pub(crate) struct SyncControlMessage { + // For now these control messages are only aimed at components. Might change + // in the future. But for now we respond to messages from components that + // have, because of that message, published their ID. + pub in_response_to_sync_round: u32, + pub target_component_id: ConnectorId, + pub content: SyncControlContent, +} + +/// A control message is a message intended for the scheduler that is executing +/// a component. +#[derive(Debug)] +pub(crate) struct ControlMessage { + pub id: u32, // generic identifier, used to match request to response + pub sending_component_id: ConnectorId, + pub content: ControlContent, +} + +#[derive(Debug)] +pub(crate) enum ControlContent { + PortPeerChanged(PortIdLocal, ConnectorId), + CloseChannel(PortIdLocal), + Ack, + Ping, +} + +/// Combination of data message and control messages. +#[derive(Debug)] +pub(crate) enum Message { + Data(DataMessage), + SyncComp(SyncCompMessage), + SyncPort(SyncPortMessage), + SyncControl(SyncControlMessage), + Control(ControlMessage), +} + +impl Message { + /// If the message is sent through a particular channel, then this function + /// returns the port through which the message was sent. + pub(crate) fn source_port(&self) -> Option { + // Currently only data messages have a source port + match self { + Message::Data(message) => return Some(message.data_header.sending_port), + Message::SyncPort(message) => return Some(message.source_port), + Message::SyncComp(_) => return None, + Message::SyncControl(_) => return None, + Message::Control(_) => return None, + } + } + + /// If the message is sent through a particular channel, then this function + /// returns the target port through which the message was sent. + pub(crate) fn target_port(&self) -> Option { + match self { + Message::Data(message) => return Some(message.data_header.target_port), + Message::SyncPort(message) => return Some(message.target_port), + Message::SyncComp(_) => return None, + Message::SyncControl(_) => return None, + Message::Control(message) => { + match &message.content { + ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), + ControlContent::CloseChannel(port_id) => return Some(*port_id), + ControlContent::Ping => return None, + ControlContent::Ack => return None, + } + } + } + } + + pub(crate) fn source_component(&self) -> Option { + match self { + Message::Data(message) => Some(message.sync_header.sending_component_id), + Message::SyncPort(message) => Some(message.sync_header.sending_component_id), + Message::SyncComp(message) => Some(message.sync_header.sending_component_id), + Message::SyncControl(_) => None, + Message::Control(message) => Some(message.sending_component_id) + } + } + + pub(crate) fn as_data(&self) -> &DataMessage { + match self { + Message::Data(v) => v, + _ => unreachable!(), + } + } +} + +/// The public inbox of a connector. The thread running the connector that owns +/// this inbox may retrieved from it. Non-owning threads may only put new +/// messages inside of it. +// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. +// Should behave as a MPSC queue. +pub struct PublicInbox { + messages: Mutex>, +} + +impl PublicInbox { + pub fn new() -> Self { + Self{ + messages: Mutex::new(VecDeque::new()), + } + } + + pub(crate) fn insert_message(&self, message: Message) { + let mut lock = self.messages.lock().unwrap(); + lock.push_back(message); + } + + pub(crate) fn take_message(&self) -> Option { + let mut lock = self.messages.lock().unwrap(); + return lock.pop_front(); + } + + pub fn is_empty(&self) -> bool { + let lock = self.messages.lock().unwrap(); + return lock.is_empty(); + } + + pub fn clear(&self) { + let mut lock = self.messages.lock().unwrap(); + lock.clear(); + } +} \ No newline at end of file