diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs new file mode 100644 index 0000000000000000000000000000000000000000..1d29934465b34add25097b85e6a760806cbf87da --- /dev/null +++ b/src/runtime2/messages.rs @@ -0,0 +1,137 @@ +use std::collections::HashMap; +use std::collections::hash_map::Entry; + +use crate::PortId; +use crate::common::Id; +use crate::protocol::*; +use crate::protocol::eval::*; + +/// A message residing in a connector's inbox (waiting to be put into some kind +/// of speculative branch), or a message waiting to be sent. +#[derive(Clone)] +pub struct BufferedMessage { + pub(crate) sending_port: PortId, + pub(crate) receiving_port: PortId, + pub(crate) peer_prev_branch_id: Option, + pub(crate) peer_cur_branch_id: u32, + pub(crate) message: ValueGroup, +} + +/// An action performed on a port. Unsure about this +#[derive(PartialEq, Eq, Hash)] +struct PortAction { + port_id: u32, + prev_branch_id: Option, +} + +/// A connector's global inbox. Any received message ends up here. This is +/// because a message might be received before a branch arrives at the +/// corresponding `get()` that is supposed to receive that message. Hence we +/// need to store it for all future branches that might be able to receive it. +pub struct ConnectorInbox { + // TODO: @optimize, HashMap + Vec is a bit stupid. + messages: HashMap> +} + +impl ConnectorInbox { + pub fn new() -> Self { + Self { + messages: HashMap::new(), + } + } + + /// Inserts a new message into the inbox. + pub fn insert_message(&mut self, message: BufferedMessage) { + // TODO: @error - Messages are received from actors we generally cannot + // trust, and may be unreliable, so messages may be received multiple + // times or have spoofed branch IDs. Debug asserts are present for the + // initial implementation. + + // If it is the first message on the port, then we cannot possible have + // a previous port mapping on that port. + let port_action = PortAction{ + port_id: message.sending_port.0.u32_suffix, + prev_branch_id: message.peer_prev_branch_id, + }; + + match self.messages.entry(port_action) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + debug_assert!( + entry.iter() + .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id) + .is_none(), + "inbox already contains sent message (same new branch ID)" + ); + + entry.push(message); + }, + Entry::Vacant(entry) => { + entry.insert(vec![message]); + } + } + } + + /// Checks if the provided port (and the branch id mapped to that port) + /// correspond to any messages in the inbox. + pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option) -> Option<&[BufferedMessage]> { + let port_action = PortAction{ + port_id, + prev_branch_id: prev_branch_id_at_port, + }; + + match self.messages.get(&port_action) { + Some(messages) => return Some(messages.as_slice()), + None => return None, + } + } +} + +/// A connector's outbox. A temporary storage for messages that are sent by +/// branches performing `put`s until we're done running all branches and can +/// actually transmit the messages. +pub struct ConnectorOutbox { + messages: Vec, + sent_counter: usize, +} + +impl ConnectorOutbox { + pub fn new() -> Self { + Self{ + messages: Vec::new(), + sent_counter: 0, + } + } + + pub fn insert_message(&mut self, message: BufferedMessage) { + // TODO: @error - Depending on the way we implement the runtime in the + // future we might end up not trusting "our own code" (i.e. in case + // the connectors we are running are described by foreign code) + debug_assert!( + self.messages.iter() + .find(|v| + v.sending_port == message.sending_port && + v.peer_prev_branch_id == message.peer_prev_branch_id + ) + .is_none(), + "messages was already registered for sending" + ); + + self.messages.push(message); + } + + pub fn take_next_message_to_send(&mut self) -> Option<&BufferedMessage> { + if self.sent_counter == self.messages.len() { + return None; + } + + let cur_index = self.sent_counter; + self.sent_counter += 1; + return Some(&self.messages[cur_index]); + } + + pub fn clear(&mut self) { + self.messages.clear(); + self.sent_counter = 0; + } +} \ No newline at end of file