Files @ 8c5d438b0fa3
Branch filter:

Location: CSY/reowolf/src/runtime2/messages.rs

8c5d438b0fa3 4.4 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
rewriting inbox to behave mpsc-like
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use crate::common::Id;
use crate::PortId;
use crate::protocol::*;
use crate::protocol::eval::*;

use super::connector::{BranchId, PortIdLocal};

/// A message residing in a connector's inbox (waiting to be put into some kind
/// of speculative branch), or a message waiting to be sent.
#[derive(Clone)]
pub struct BufferedMessage {
    pub(crate) sending_port: PortId,
    pub(crate) receiving_port: PortId,
    pub(crate) peer_prev_branch_id: Option<u32>,
    pub(crate) peer_cur_branch_id: u32,
    pub(crate) message: ValueGroup,
}

/// A connector's global inbox. Any received message ends up here. This is
/// because a message might be received before a branch arrives at the
/// corresponding `get()` that is supposed to receive that message. Hence we
/// need to store it for all future branches that might be able to receive it.
pub struct ConnectorInbox {
    // TODO: @optimize, HashMap + Vec is a bit stupid.
    messages: HashMap<PortAction, Vec<BufferedMessage>>
}


/// An action performed on a port. Unsure about this
#[derive(PartialEq, Eq, Hash)]
struct PortAction {
    port_id: u32,
    prev_branch_id: Option<u32>,
}

// TODO: @remove
impl ConnectorInbox {
    pub fn new() -> Self {
        Self {
            messages: HashMap::new(),
        }
    }

    /// Inserts a new message into the inbox.
    pub fn insert_message(&mut self, message: BufferedMessage) {
        // TODO: @error - Messages are received from actors we generally cannot
        //  trust, and may be unreliable, so messages may be received multiple
        //  times or have spoofed branch IDs. Debug asserts are present for the
        //  initial implementation.

        // If it is the first message on the port, then we cannot possible have
        // a previous port mapping on that port.
        let port_action = PortAction{
            port_id: message.receiving_port.0.u32_suffix,
            prev_branch_id: message.peer_prev_branch_id,
        };

        match self.messages.entry(port_action) {
            Entry::Occupied(mut entry) => {
                let entry = entry.get_mut();
                debug_assert!(
                    entry.iter()
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
                        .is_none(),
                    "inbox already contains sent message (same new branch ID)"
                );

                entry.push(message);
            },
            Entry::Vacant(entry) => {
                entry.insert(vec![message]);
            }
        }
    }

    /// Checks if the provided port (and the branch id mapped to that port)
    /// correspond to any messages in the inbox.
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
        let port_action = PortAction{
            port_id,
            prev_branch_id: prev_branch_id_at_port,
        };

        match self.messages.get(&port_action) {
            Some(messages) => return Some(messages.as_slice()),
            None => return None,
        }
    }

    pub fn clear(&mut self) {
        self.messages.clear();
    }
}

/// A connector's outbox. A temporary storage for messages that are sent by
/// branches performing `put`s until we're done running all branches and can
/// actually transmit the messages.
pub struct ConnectorOutbox {
    messages: Vec<BufferedMessage>,
}

impl ConnectorOutbox {
    pub fn new() -> Self {
        Self{
            messages: Vec::new(),
        }
    }

    pub fn insert_message(&mut self, message: BufferedMessage) {
        // TODO: @error - Depending on the way we implement the runtime in the
        //  future we might end up not trusting "our own code" (i.e. in case
        //  the connectors we are running are described by foreign code)
        debug_assert!(
            self.messages.iter()
                .find(|v|
                    v.sending_port == message.sending_port &&
                    v.peer_prev_branch_id == message.peer_prev_branch_id
                )
                .is_none(),
            "messages was already registered for sending"
        );

        self.messages.push(message);
    }

    pub fn take_next_message_to_send(&mut self) -> Option<BufferedMessage> {
        self.messages.pop()
    }

    pub fn clear(&mut self) {
        self.messages.clear();
    }
}