Files @ 4da5e57a9834
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs - annotation

4da5e57a9834 8.0 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
Add error checking to sending component messages
a43d61913724
68411f4b8014
cf26538b25dc
8c5d438b0fa3
dd4e6a5314f7
1677e0c9568d
68411f4b8014
68411f4b8014
68411f4b8014
a43d61913724
8c5d438b0fa3
68411f4b8014
cf26538b25dc
68411f4b8014
1677e0c9568d
1677e0c9568d
088be7630245
68411f4b8014
58dfabd1be9f
58dfabd1be9f
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
088be7630245
68411f4b8014
18c70be7107f
68411f4b8014
68411f4b8014
68411f4b8014
7662b8fb871d
58dfabd1be9f
58dfabd1be9f
68411f4b8014
1755ca411ca7
68411f4b8014
1677e0c9568d
68411f4b8014
68411f4b8014
088be7630245
58dfabd1be9f
58dfabd1be9f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
b4a8c628800d
68411f4b8014
c97c5d60bc61
68411f4b8014
1677e0c9568d
1677e0c9568d
68411f4b8014
1677e0c9568d
68411f4b8014
1677e0c9568d
1677e0c9568d
68411f4b8014
dd4e6a5314f7
58dfabd1be9f
58dfabd1be9f
68411f4b8014
1677e0c9568d
68411f4b8014
1677e0c9568d
68411f4b8014
68411f4b8014
1677e0c9568d
1677e0c9568d
1677e0c9568d
1677e0c9568d
1677e0c9568d
b4a8c628800d
1677e0c9568d
1677e0c9568d
1677e0c9568d
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
1677e0c9568d
1677e0c9568d
1677e0c9568d
1677e0c9568d
1677e0c9568d
1677e0c9568d
8a530d2dc72f
8a530d2dc72f
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
03f278e76a41
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
daf15df0f8ca
68411f4b8014
68411f4b8014
daf15df0f8ca
daf15df0f8ca
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
daf15df0f8ca
daf15df0f8ca
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
1677e0c9568d
1677e0c9568d
03f278e76a41
68411f4b8014
b4ac681e0e7f
b4ac681e0e7f
a99ae23c30ec
a99ae23c30ec
a99ae23c30ec
a99ae23c30ec
a99ae23c30ec
b4a8c628800d
b4a8c628800d
b4a8c628800d
b4a8c628800d
03f278e76a41
b4a8c628800d
a99ae23c30ec
a99ae23c30ec
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
0161dd921e3a
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
18c70be7107f
a99ae23c30ec
a99ae23c30ec
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
4da5e57a9834
4da5e57a9834
4da5e57a9834
4da5e57a9834
4da5e57a9834
cf26538b25dc
use std::sync::Mutex;
use std::collections::VecDeque;

use crate::protocol::eval::ValueGroup;
use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner};
use crate::runtime2::port::ChannelId;

use super::ConnectorId;
use super::consensus::{GlobalSolution, LocalSolution};
use super::port::PortIdLocal;

// TODO: Remove Debug derive from all types

/// Annotation for a single channel, identified by `channel_id`. A
/// `DataHeader` carries a list of these as its `expected_mapping`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct ChannelAnnotation {
    /// The channel this annotation applies to.
    pub channel_id: ChannelId,
    // NOTE(review): presumably the marker of the branch registered on this
    // channel, `None` when nothing has been registered yet — confirm.
    pub registered_id: Option<BranchMarker>,
    // NOTE(review): presumably whether the channel is expected to fire, `None`
    // when undetermined — confirm against the consensus code.
    pub expected_firing: Option<bool>,
}

/// Marker for a branch in a port mapping. Like a branch ID, a marker uniquely
/// identifies a branch; unlike a branch ID, of which a branch has exactly one,
/// a single branch may have several associated markers (e.g. one branch
/// performing a `put` three times will generate three markers).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BranchMarker {
    marker: u32,
}

impl BranchMarker {
    /// Wraps the given `marker` value. The value `0` is the one produced by
    /// `new_invalid`, so debug builds assert that `marker` is nonzero.
    #[inline]
    pub(crate) fn new(marker: u32) -> Self {
        debug_assert!(marker != 0);
        BranchMarker { marker }
    }

    /// Produces the invalid marker, which is represented by the value `0`.
    #[inline]
    pub(crate) fn new_invalid() -> Self {
        BranchMarker { marker: 0 }
    }
}

/// The header added by the synchronization algorithm to the messages that take
/// part in a sync round: `DataMessage`, `SyncCompMessage` and
/// `SyncPortMessage` each carry one.
#[derive(Debug, Clone, Copy)]
pub(crate) struct SyncHeader {
    /// ID of the component that sent the message (reported by
    /// `Message::source_component`).
    pub sending_component_id: ConnectorId,
    // NOTE(review): presumably the highest component ID the sender has seen
    // so far in this round (used to determine the leader) — confirm.
    pub highest_component_id: ConnectorId,
    // NOTE(review): presumably the sender's sync round counter — confirm.
    pub sync_round: u32,
}

/// The header added to data messages. Holds the channel-related routing
/// information (`sending_port`, `target_port`) together with the branch
/// mapping annotations.
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
    // NOTE(review): presumably the channel mapping the sender expects the
    // receiver to agree with — confirm against the consensus code.
    pub expected_mapping: Vec<ChannelAnnotation>,
    /// Port the message was sent from (reported by `Message::source_port`).
    pub sending_port: PortIdLocal,
    /// Port the message is sent to (reported by `Message::target_port`).
    pub target_port: PortIdLocal,
    // NOTE(review): presumably the marker newly associated with the sending
    // port by this message — confirm.
    pub new_mapping: BranchMarker,
}

/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm. It therefore carries
/// both a `SyncHeader` and a `DataHeader` next to its payload.
#[derive(Debug, Clone)]
pub(crate) struct DataMessage {
    /// Synchronization-related header.
    pub sync_header: SyncHeader,
    /// Data-message specific header (ports and mapping annotations).
    pub data_header: DataHeader,
    /// The transmitted payload.
    pub content: ValueGroup,
}

/// The payload of a `SyncCompMessage`: the different kinds of messages the
/// consensus algorithm sends directly to a component.
#[derive(Debug)]
pub(crate) enum SyncCompContent {
    LocalFailure, // notifying leader that component has failed (e.g. timeout, whatever)
    LocalSolution(LocalSolution), // sending a local solution to the leader
    PartialSolution(SolutionCombiner), // when new leader is detected, forward all local results
    GlobalSolution(GlobalSolution), // broadcasting to everyone
    GlobalFailure, // broadcasting to everyone
    AckFailure, // acknowledgement of failure to leader
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
    Presence(ComponentPresence), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
}

/// A sync message is a message that is intended only for the consensus
/// algorithm. The message goes directly to a component (identified by
/// `target_component_id`) instead of through a channel.
#[derive(Debug)]
pub(crate) struct SyncCompMessage {
    /// Synchronization-related header.
    pub sync_header: SyncHeader,
    /// The component this message is addressed to.
    pub target_component_id: ConnectorId,
    /// What is being communicated to the target component.
    pub content: SyncCompContent,
}

/// The payload of a `SyncPortMessage`.
// NOTE(review): the exact meaning of the variants is defined by the consensus
// algorithm and is not visible from this file — document them there/here once
// confirmed.
#[derive(Debug)]
pub(crate) enum SyncPortContent {
    SilentPortNotification,
    NotificationWave,
}

/// A sync message intended for the consensus algorithm. This message does not
/// go to a component, but through a channel (and results in potential
/// rerouting) because we're not sure about the ID of the component that holds
/// the other end of the channel.
#[derive(Debug)]
pub(crate) struct SyncPortMessage {
    /// Synchronization-related header.
    pub sync_header: SyncHeader,
    /// Port the message was sent from (reported by `Message::source_port`).
    pub source_port: PortIdLocal,
    /// Port the message is sent to (reported by `Message::target_port`).
    pub target_port: PortIdLocal,
    /// What is being communicated over the channel.
    pub content: SyncPortContent,
}

/// The payload of a `SyncControlMessage`.
#[derive(Debug)]
pub(crate) enum SyncControlContent {
    ChannelIsClosed(PortIdLocal), // contains port that is owned by the recipient of the message
}

/// A sync control message: originating from the scheduler, but intended for the
/// current sync round of the recipient. Every kind of consensus algorithm must
/// be able to handle such a message. Note that, unlike the other sync
/// messages, it carries no `SyncHeader` and hence has no sending component.
#[derive(Debug)]
pub(crate) struct SyncControlMessage {
    // For now these control messages are only aimed at components. Might change
    // in the future. But for now we respond to messages from components that
    // have, because of that message, published their ID.
    /// The sync round of the message this control message responds to.
    pub in_response_to_sync_round: u32,
    /// The component this message is addressed to.
    pub target_component_id: ConnectorId,
    /// What is being communicated to the target component.
    pub content: SyncControlContent,
}

/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    /// ID of the component that sent the message (reported by
    /// `Message::source_component`).
    pub sending_component_id: ConnectorId,
    /// The request or response being communicated.
    pub content: ControlContent,
}

/// The payload of a `ControlMessage`. The port-carrying variants
/// (`PortPeerChanged`, `CloseChannel`) are the ones for which
/// `Message::target_port` reports a port; `Ack` and `Ping` have none.
#[derive(Debug)]
pub(crate) enum ControlContent {
    // NOTE(review): presumably (port whose peer changed, ID of the new peer
    // component) — confirm against the scheduler.
    PortPeerChanged(PortIdLocal, ConnectorId),
    CloseChannel(PortIdLocal),
    Ack,
    Ping,
}

/// Combination of all message kinds that can be put into an inbox: data
/// messages, the three kinds of sync messages, and control messages.
#[derive(Debug)]
pub(crate) enum Message {
    /// Message for the receiver's PDL code, also seen by the consensus algorithm.
    Data(DataMessage),
    /// Consensus message sent directly to a component.
    SyncComp(SyncCompMessage),
    /// Consensus message sent through a channel.
    SyncPort(SyncPortMessage),
    /// Scheduler-originated message for the recipient's current sync round.
    SyncControl(SyncControlMessage),
    /// Message for the scheduler executing a component.
    Control(ControlMessage),
}

impl Message {
    /// Returns the port the message was sent from, if the message travels
    /// through a particular channel. That is the case for data messages and
    /// sync-port messages; every other kind yields `None`.
    pub(crate) fn source_port(&self) -> Option<PortIdLocal> {
        match self {
            Message::Data(data) => Some(data.data_header.sending_port),
            Message::SyncPort(sync) => Some(sync.source_port),
            Message::SyncComp(_) | Message::SyncControl(_) | Message::Control(_) => None,
        }
    }

    /// Returns the port the message is addressed to, if any. Data and
    /// sync-port messages always have one; control messages only when their
    /// content refers to a port (`PortPeerChanged`, `CloseChannel`).
    pub(crate) fn target_port(&self) -> Option<PortIdLocal> {
        match self {
            Message::Data(data) => Some(data.data_header.target_port),
            Message::SyncPort(sync) => Some(sync.target_port),
            Message::Control(control) => match &control.content {
                ControlContent::PortPeerChanged(port_id, _)
                | ControlContent::CloseChannel(port_id) => Some(*port_id),
                ControlContent::Ack | ControlContent::Ping => None,
            },
            Message::SyncComp(_) | Message::SyncControl(_) => None,
        }
    }

    /// Returns the ID of the component that sent the message. Sync control
    /// messages do not record a sender, so they yield `None`.
    pub(crate) fn source_component(&self) -> Option<ConnectorId> {
        let sender = match self {
            Message::Data(data) => data.sync_header.sending_component_id,
            Message::SyncPort(sync) => sync.sync_header.sending_component_id,
            Message::SyncComp(sync) => sync.sync_header.sending_component_id,
            Message::Control(control) => control.sending_component_id,
            Message::SyncControl(_) => return None,
        };

        Some(sender)
    }

    /// Returns the contained data message.
    ///
    /// # Panics
    ///
    /// Panics (through `unreachable!`) when the message is not a
    /// `Message::Data`; callers must only use this on data messages.
    pub(crate) fn as_data(&self) -> &DataMessage {
        if let Message::Data(data) = self {
            data
        } else {
            unreachable!()
        }
    }
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // Pending messages behind a mutex: `insert_message` pushes to the back,
    // `take_message` pops from the front.
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    /// Creates an inbox that holds no messages.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self { messages }
    }

    /// Appends `message` to the back of the inbox.
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn insert_message(&self, message: Message) {
        let mut queue = self.messages.lock().unwrap();
        queue.push_back(message);
    }

    /// Removes and returns the message at the front of the inbox, or `None`
    /// when the inbox holds no messages.
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn take_message(&self) -> Option<Message> {
        let mut queue = self.messages.lock().unwrap();
        queue.pop_front()
    }

    /// Reports whether the inbox currently holds no messages.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let queue = self.messages.lock().unwrap();
        queue.is_empty()
    }

    /// Discards every message currently in the inbox.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn clear(&self) {
        let mut queue = self.messages.lock().unwrap();
        queue.clear();
    }
}