Files @ f4f12a71e2e2
Branch filter:

Location: CSY/reowolf/src/runtime2/messages.rs

f4f12a71e2e2 7.1 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
WIP on runtime with error handling and multithreading
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::cmp::Ordering;

use super::connector::{PortIdLocal, BranchId};
use crate::PortId;
use crate::common::Id;
use crate::protocol::*;
use crate::protocol::eval::*;

/// A message residing in a connector's inbox (waiting to be put into some kind
/// of speculative branch), or a message waiting to be sent.
///
/// This is the element type stored by both `ConnectorInbox` and
/// `ConnectorOutbox`. The `peer_*` branch IDs are what those containers use to
/// group messages and to detect duplicates.
#[derive(Clone)]
pub struct BufferedMessage {
    /// Port the message is sent from.
    pub(crate) sending_port: PortId,
    /// Port the message is addressed to.
    pub(crate) receiving_port: PortId,
    /// Branch ID the sender had before sending; `None` presumably means there
    /// was no previous branch (e.g. first message on the port) - TODO confirm.
    pub(crate) peer_prev_branch_id: Option<u32>,
    /// Branch ID the sender has for this message (the "new" branch ID).
    pub(crate) peer_cur_branch_id: u32,
    /// The payload carried by the message.
    pub(crate) message: ValueGroup,
}

/// A message as stored in an `Inbox`, addressed using connector-local port
/// IDs and `BranchId`s.
#[derive(Clone)]
pub struct Message {
    /// Port the message is sent from.
    pub sending_port: PortIdLocal,
    /// Port the message is addressed to.
    pub receiving_port: PortIdLocal,
    /// Sender's previous branch ID. May be invalid, implying that there is no
    /// previous branch ID.
    pub sender_prev_branch_id: BranchId,
    /// Sender's current branch ID. Always valid.
    pub sender_cur_branch_id: BranchId,
    /// The payload carried by the message.
    pub message: ValueGroup,
}

/// Storage for received `Message`s.
///
/// The messages are kept in a single vector that `insert_message` maintains in
/// sorted order (receiving port, then previous branch ID, then current branch
/// ID), which also guarantees that each such triple is stored at most once.
pub struct Inbox {
    // Sorted according to `Inbox::compare_messages`.
    messages: Vec<Message>
}

impl Inbox {
    /// Creates an inbox that does not hold any messages yet.
    pub fn new() -> Self {
        Self { messages: Vec::new() }
    }

    /// Stores `message` at its sorted position (see `compare_messages`).
    ///
    /// If a message with the same (receiving port ID, previous branch ID,
    /// current branch ID) triple is already stored, the new message is dropped
    /// and the inbox is left untouched.
    pub fn insert_message(&mut self, message: Message) {
        let lookup = self.messages
            .binary_search_by(|stored| Self::compare_messages(stored, &message));

        // `Ok(_)` means an equally-keyed message is already present; only the
        // `Err` case carries the slot at which the message has to be inserted.
        if let Err(slot) = lookup {
            self.messages.insert(slot, message);
        }
    }

    /// Returns the contiguous run of stored messages that were sent to
    /// `port_id` by a sender whose previous branch ID equals `prev_branch_id`.
    /// The returned slice is empty when no such message is stored.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
        let is_hit = |candidate: &Message| {
            candidate.receiving_port == port_id
                && candidate.sender_prev_branch_id == prev_branch_id
        };

        for (start, candidate) in self.messages.iter().enumerate() {
            if is_hit(candidate) {
                // Matching messages sit next to each other because of the
                // sort order, so count how far the run extends from here.
                let run_len = self.messages[start..]
                    .iter()
                    .take_while(|following| is_hit(*following))
                    .count();
                return &self.messages[start..start + run_len];
            }

            if candidate.receiving_port.id > port_id.id {
                // Messages are ordered by receiving port first, so nothing
                // beyond this point can belong to the requested port.
                break;
            }
        }

        &self.messages[0..0]
    }

    /// Removes every message from the inbox.
    pub fn clear(&mut self) {
        self.messages.clear();
    }

    // Sort key of the inbox: first the receiving port, then the sender's
    // previous branch ID, and finally the sender's current branch ID.
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
        a.receiving_port.id.cmp(&b.receiving_port.id)
            .then_with(|| a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index))
            .then_with(|| a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index))
    }
}

/// A connector's global inbox. Any received message ends up here. This is
/// because a message might be received before a branch arrives at the
/// corresponding `get()` that is supposed to receive that message. Hence we
/// need to store it for all future branches that might be able to receive it.
pub struct ConnectorInbox {
    // Messages grouped by (receiving port ID, sender's previous branch ID);
    // see `PortAction`.
    // TODO: @optimize, HashMap + Vec is a bit stupid.
    messages: HashMap<PortAction, Vec<BufferedMessage>>
}


/// Lookup key used by `ConnectorInbox`: a port together with the previous
/// branch ID of the sender. All messages sharing this pair are stored in the
/// same bucket.
///
/// NOTE: the original author marked this type as tentative ("unsure about
/// this"), so it may be subject to change.
#[derive(PartialEq, Eq, Hash)]
struct PortAction {
    // Numeric ID of the receiving port.
    port_id: u32,
    // Previous branch ID of the sending peer, if any.
    prev_branch_id: Option<u32>,
}

// TODO: @remove
impl ConnectorInbox {
    /// Creates an inbox without any buffered messages.
    pub fn new() -> Self {
        Self { messages: HashMap::new() }
    }

    /// Buffers `message` in the bucket that belongs to its receiving port and
    /// the sender's previous branch ID.
    ///
    /// In debug builds this asserts that the bucket does not yet contain a
    /// message with the same current branch ID.
    pub fn insert_message(&mut self, message: BufferedMessage) {
        // TODO: @error - Messages are received from actors we generally cannot
        //  trust, and may be unreliable, so messages may be received multiple
        //  times or have spoofed branch IDs. Debug asserts are present for the
        //  initial implementation.

        // If it is the first message on the port, then we cannot possibly have
        // a previous port mapping on that port.
        let key = PortAction {
            port_id: message.receiving_port.0.u32_suffix,
            prev_branch_id: message.peer_prev_branch_id,
        };

        match self.messages.entry(key) {
            Entry::Vacant(slot) => {
                // First message for this (port, previous branch) pair.
                slot.insert(vec![message]);
            }
            Entry::Occupied(slot) => {
                let bucket = slot.into_mut();
                debug_assert!(
                    !bucket.iter()
                        .any(|queued| queued.peer_cur_branch_id == message.peer_cur_branch_id),
                    "inbox already contains sent message (same new branch ID)"
                );

                bucket.push(message);
            }
        }
    }

    /// Looks up the messages buffered for the provided port and the branch ID
    /// mapped to that port. Returns `None` if no message was ever buffered for
    /// that combination.
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
        let key = PortAction {
            port_id,
            prev_branch_id: prev_branch_id_at_port,
        };

        self.messages.get(&key).map(|bucket| bucket.as_slice())
    }

    /// Drops all buffered messages.
    pub fn clear(&mut self) {
        self.messages.clear();
    }
}

/// A connector's outbox. A temporary storage for messages that are sent by
/// branches performing `put`s until we're done running all branches and can
/// actually transmit the messages.
pub struct ConnectorOutbox {
    // Pending messages in insertion order; `take_next_message_to_send` pops
    // from the back of this vector.
    messages: Vec<BufferedMessage>,
}

impl ConnectorOutbox {
    /// Creates an outbox with no pending messages.
    pub fn new() -> Self {
        Self { messages: Vec::new() }
    }

    /// Queues `message` for sending.
    ///
    /// In debug builds this asserts that no queued message has the same
    /// sending port and the same previous branch ID.
    pub fn insert_message(&mut self, message: BufferedMessage) {
        // TODO: @error - Depending on the way we implement the runtime in the
        //  future we might end up not trusting "our own code" (i.e. in case
        //  the connectors we are running are described by foreign code)
        debug_assert!(
            !self.messages.iter().any(|queued| {
                queued.sending_port == message.sending_port
                    && queued.peer_prev_branch_id == message.peer_prev_branch_id
            }),
            "messages was already registered for sending"
        );

        self.messages.push(message);
    }

    /// Removes and returns the most recently queued message, or `None` once
    /// the outbox is empty.
    pub fn take_next_message_to_send(&mut self) -> Option<BufferedMessage> {
        self.messages.pop()
    }

    /// Discards all pending messages.
    pub fn clear(&mut self) {
        self.messages.clear();
    }
}