Files @ 32d91577e090
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox2.rs - annotation

32d91577e090 4.1 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
initial multithreaded runtime
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
ce98be9707a6
ce98be9707a6
ce98be9707a6
ecc47971d535
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
f4d1c8c04de6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
32d91577e090
ce98be9707a6
d98661a2215e
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
32d91577e090
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
649f3bb14317
ecc47971d535
ecc47971d535
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
d98661a2215e
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
328f04b6612f
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
328f04b6612f
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
ce98be9707a6
use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;
use crate::runtime2::branch::BranchId;
use crate::runtime2::ConnectorId;
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
use crate::runtime2::port::PortIdLocal;

// TODO: Remove Debug derive from all types

/// Annotation attached to a single port. A list of these is carried in
/// `DataHeader::expected_mapping`.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types only; confirm against the consensus code.
#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
    /// Local identifier of the port this annotation is about.
    pub port_id: PortIdLocal,
    /// Branch associated with the port, if one has been registered
    /// (presumably `None` until a branch registers itself — TODO confirm).
    pub registered_id: Option<BranchId>,
    /// Whether the port is expected to fire; presumably `None` means this has
    /// not been decided yet — TODO confirm.
    pub expected_firing: Option<bool>,
}

/// The header added by the synchronization algorithm. Within this file it is
/// embedded in both data messages (`DataMessageFancy`) and sync messages
/// (`SyncMessageFancy`); control messages do not carry one.
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
    /// Identifier of the component that sent the message.
    pub sending_component_id: ConnectorId,
    /// NOTE(review): presumably the highest component ID the sender has seen
    /// so far, used to elect the sync leader — confirm against the consensus
    /// code.
    pub highest_component_id: ConnectorId,
}

/// The header added to data messages. It accompanies the `SyncHeader` inside
/// a `DataMessageFancy`.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types only; confirm against the consensus code.
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    /// Port annotations the sender attaches to the message (presumably the
    /// port mapping the sending branch assumes — TODO confirm).
    pub expected_mapping: Vec<PortAnnotation>,
    /// Local identifier of the port the message was sent from.
    pub sending_port: PortIdLocal,
    /// Local identifier of the port the message is addressed to.
    pub target_port: PortIdLocal,
    /// Branch identifier that accompanies the message (presumably the
    /// sender's branch after the send — TODO confirm).
    pub new_mapping: BranchId,
}

// TODO: Very much on the fence about this. On one hand I thought making it a
//  data message was neat because "silent port notification" should be rerouted
//  like any other data message to determine the component ID of the receiver
//  and to make it part of the leader election algorithm for the sync leader.
//  However: it complicates logic quite a bit. Really it might be easier to
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
/// Payload of a `DataMessageFancy`: either an actual value group or a
/// payload-less notification about a silent port.
#[derive(Debug, Clone)]
pub(crate) enum DataContent {
    /// Notification without a payload; `as_message` returns `None` for it.
    SilentPortNotification,
    /// A message carrying a `ValueGroup` payload.
    Message(ValueGroup),
}

impl DataContent {
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
        match self {
            DataContent::SilentPortNotification => None,
            DataContent::Message(message) => Some(message),
        }
    }
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm. It therefore carries
/// both a synchronization header and a data header next to its content.
#[derive(Debug, Clone)]
pub(crate) struct DataMessageFancy {
    /// Header added by the synchronization algorithm.
    pub sync_header: SyncHeader,
    /// Header specific to data messages (ports and mapping information).
    pub data_header: DataHeader,
    /// The payload, or a silent-port notification without a payload.
    pub content: DataContent,
}

/// Content of a `SyncMessageFancy`.
#[derive(Debug)]
pub(crate) enum SyncContent {
    /// Sending a local solution to the leader.
    LocalSolution(LocalSolution),
    /// Broadcasting the global solution to everyone.
    GlobalSolution(GlobalSolution),
    /// Just a notification: the purpose of the message is to deliver the
    /// `SyncHeader`.
    Notification,
}

/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessageFancy {
    /// Header added by the synchronization algorithm.
    pub sync_header: SyncHeader,
    /// Identifier of the component this message is addressed to.
    pub target_component_id: ConnectorId,
    /// What is being communicated: a local solution, a global solution, or a
    /// bare notification.
    pub content: SyncContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component. Unlike data and sync messages it carries no `SyncHeader`.
#[derive(Debug)]
pub(crate) struct ControlMessageFancy {
    /// Generic identifier, used to match a request to its response.
    pub id: u32,
    /// Identifier of the component that sent the message.
    pub sending_component_id: ConnectorId,
    /// The specific control operation being communicated.
    pub content: ControlContent,
}

/// Content of a `ControlMessageFancy`.
///
/// NOTE(review): the variant descriptions below are inferred from the variant
/// names and payload types only; confirm against the scheduler code.
#[derive(Debug)]
pub(crate) enum ControlContent {
    /// Presumably: the peer of the given port has changed and is now owned by
    /// the given component.
    PortPeerChanged(PortIdLocal, ConnectorId),
    /// Presumably: the channel the given port belongs to is being closed.
    CloseChannel(PortIdLocal),
    /// Presumably: acknowledgement of an earlier control message (matched
    /// through `ControlMessageFancy::id`).
    Ack,
    /// Payload-less message; presumably only meant to reach/wake the receiver.
    Ping,
}

/// Combination of data, sync and control messages: the single message type
/// stored in a `PublicInbox`.
#[derive(Debug)]
pub(crate) enum MessageFancy {
    /// Message for the receiver's PDL code (also seen by the consensus
    /// algorithm).
    Data(DataMessageFancy),
    /// Message intended only for the consensus algorithm.
    Sync(SyncMessageFancy),
    /// Message intended for the scheduler executing a component.
    Control(ControlMessageFancy),
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
///
/// Messages are kept in insertion order in a mutex-guarded queue.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // Queue of pending messages; every access goes through the mutex.
    messages: Mutex<VecDeque<MessageFancy>>,
}

impl PublicInbox {
    /// Creates an inbox that contains no messages.
    pub fn new() -> Self {
        Self {
            messages: Mutex::new(VecDeque::new()),
        }
    }

    /// Acquires the lock that guards the message queue.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. if another thread panicked while
    /// it was holding this lock.
    fn lock_messages(&self) -> std::sync::MutexGuard<'_, VecDeque<MessageFancy>> {
        self.messages
            .lock()
            .expect("PublicInbox mutex poisoned: a thread panicked while holding the inbox lock")
    }

    /// Appends `message` to the back of the queue.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn insert_message(&self, message: MessageFancy) {
        // The guard is a temporary, so the lock is released at the end of
        // this statement.
        self.lock_messages().push_back(message);
    }

    /// Removes and returns the oldest message in the queue, or `None` if the
    /// queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn take_message(&self) -> Option<MessageFancy> {
        self.lock_messages().pop_front()
    }

    /// Returns `true` if the queue holds no messages at the moment of the
    /// call. Another thread may insert a message right after the lock is
    /// released, so the result is only a snapshot.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.lock_messages().is_empty()
    }
}

/// An empty inbox, equivalent to [`PublicInbox::new`].
impl Default for PublicInbox {
    fn default() -> Self {
        Self::new()
    }
}