Files @ 68411f4b8014
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

68411f4b8014 4.0 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
Round of cleanup on temporary type names and old code
use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;

use super::ConnectorId;
use super::branch::BranchId;
use super::consensus::{GlobalSolution, LocalSolution};
use super::port::PortIdLocal;

// TODO: Remove Debug derive from all types

#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
    pub port_id: PortIdLocal,
    pub registered_id: Option<BranchId>,
    pub expected_firing: Option<bool>,
}

/// The header added by the synchronization algorithm to all.
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
    pub sending_component_id: ConnectorId,
    pub highest_component_id: ConnectorId,
}

/// The header added to data messages
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    pub expected_mapping: Vec<PortAnnotation>,
    pub sending_port: PortIdLocal,
    pub target_port: PortIdLocal,
    pub new_mapping: BranchId,
}

// TODO: Very much on the fence about this. On one hand I thought making it a
//  data message was neat because "silent port notification" should be rerouted
//  like any other data message to determine the component ID of the receiver
//  and to make it part of the leader election algorithm for the sync leader.
//  However: it complicates logic quite a bit. Really it might be easier to
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
#[derive(Debug, Clone)]
pub(crate) enum DataContent {
    SilentPortNotification,
    Message(ValueGroup),
}

impl DataContent {
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
        match self {
            DataContent::SilentPortNotification => None,
            DataContent::Message(message) => Some(message),
        }
    }
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm
#[derive(Debug, Clone)]
pub(crate) struct DataMessage {
    pub sync_header: SyncHeader,
    pub data_header: DataHeader,
    pub content: DataContent,
}

#[derive(Debug)]
pub(crate) enum SyncContent {
    LocalSolution(LocalSolution), // sending a local solution to the leader
    GlobalSolution(GlobalSolution), // broadcasting to everyone
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
}

/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessage {
    pub sync_header: SyncHeader,
    pub target_component_id: ConnectorId,
    pub content: SyncContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    pub sending_component_id: ConnectorId,
    pub content: ControlContent,
}

#[derive(Debug)]
pub(crate) enum ControlContent {
    PortPeerChanged(PortIdLocal, ConnectorId),
    CloseChannel(PortIdLocal),
    Ack,
    Ping,
}

/// Combination of data message and control messages.
#[derive(Debug)]
pub(crate) enum Message {
    Data(DataMessage),
    Sync(SyncMessage),
    Control(ControlMessage),
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieved from it. Non-owning threads may only put new
/// messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    pub fn new() -> Self {
        Self{
            messages: Mutex::new(VecDeque::new()),
        }
    }

    pub(crate) fn insert_message(&self, message: Message) {
        let mut lock = self.messages.lock().unwrap();
        lock.push_back(message);
    }

    pub(crate) fn take_message(&self) -> Option<Message> {
        let mut lock = self.messages.lock().unwrap();
        return lock.pop_front();
    }

    pub fn is_empty(&self) -> bool {
        let lock = self.messages.lock().unwrap();
        return lock.is_empty();
    }
}