Files @ 328f04b6612f
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox2.rs - annotation

328f04b6612f 3.3 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
Initial pass of fixing compiler errors
use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;
use crate::runtime2::branch::BranchId;
use crate::runtime2::ConnectorId;
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
use crate::runtime2::port::PortIdLocal;

// TODO: Remove Debug derive from all types

#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
    pub port_id: PortIdLocal,
    pub registered_id: Option<BranchId>,
    pub expected_firing: Option<bool>,
}

/// The header added by the synchronization algorithm to all.
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
    pub sending_component_id: ConnectorId,
    pub highest_component_id: ConnectorId,
}

/// The header added to data messages
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    pub expected_mapping: Vec<PortAnnotation>,
    pub sending_port: PortIdLocal,
    pub target_port: PortIdLocal,
    pub new_mapping: BranchId,
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm
#[derive(Debug, Clone)]
pub(crate) struct DataMessageFancy {
    pub sync_header: SyncHeader,
    pub data_header: DataHeader,
    pub content: ValueGroup,
}

#[derive(Debug)]
pub(crate) enum SyncContent {
    LocalSolution(LocalSolution), // sending a local solution to the leader
    GlobalSolution(GlobalSolution), // broadcasting to everyone
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
}

/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessageFancy {
    pub sync_header: SyncHeader,
    pub target_component_id: ConnectorId,
    pub content: SyncContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessageFancy {
    pub id: u32, // generic identifier, used to match request to response
    pub sending_component_id: ConnectorId,
    pub content: ControlContent,
}

#[derive(Debug)]
pub(crate) enum ControlContent {
    PortPeerChanged(PortIdLocal, ConnectorId),
    CloseChannel(PortIdLocal),
    Ack,
    Ping,
}

/// Combination of data message and control messages.
#[derive(Debug)]
pub(crate) enum MessageFancy {
    Data(DataMessageFancy),
    Sync(SyncMessageFancy),
    Control(ControlMessageFancy),
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieved from it. Non-owning threads may only put new
/// messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    messages: Mutex<VecDeque<MessageFancy>>,
}

impl PublicInbox {
    pub fn new() -> Self {
        Self{
            messages: Mutex::new(VecDeque::new()),
        }
    }

    pub(crate) fn insert_message(&self, message: MessageFancy) {
        let mut lock = self.messages.lock().unwrap();
        lock.push_back(message);
    }

    pub(crate) fn take_message(&self) -> Option<MessageFancy> {
        let mut lock = self.messages.lock().unwrap();
        return lock.pop_front();
    }

    pub fn is_empty(&self) -> bool {
        let lock = self.messages.lock().unwrap();
        return lock.is_empty();
    }
}