diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 587dffae9711bbdf730e9beed9c7449164a4e940..518df447c90e662382a0aaa57b9afc856d5eada9 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,10 +1,9 @@ use std::collections::HashMap; -use super::messages::{Message, Inbox}; - -use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::{PortId, ProtocolDescription}; -use crate::protocol::eval::{ValueGroup, Value, Prompt}; +use crate::protocol::{ComponentState, RunContext, RunResult}; +use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::runtime2::inbox::{Inbox, Message}; #[derive(Clone, Copy, PartialEq, Eq)] pub(crate) struct PortIdLocal { diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs new file mode 100644 index 0000000000000000000000000000000000000000..871ec8720c7256d6479bef3eb8c4bd7eb7838ada --- /dev/null +++ b/src/runtime2/inbox.rs @@ -0,0 +1,89 @@ +use crate::common::Ordering; +use crate::protocol::eval::ValueGroup; +use crate::runtime2::connector::{BranchId, PortIdLocal}; + +/// A message in transit from one connector to another. +#[derive(Clone)] +pub struct Message { + pub sending_port: PortIdLocal, + pub receiving_port: PortIdLocal, + pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id + pub sender_cur_branch_id: BranchId, // always valid + pub message: ValueGroup, +} + +/// The inbox of a connector. The owning connector (i.e. the thread that is +/// executing the connector) should be able to read all messages. Other +/// connectors (potentially executed by different threads) should be able to +/// append messages. +/// +/// Note that the logic inside of the inbox is strongly connected to deciding +/// whether or not a connector has nothing to execute, and is waiting on new +/// messages in order to continue. 
+pub struct Inbox { + messages: Vec<Message> +} + +impl Inbox { + pub fn new() -> Self { + Self{ messages: Vec::new() } + } + + /// Will insert the message into the inbox. Only exception is when the tuple + /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then + /// nothing is inserted.. + pub fn insert_message(&mut self, message: Message) { + match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { + Ok(_) => {} // message already exists + Err(idx) => self.messages.insert(idx, message) + } + } + + /// Retrieves all messages for the provided conditions + pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { + // Seek the first message with the appropriate port ID and branch ID + let num_messages = self.messages.len(); + + for first_idx in 0..num_messages { + let msg = &self.messages[first_idx]; + if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { + // Found a match, seek ahead until the condition is no longer true + let mut last_idx = first_idx + 1; + while last_idx < num_messages { + let msg = &self.messages[last_idx]; + if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { + // No longer matching + break; + } + last_idx += 1; + } + + // Return all the matching messages + return &self.messages[first_idx..last_idx]; + } else if msg.receiving_port.id > port_id.id { + // Because messages are ordered, this implies we couldn't find + // any message + break; + } + } + + return &self.messages[0..0]; + } + + /// Simply empties the inbox + pub fn clear(&mut self) { + self.messages.clear(); + } + + // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur + // branch id.
+ fn compare_messages(a: &Message, b: &Message) -> Ordering { + let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); + if ord != Ordering::Equal { return ord; } + + ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); + if ord != Ordering::Equal { return ord; } + + return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); + } +} diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs index 05713bbcd5939ef446291eecd6e54aa05c1130dc..fde4cd812a4004c39cc59875c754e1ceb15a3a6c 100644 --- a/src/runtime2/messages.rs +++ b/src/runtime2/messages.rs @@ -1,13 +1,14 @@ -use std::collections::HashMap; -use std::collections::hash_map::Entry; use std::cmp::Ordering; +use std::collections::hash_map::Entry; +use std::collections::HashMap; -use super::connector::{PortIdLocal, BranchId}; -use crate::PortId; use crate::common::Id; +use crate::PortId; use crate::protocol::*; use crate::protocol::eval::*; +use super::connector::{BranchId, PortIdLocal}; + /// A message residing in a connector's inbox (waiting to be put into some kind /// of speculative branch), or a message waiting to be sent. #[derive(Clone)] @@ -19,83 +20,6 @@ pub struct BufferedMessage { pub(crate) message: ValueGroup, } -#[derive(Clone)] -pub struct Message { - pub sending_port: PortIdLocal, - pub receiving_port: PortIdLocal, - pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id - pub sender_cur_branch_id: BranchId, // always valid - pub message: ValueGroup, -} - -pub struct Inbox { - messages: Vec<Message> -} - -impl Inbox { - pub fn new() -> Self { - Self{ messages: Vec::new() } - } - - /// Will insert the message into the inbox. Only exception is when the tuple - /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then - /// nothing is inserted..
- pub fn insert_message(&mut self, message: Message) { - match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) { - Ok(_) => {} // message already exists - Err(idx) => self.messages.insert(idx, message) - } - } - - /// Retrieves all messages for the provided conditions - pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] { - // Seek the first message with the appropriate port ID and branch ID - let num_messages = self.messages.len(); - - for first_idx in 0..num_messages { - let msg = &self.messages[first_idx]; - if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id { - // Found a match, seek ahead until the condition is no longer true - let mut last_idx = first_idx + 1; - while last_idx < num_messages { - let msg = &self.messages[last_idx]; - if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id { - // No longer matching - break; - } - last_idx += 1; - } - - // Return all the matching messages - return &self.messages[first_idx..last_idx]; - } else if msg.receiving_port.id > port_id.id { - // Because messages are ordered, this implies we couldn't find - // any message - break; - } - } - - return &self.messages[0..0]; - } - - /// Simply empties the inbox - pub fn clear(&mut self) { - self.messages.clear(); - } - - // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur - // branch id. - fn compare_messages(a: &Message, b: &Message) -> Ordering { - let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id); - if ord != Ordering::Equal { return ord; } - - ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index); - if ord != Ordering::Equal { return ord; } - - return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index); - } -} - /// A connector's global inbox. Any received message ends up here. 
This is /// because a message might be received before a branch arrives at the /// corresponding `get()` that is supposed to receive that message. Hence we diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 4eb25b841315eaafaa4f67f5c519212bc1067f9a..87580e13e35739b790cbf350113887171b757b3f 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -5,3 +5,4 @@ mod global_store; mod scheduler; #[cfg(test)] mod tests; +mod inbox; diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e9d8a99c89c6b4df70ce847c352ab97d8de02f2e..b772b87602a537819130076dfe9751d213c33a2f 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -51,7 +51,11 @@ impl Scheduler { new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.new_connectors.is_empty()); - if !delta_state.outbox.is_empty() {} + if !delta_state.outbox.is_empty() { + for message in delta_state.outbox.drain(..) { + + } + } } else { // In regular running mode (not in a sync block) we cannot send // messages but we can create new connectors