//! Message definitions for the runtime2 inbox (`src/runtime2/inbox.rs`):
//! the data/sync/control message types exchanged between components and
//! schedulers, and the `PublicInbox` message queue.
use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;
use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner};
use crate::runtime2::port::ChannelId;

use super::ConnectorId;
use super::consensus::{GlobalSolution, LocalSolution};
use super::port::PortIdLocal;

/// Per-channel entry in a data message's port mapping: identifies the channel
/// and the sender's view of its state in the current speculative branch.
#[derive(Debug, Copy, Clone)]
pub(crate) struct ChannelAnnotation {
    // Channel this annotation refers to.
    pub channel_id: ChannelId,
    // Branch marker registered for this channel, or `None` if none yet.
    // NOTE(review): exact registration semantics live in the consensus
    // module — confirm there.
    pub registered_id: Option<BranchMarker>,
    // Whether the channel is expected to fire this round; `None` presumably
    // means still undecided — TODO confirm against consensus code.
    pub expected_firing: Option<bool>,
}

/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
/// unique identifier for a branch, but differs in that a branch only has one
/// branch ID, but might have multiple associated markers (i.e. one branch
/// performing a `put` three times will generate three markers).
///
/// The internal value 0 is reserved as the "invalid" marker (see
/// `new`/`new_invalid`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BranchMarker{
    marker: u32,
}

impl BranchMarker {
    /// Constructs a marker from a nonzero value. The value 0 is reserved for
    /// the invalid marker, hence the debug assertion.
    #[inline]
    pub(crate) fn new(marker: u32) -> Self {
        debug_assert!(marker != 0);
        Self{ marker }
    }

    /// Constructs the sentinel "invalid" marker (internal value 0).
    #[inline]
    pub(crate) fn new_invalid() -> Self {
        Self{ marker: 0 }
    }
}

/// The header added by the synchronization algorithm to all messages sent
/// during a sync round.
#[derive(Debug, Clone, Copy)]
pub(crate) struct SyncHeader {
    // Component that sent the message.
    pub sending_component_id: ConnectorId,
    // Highest component ID the sender knows of — presumably used for leader
    // election in the consensus algorithm; TODO confirm in consensus module.
    pub highest_component_id: ConnectorId,
    // Counter identifying the sync round this message belongs to.
    pub sync_round: u32,
}

/// The header added to data messages.
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    // Port mapping the sender expects the receiver's branch to be consistent
    // with before accepting this message.
    pub expected_mapping: Vec<ChannelAnnotation>,
    // Port through which the message was sent (see `Message::source_port`).
    pub sending_port: PortIdLocal,
    // Port at which the message should arrive (see `Message::target_port`).
    pub target_port: PortIdLocal,
    // Marker generated for the branch that performed this `put`.
    pub new_mapping: BranchMarker,
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm.
#[derive(Debug, Clone)]
pub(crate) struct DataMessage {
    // Consensus-algorithm bookkeeping, present on every sync-round message.
    pub sync_header: SyncHeader,
    // Routing and branch-mapping information for this particular `put`.
    pub data_header: DataHeader,
    // The user-level payload, to be handed to the receiver's PDL code.
    pub content: ValueGroup,
}

/// Payload variants for a `SyncCompMessage` (component-addressed consensus
/// messages).
#[derive(Debug)]
pub(crate) enum SyncCompContent {
    LocalFailure, // notifying leader that component has failed (e.g. timeout, whatever)
    LocalSolution(LocalSolution), // sending a local solution to the leader
    PartialSolution(SolutionCombiner), // when a new leader is detected, forward all local results
    GlobalSolution(GlobalSolution), // broadcasting to everyone
    GlobalFailure, // broadcasting to everyone
    AckFailure, // acknowledgement of failure to leader
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
    Presence(ComponentPresence), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
}

/// A sync message is a message that is intended only for the consensus
/// algorithm. The message goes directly to a component (contrast with
/// `SyncPortMessage`, which travels through a channel).
#[derive(Debug)]
pub(crate) struct SyncCompMessage {
    pub sync_header: SyncHeader,
    // Component the message is addressed to; no port routing is involved.
    pub target_component_id: ConnectorId,
    pub content: SyncCompContent,
}

/// Payload variants for a `SyncPortMessage` (channel-routed consensus
/// messages).
#[derive(Debug)]
pub(crate) enum SyncPortContent {
    // Tells the peer that the sending end stayed silent this round.
    // NOTE(review): inferred from the variant name — confirm in consensus code.
    SilentPortNotification,
    NotificationWave,
}

/// A sync message intended for the consensus algorithm. This message does not
/// go to a component, but through a channel (and results in potential
/// rerouting) because we're not sure about the ID of the component that holds
/// the other end of the channel.
#[derive(Debug)]
pub(crate) struct SyncPortMessage {
    pub sync_header: SyncHeader,
    // Port through which the message was sent (see `Message::source_port`).
    pub source_port: PortIdLocal,
    // Port at which the message should arrive (see `Message::target_port`).
    pub target_port: PortIdLocal,
    pub content: SyncPortContent,
}

/// Payload variants for a `SyncControlMessage`.
#[derive(Debug)]
pub(crate) enum SyncControlContent {
    ChannelIsClosed(PortIdLocal), // contains port that is owned by the recipient of the message
}

/// A sync control message: originating from the scheduler, but intended for the
/// current sync round of the recipient. Every kind of consensus algorithm must
/// be able to handle such a message.
#[derive(Debug)]
pub(crate) struct SyncControlMessage {
    // For now these control messages are only aimed at components. Might change
    // in the future. But for now we respond to messages from components that
    // have, because of that message, published their ID.
    // Sync round number this message responds to; lets the recipient discard
    // it if that round has already ended.
    pub in_response_to_sync_round: u32,
    pub target_component_id: ConnectorId,
    pub content: SyncControlContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    pub sending_component_id: ConnectorId,
    pub content: ControlContent,
}

/// Payload variants for a `ControlMessage`.
#[derive(Debug)]
pub(crate) enum ControlContent {
    // The peer of the given port now lives at the given component.
    // NOTE(review): inferred from the variant name — confirm at send site.
    PortPeerChanged(PortIdLocal, ConnectorId),
    // Request to close the channel owning the given port.
    CloseChannel(PortIdLocal),
    // Acknowledges an earlier request (matched through `ControlMessage::id`).
    Ack,
    Ping,
}

/// Combination of data messages and the various control/sync messages: the
/// single type that travels through a `PublicInbox`.
#[derive(Debug)]
pub(crate) enum Message {
    Data(DataMessage),
    SyncComp(SyncCompMessage),
    SyncPort(SyncPortMessage),
    SyncControl(SyncControlMessage),
    Control(ControlMessage),
}

impl Message {
    /// If the message is sent through a particular channel, then this function
    /// returns the port through which the message was sent.
    pub(crate) fn source_port(&self) -> Option<PortIdLocal> {
        // Only data and sync-port messages travel through a channel and
        // therefore carry a source port.
        match self {
            Message::Data(message) => Some(message.data_header.sending_port),
            Message::SyncPort(message) => Some(message.source_port),
            Message::SyncComp(_) |
            Message::SyncControl(_) |
            Message::Control(_) => None,
        }
    }

    /// If the message is sent through a particular channel, then this function
    /// returns the target port through which the message was sent.
    pub(crate) fn target_port(&self) -> Option<PortIdLocal> {
        match self {
            Message::Data(message) => Some(message.data_header.target_port),
            Message::SyncPort(message) => Some(message.target_port),
            Message::SyncComp(_) |
            Message::SyncControl(_) => None,
            // Control messages are component-addressed, but some variants
            // still reference a specific port.
            Message::Control(message) => match &message.content {
                ControlContent::PortPeerChanged(port_id, _) |
                ControlContent::CloseChannel(port_id) => Some(*port_id),
                ControlContent::Ping |
                ControlContent::Ack => None,
            },
        }
    }

    /// Returns the ID of the component that sent this message, if the message
    /// variant records one (sync-control messages do not).
    pub(crate) fn source_component(&self) -> Option<ConnectorId> {
        let component_id = match self {
            Message::Data(message) => message.sync_header.sending_component_id,
            Message::SyncPort(message) => message.sync_header.sending_component_id,
            Message::SyncComp(message) => message.sync_header.sending_component_id,
            Message::Control(message) => message.sending_component_id,
            Message::SyncControl(_) => return None,
        };
        Some(component_id)
    }

    /// Returns the contained data message. Panics if the message is any other
    /// variant; callers must already know it is `Message::Data`.
    pub(crate) fn as_data(&self) -> &DataMessage {
        if let Message::Data(message) = self {
            message
        } else {
            unreachable!()
        }
    }
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // FIFO queue guarded by a mutex; every operation takes the lock.
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    /// Creates an empty inbox.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self{ messages }
    }

    /// Appends a message at the back of the queue. Safe to call from any
    /// thread; the internal mutex serializes access.
    pub(crate) fn insert_message(&self, message: Message) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the oldest queued message, or `None` when the
    /// queue is empty.
    pub(crate) fn take_message(&self) -> Option<Message> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Returns `true` if no messages are currently queued. Note that another
    /// thread may insert a message immediately after the lock is released.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }

    /// Discards all queued messages.
    pub fn clear(&self) {
        self.messages.lock().unwrap().clear();
    }
}