Files @ 5e53e3e9d68e
Branch filter:

Location: CSY/reowolf/src/runtime2/communication.rs

5e53e3e9d68e 5.9 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
Max Henger
Merge branch 'feat-transmitting-ports' into 'master'

feat: transmitting ports

See merge request nl-cwi-csy/reowolf!8
use crate::protocol::eval::*;
use crate::protocol::eval::value::ValueId;
use super::runtime::*;
use super::component::*;

// -----------------------------------------------------------------------------
// Generic types
// -----------------------------------------------------------------------------

/// Identifier of a port, wrapping a raw `u32`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PortId(pub u32);

impl PortId {
    /// Builds a placeholder ID that holds `u32::MAX`.
    ///
    /// The specific value has no meaning of its own; a conspicuously large
    /// port number was picked so that accidental use of it is more likely to
    /// stand out while debugging.
    pub fn new_invalid() -> Self {
        PortId(u32::MAX)
    }
}

/// The two port IDs that make up a channel.
pub struct Channel {
    /// ID of the channel's putter port (presumably the sending end — confirm).
    pub putter_id: PortId,
    /// ID of the channel's getter port (presumably the receiving end — confirm).
    pub getter_id: PortId,
}

// -----------------------------------------------------------------------------
// Data messages
// -----------------------------------------------------------------------------

/// A data message: a `ValueGroup` payload accompanied by a data header, a
/// sync header and a list of `TransmittedPort` entries.
#[derive(Debug)]
pub struct DataMessage {
    /// Header holding the port mapping information and the source/target ports.
    pub data_header: MessageDataHeader,
    /// Header holding the sync round and component IDs.
    pub sync_header: MessageSyncHeader,
    /// The values carried by this message.
    pub content: ValueGroup,
    /// Ports carried along with this message; each entry records the
    /// locations inside `content` it refers to.
    pub ports: Vec<TransmittedPort>,
}

/// Port annotation, distinguished by whether it describes a getter or a
/// putter port.
#[derive(Debug)]
pub enum PortAnnotationKind {
    /// Annotation for a getter port; includes the peer's component/port IDs.
    Getter(PortAnnotationGetter),
    /// Annotation for a putter port; only the port's own IDs are included.
    Putter(PortAnnotationPutter),
}

/// Annotation for a getter port: the port's own component/port ID together
/// with the component/port ID of its peer.
#[derive(Debug)]
pub struct PortAnnotationGetter {
    /// Component that owns the annotated port.
    pub self_comp_id: CompId,
    /// ID of the annotated port itself.
    pub self_port_id: PortId,
    /// Component that owns the peer port.
    pub peer_comp_id: CompId,
    /// ID of the peer port.
    pub peer_port_id: PortId,
}

/// Annotation for a putter port. Unlike `PortAnnotationGetter` it carries no
/// peer information, only the port's own component/port ID.
#[derive(Debug)]
pub struct PortAnnotationPutter {
    /// Component that owns the annotated port.
    pub self_comp_id: CompId,
    /// ID of the annotated port itself.
    pub self_port_id: PortId,
}

/// Header attached to every `DataMessage`.
#[derive(Debug)]
pub struct MessageDataHeader {
    /// Port annotations, each paired with an optional mapping value.
    /// NOTE(review): presumably the mapping the sender expects each port to
    /// have (`None` meaning "no mapping yet") — confirm against the consensus
    /// code.
    pub expected_mapping: Vec<(PortAnnotationKind, Option<u32>)>,
    /// NOTE(review): presumably the mapping assigned to the sending port by
    /// this message — confirm.
    pub new_mapping: u32,
    /// Port the message originates from.
    pub source_port: PortId,
    /// Port the message is addressed to; this is the value read by
    /// `Message::target_port` and overwritten by `Message::modify_target_port`.
    pub target_port: PortId,
}

/// Description of a port that travels along with a `DataMessage` (see the
/// `ports` field of `DataMessage`).
#[derive(Debug)]
pub struct TransmittedPort {
    /// Positions of values that refer to this port.
    pub locations: Vec<ValueId>, // within `content`
    /// Data messages associated with this port.
    pub messages: Vec<DataMessage>, // owned by previous component
    /// Component that owns the peer of this port.
    pub peer_comp: CompId,
    /// ID of the peer port.
    pub peer_port: PortId,
    /// Kind of the port (`PortKind` is declared elsewhere).
    pub kind: PortKind,
    /// State of the port (`PortState` is declared elsewhere).
    pub state: PortState,
}

// -----------------------------------------------------------------------------
// Sync messages
// -----------------------------------------------------------------------------

/// A sync message: a sync header plus one of the `SyncMessageContent`
/// variants.
#[derive(Debug)]
pub struct SyncMessage {
    /// Header holding the sync round and component IDs.
    pub sync_header: MessageSyncHeader,
    /// The payload of this sync message.
    pub content: SyncMessageContent,
}

/// One entry of a `SyncLocalSolution`: a single port, either a putter or a
/// getter.
#[derive(Debug)]
pub enum SyncLocalSolutionEntry {
    /// Entry describing a putter port.
    Putter(SyncSolutionPutterPort),
    /// Entry describing a getter port.
    Getter(SyncSolutionGetterPort),
}

/// A local solution, expressed as a list of per-port entries. Carried by
/// `SyncMessageContent::LocalSolution` together with a component ID.
pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;

/// Getter port in a solution. Upon receiving a message it is certain about who
/// its peer is, which is why (unlike `SyncSolutionPutterPort`) it also carries
/// the peer's component and port ID.
#[derive(Debug)]
pub struct SyncSolutionGetterPort {
    /// Component that owns this getter port.
    pub self_comp_id: CompId,
    /// ID of this getter port.
    pub self_port_id: PortId,
    /// Component that owns the peer port.
    pub peer_comp_id: CompId,
    /// ID of the peer port.
    pub peer_port_id: PortId,
    /// Mapping value recorded for this port in the solution.
    pub mapping: u32,
    /// NOTE(review): presumably set when this port failed during the sync
    /// round — confirm against the code that fills it in.
    pub failed: bool,
}

/// Putter port in a solution. A putter may not be certain about who its peer
/// component/port is, so only its own IDs are stored here.
#[derive(Debug)]
pub struct SyncSolutionPutterPort {
    /// Component that owns this putter port.
    pub self_comp_id: CompId,
    /// ID of this putter port.
    pub self_port_id: PortId,
    /// Mapping value recorded for this port in the solution.
    pub mapping: u32,
    /// NOTE(review): presumably set when this port failed during the sync
    /// round — confirm against the code that fills it in.
    pub failed: bool,
}

/// A channel inside a `SyncPartialSolution`, holding up to one putter and up
/// to one getter. Either side may be absent (`None`).
/// NOTE(review): presumably a side is `None` until the component owning that
/// port has submitted its local solution — confirm.
#[derive(Debug)]
pub struct SyncSolutionChannel {
    /// Putter side of the channel, if present.
    pub putter: Option<SyncSolutionPutterPort>,
    /// Getter side of the channel, if present.
    pub getter: Option<SyncSolutionGetterPort>,
}

/// Decision recorded for a sync round. `None` is the value used by
/// `SyncPartialSolution::default()`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SyncRoundDecision {
    /// Initial value; presumably "no decision reached yet" — confirm.
    None,
    /// Presumably: the round ended with a solution — confirm.
    Solution,
    /// Presumably: the round ended in failure — confirm.
    Failure,
}

/// A partial solution: a list of channels plus the decision recorded so far.
/// Carried by `SyncMessageContent::PartialSolution`.
#[derive(Debug)]
pub struct SyncPartialSolution {
    /// Channels in this partial solution.
    pub channel_mapping: Vec<SyncSolutionChannel>,
    /// Decision associated with this partial solution.
    pub decision: SyncRoundDecision,
}

impl Default for SyncPartialSolution {
    /// Produces an empty partial solution: no channels in `channel_mapping`
    /// and `decision` set to `SyncRoundDecision::None`.
    fn default() -> Self {
        let channel_mapping = Vec::new();
        let decision = SyncRoundDecision::None;
        Self { channel_mapping, decision }
    }
}

/// Payload of a `SyncMessage`.
#[derive(Debug)]
pub enum SyncMessageContent {
    /// Carries no payload. NOTE(review): presumably announces the leader
    /// identified by the message's sync header — confirm.
    NotificationOfLeader,
    LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
    PartialSolution(SyncPartialSolution), // partial solution of multiple components
    /// Carries no payload. Presumably signals that a global solution was
    /// found — confirm.
    GlobalSolution,
    /// Carries no payload. Presumably signals that the round failed
    /// globally — confirm.
    GlobalFailure,
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

/// A control message exchanged between components.
#[derive(Debug)]
pub struct ControlMessage {
    /// Identifier of this control message; only accessible within the crate.
    pub(crate) id: ControlId,
    /// Component that sent the message.
    pub sender_comp_id: CompId,
    /// Port the message refers to, if any. Returned by
    /// `Message::target_port` and set by `Message::modify_target_port`.
    pub target_port_id: Option<PortId>,
    /// What this control message asks for or reports.
    pub content: ControlMessageContent,
}

/// Content of a control message. If the content refers to a port then the
/// `target_port_id` field of the enclosing `ControlMessage` is the one that it
/// refers to.
///
/// NOTE(review): the exact protocol meaning of each variant is defined by the
/// control-message handling code, which is not part of this file.
#[derive(Copy, Clone, Debug)]
pub enum ControlMessageContent {
    /// Acknowledgement; carries no data.
    Ack,
    /// Presumably asks for the target port to be blocked — confirm.
    BlockPort,
    /// Presumably asks for the target port to be unblocked — confirm.
    UnblockPort,
    /// Port-closing notification with additional details.
    ClosePort(ControlMessageClosePort),
    /// Presumably the "block" half of a peer change; carries no data —
    /// confirm.
    PortPeerChangedBlock,
    PortPeerChangedUnblock(PortId, CompId), // contains (new_port_id, new_component_id)
}

/// Extra data carried by `ControlMessageContent::ClosePort`.
#[derive(Copy, Clone, Debug)]
pub struct ControlMessageClosePort {
    /// Whether the port was closed in a sync round.
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
}

// -----------------------------------------------------------------------------
// Messages (generic)
// -----------------------------------------------------------------------------

/// Sync-related header, included in both `DataMessage` and `SyncMessage`.
#[derive(Debug)]
pub struct MessageSyncHeader {
    /// Number of the sync round the message belongs to.
    pub sync_round: u32,
    /// Component that is sending the message.
    pub sending_id: CompId,
    /// NOTE(review): presumably the highest component ID the sender knows of
    /// (likely used to determine the leader) — confirm.
    pub highest_id: CompId,
}

/// Any message that can be sent: one of the three message categories
/// defined in this file, or a payload-less poll.
#[derive(Debug)]
pub enum Message {
    /// A data message; has a target port.
    Data(DataMessage),
    /// A sync message; has no target port.
    Sync(SyncMessage),
    /// A control message; may have a target port.
    Control(ControlMessage),
    /// Carries no payload and has no target port.
    Poll,
}

impl Message {
    /// Returns the port this message is addressed to, if it has one.
    ///
    /// Data messages report the target port from their data header and
    /// control messages report their optional target port. Sync and poll
    /// messages always yield `None`.
    pub(crate) fn target_port(&self) -> Option<PortId> {
        match self {
            Message::Data(data) => Some(data.data_header.target_port),
            Message::Control(control) => control.target_port_id,
            Message::Sync(_) | Message::Poll => None,
        }
    }

    /// Overwrites the port this message is addressed to with `port_id`.
    ///
    /// # Panics
    ///
    /// Panics (through `unreachable!()`) when invoked on a sync or poll
    /// message, since those have no target port to modify.
    pub(crate) fn modify_target_port(&mut self, port_id: PortId) {
        match self {
            Message::Data(data) => data.data_header.target_port = port_id,
            Message::Control(control) => control.target_port_id = Some(port_id),
            // Should never be called for these message types.
            Message::Sync(_) | Message::Poll => unreachable!(),
        }
    }
}