Files @ 2982ea49738a
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs - annotation

2982ea49738a 4.0 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
rename 'synchronous' statement to 'sync'
a43d61913724
68411f4b8014
cf26538b25dc
8c5d438b0fa3
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
a43d61913724
8c5d438b0fa3
68411f4b8014
cf26538b25dc
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
58dfabd1be9f
58dfabd1be9f
68411f4b8014
1755ca411ca7
68411f4b8014
68411f4b8014
68411f4b8014
7662b8fb871d
58dfabd1be9f
58dfabd1be9f
68411f4b8014
1755ca411ca7
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
58dfabd1be9f
58dfabd1be9f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
1755ca411ca7
68411f4b8014
68411f4b8014
68411f4b8014
c97c5d60bc61
c97c5d60bc61
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
c97c5d60bc61
c97c5d60bc61
68411f4b8014
c97c5d60bc61
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
c97c5d60bc61
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
58dfabd1be9f
58dfabd1be9f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
8a530d2dc72f
8a530d2dc72f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
daf15df0f8ca
68411f4b8014
68411f4b8014
daf15df0f8ca
daf15df0f8ca
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
daf15df0f8ca
daf15df0f8ca
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
b4ac681e0e7f
b4ac681e0e7f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
cf26538b25dc
use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;

use super::ConnectorId;
use super::branch::BranchId;
use super::consensus::{GlobalSolution, LocalSolution};
use super::port::PortIdLocal;

// TODO: Remove Debug derive from all types

/// Per-port annotation: a local port ID together with an optional branch ID
/// and an optional expected firing state. A list of these is carried in
/// `DataHeader::expected_mapping`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
    /// The local port this annotation refers to.
    pub port_id: PortIdLocal,
    /// Branch ID registered for the port, or `None` if there is none.
    // NOTE(review): presumably set by the consensus algorithm once a branch
    //  has used the port - confirm against the consensus code.
    pub registered_id: Option<BranchId>,
    /// Whether the port is expected to fire, or `None` if not (yet) known.
    // NOTE(review): "not yet decided" reading of `None` is an assumption -
    //  confirm against the consensus code.
    pub expected_firing: Option<bool>,
}

/// The header added by the synchronization algorithm to all data and sync
/// messages (see the `sync_header` field of `DataMessage` and `SyncMessage`).
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
    /// ID of the component that sent the message carrying this header.
    pub sending_component_id: ConnectorId,
    /// Highest component ID known to the sender.
    // NOTE(review): presumably used for electing the sync leader - confirm
    //  against the consensus code.
    pub highest_component_id: ConnectorId,
    /// Number of the sync round this header belongs to.
    // NOTE(review): exact counting scheme is not visible here - confirm.
    pub sync_round: u32,
}

/// The header added to data messages. It names the ports involved in the
/// transfer and carries the branch/port annotations that accompany the data.
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    /// Port annotations sent along with the message.
    // NOTE(review): presumably the port mapping the sender expects the
    //  receiver to agree with - confirm against the consensus code.
    pub expected_mapping: Vec<PortAnnotation>,
    /// Port through which the message is sent.
    pub sending_port: PortIdLocal,
    /// Port at which the message should arrive.
    pub target_port: PortIdLocal,
    /// Branch ID associated with this message.
    // NOTE(review): presumably the sender's branch that produced the
    //  message - confirm against the consensus code.
    pub new_mapping: BranchId,
}

// TODO: Very much on the fence about this. On one hand I thought making it a
//  data message was neat because "silent port notification" should be rerouted
//  like any other data message to determine the component ID of the receiver
//  and to make it part of the leader election algorithm for the sync leader.
//  However: it complicates logic quite a bit. Really it might be easier to
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
/// Payload of a `DataMessage`: either an actual group of values, or a
/// payload-free notification about a silent port.
#[derive(Debug, Clone)]
pub(crate) enum DataContent {
    /// Carries no values; `as_message` returns `None` for this variant.
    SilentPortNotification,
    /// Carries the values that were sent.
    Message(ValueGroup),
}

impl DataContent {
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
        match self {
            DataContent::SilentPortNotification => None,
            DataContent::Message(message) => Some(message),
        }
    }
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm.
#[derive(Debug, Clone)]
pub(crate) struct DataMessage {
    /// Header added by the synchronization algorithm.
    pub sync_header: SyncHeader,
    /// Header describing the ports and annotations of this data transfer.
    pub data_header: DataHeader,
    /// The payload: a value group or a silent port notification.
    pub content: DataContent,
}

/// Payload of a `SyncMessage`: a local solution, a global solution, or
/// nothing beyond the accompanying `SyncHeader`.
#[derive(Debug)]
pub(crate) enum SyncContent {
    LocalSolution(LocalSolution), // sending a local solution to the leader
    GlobalSolution(GlobalSolution), // broadcasting to everyone
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
}

/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessage {
    /// Header added by the synchronization algorithm.
    pub sync_header: SyncHeader,
    /// ID of the component this message is addressed to.
    pub target_component_id: ConnectorId,
    /// The payload: a local/global solution or a bare notification.
    pub content: SyncContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component. Unlike data and sync messages it carries no `SyncHeader`.
#[derive(Debug)]
pub(crate) struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    /// ID of the component that sent this control message.
    pub sending_component_id: ConnectorId,
    /// What the scheduler is being told or asked.
    pub content: ControlContent,
}

/// The kinds of control message that can be sent to a scheduler.
// NOTE(review): the per-variant descriptions below are inferred from the
//  variant names and payload types only - confirm against the scheduler code.
#[derive(Debug)]
pub(crate) enum ControlContent {
    /// Presumably: the given port now has its peer at the given component.
    PortPeerChanged(PortIdLocal, ConnectorId),
    /// Presumably: the channel attached to the given port is being closed.
    CloseChannel(PortIdLocal),
    /// Presumably: acknowledgement of an earlier control message, matched
    /// through `ControlMessage::id`.
    Ack,
    /// Carries no payload. NOTE(review): purpose not visible here - confirm.
    Ping,
}

/// Combination of data, sync and control messages. This is the element type
/// stored in a `PublicInbox`.
#[derive(Debug)]
pub(crate) enum Message {
    /// Message for the receiver's PDL code (also seen by the consensus algorithm).
    Data(DataMessage),
    /// Message for the consensus algorithm only.
    Sync(SyncMessage),
    /// Message for the scheduler executing the component.
    Control(ControlMessage),
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // Queue of pending messages; every access goes through the mutex.
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    /// Creates an inbox that holds no messages.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self { messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn insert_message(&self, message: Message) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the message at the front of the queue, or `None`
    /// if the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn take_message(&self) -> Option<Message> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Returns `true` if the queue held no messages at the moment the lock
    /// was taken. The lock is released on return, so other threads may have
    /// inserted messages by the time the caller acts on the result.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}