Changeset - 8c5d438b0fa3
[Not reviewed]
0 4 1
mh - 4 years ago 2021-10-01 17:59:31
contact@maxhenger.nl
rewriting inbox to behave mpsc-like
5 files changed with 103 insertions and 86 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 

	
 
use super::messages::{Message, Inbox};
 

	
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::eval::{ValueGroup, Value, Prompt};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{Inbox, Message};
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
src/runtime2/inbox.rs
Show inline comments
 
new file 100644
 
use crate::common::Ordering;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::connector::{BranchId, PortIdLocal};
 

	
 
/// A message in transit from one connector to another.
 
#[derive(Clone)]
 
pub struct Message {
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
/// The inbox of a connector. The owning connector (i.e. the thread that is
 
/// executing the connector) should be able to read all messages. Other
 
/// connectors (potentially executed by different threads) should be able to
 
/// append messages.
 
///
 
/// Note that the logic inside of the inbox is strongly connected to deciding
 
/// whether or not a connector has nothing to execute, and is waiting on new
 
/// messages in order to continue.
 
pub struct Inbox {
 
    messages: Vec<Message>
 
}
 

	
 
impl Inbox {
 
    pub fn new() -> Self {
 
        Self{ messages: Vec::new() }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: Message) {
 
        match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
 
            Ok(_) => {} // message already exists
 
            Err(idx) => self.messages.insert(idx, message)
 
        }
 
    }
 

	
 
    /// Retrieves all messages for the provided conditions
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
 
        // Seek the first message with the appropriate port ID and branch ID
 
        let num_messages = self.messages.len();
 

	
 
        for first_idx in 0..num_messages {
 
            let msg = &self.messages[first_idx];
 
            if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
 
                // Found a match, seek ahead until the condition is no longer true
 
                let mut last_idx = first_idx + 1;
 
                while last_idx < num_messages {
 
                    let msg = &self.messages[last_idx];
 
                    if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
 
                        // No longer matching
 
                        break;
 
                    }
 
                    last_idx += 1;
 
                }
 

	
 
                // Return all the matching messages
 
                return &self.messages[first_idx..last_idx];
 
            } else if msg.receiving_port.id > port_id.id {
 
                // Because messages are ordered, this implies we couldn't find
 
                // any message
 
                break;
 
            }
 
        }
 

	
 
        return &self.messages[0..0];
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 

	
 
    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
 
    // branch id.
 
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
 
        let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
 
    }
 
}
src/runtime2/messages.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 
use std::cmp::Ordering;
 
use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use super::connector::{PortIdLocal, BranchId};
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::connector::{BranchId, PortIdLocal};
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
@@ -19,83 +20,6 @@ pub struct BufferedMessage {
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
#[derive(Clone)]
 
pub struct Message {
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
pub struct Inbox {
 
    messages: Vec<Message>
 
}
 

	
 
impl Inbox {
 
    pub fn new() -> Self {
 
        Self{ messages: Vec::new() }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: Message) {
 
        match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
 
            Ok(_) => {} // message already exists
 
            Err(idx) => self.messages.insert(idx, message)
 
        }
 
    }
 

	
 
    /// Retrieves all messages for the provided conditions
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
 
        // Seek the first message with the appropriate port ID and branch ID
 
        let num_messages = self.messages.len();
 

	
 
        for first_idx in 0..num_messages {
 
            let msg = &self.messages[first_idx];
 
            if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
 
                // Found a match, seek ahead until the condition is no longer true
 
                let mut last_idx = first_idx + 1;
 
                while last_idx < num_messages {
 
                    let msg = &self.messages[last_idx];
 
                    if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
 
                        // No longer matching
 
                        break;
 
                    }
 
                    last_idx += 1;
 
                }
 

	
 
                // Return all the matching messages
 
                return &self.messages[first_idx..last_idx];
 
            } else if msg.receiving_port.id > port_id.id {
 
                // Because messages are ordered, this implies we couldn't find
 
                // any message
 
                break;
 
            }
 
        }
 

	
 
        return &self.messages[0..0];
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 

	
 
    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
 
    // branch id.
 
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
 
        let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
 
    }
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
src/runtime2/mod.rs
Show inline comments
 
@@ -5,3 +5,4 @@ mod global_store;
 
mod scheduler;
 

	
 
#[cfg(test)] mod tests;
 
mod inbox;
src/runtime2/scheduler.rs
Show inline comments
 
@@ -51,7 +51,11 @@ impl Scheduler {
 
                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.new_connectors.is_empty());
 

	
 
                    if !delta_state.outbox.is_empty() {}
 
                    if !delta_state.outbox.is_empty() {
 
                        for message in delta_state.outbox.drain(..) {
 

	
 
                        }
 
                    }
 
                } else {
 
                    // In regular running mode (not in a sync block) we cannot send
 
                    // messages but we can create new connectors
0 comments (0 inline, 0 general)