Changeset - 8c5d438b0fa3
[Not reviewed]
0 4 1
mh - 4 years ago 2021-10-01 17:59:31
contact@maxhenger.nl
rewriting inbox to behave mpsc-like
5 files changed with 103 insertions and 86 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 

	
 
use super::messages::{Message, Inbox};
 

	
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::eval::{ValueGroup, Value, Prompt};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{Inbox, Message};
 

	
 
/// Identifier of a port, wrapping a raw `u32`.
///
/// Derives `Hash` because this type is used as a `HashMap` key (see
/// `Branch::inbox`), and `Debug` so it can appear in assertion output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct PortIdLocal {
    pub id: u32,
}

impl PortIdLocal {
    /// Wraps the raw `id`.
    pub fn new(id: u32) -> Self {
        Self{ id }
    }

    // TODO: Unsure about this, maybe remove, then also remove all struct
    //  instances where I call this
    /// Returns a placeholder identifier whose `id` is `u32::MAX`.
    pub fn new_invalid() -> Self {
        Self{ id: u32::MAX }
    }
}
 

	
 
/// Identifies a branch by its index within the container that stores it. The
/// index `0` generally means "no branch" (e.g. no parent, or a port that did
/// not yet receive anything from any branch).
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) struct BranchId {
    pub index: u32,
}

impl BranchId {
    /// Produces the "no branch" identifier (index `0`).
    fn new_invalid() -> Self {
        BranchId{ index: 0 }
    }

    /// Produces the identifier for the branch at `index`. Debug builds assert
    /// that `index` is not the reserved value `0`.
    fn new(index: u32) -> Self {
        debug_assert!(index != 0);
        BranchId{ index }
    }

    /// Whether this identifier refers to an actual branch (index is not `0`).
    #[inline]
    fn is_valid(&self) -> bool {
        self.index != 0
    }
}
 

	
 
/// Execution state of a `Branch` (stored in its `sync_state` field). The
/// variants are grouped by whether they apply outside or inside a sync block.
#[derive(PartialEq, Eq)]
pub(crate) enum SpeculativeState {
    // Non-synchronous variants
    RunningNonSync,         // regular execution of code
    Error,                  // encountered a runtime error
    Finished,               // finished executing connector's code
    // Synchronous variants
    RunningInSync,          // running within a sync block
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
    Inconsistent,           // branch can never represent a local solution, so halted
}
 

	
 
/// One branch in a connector's execution tree: the state of the code being
/// executed, plus the message/port bookkeeping that belongs to that branch.
pub(crate) struct Branch {
    index: BranchId,
    parent_index: BranchId,
    // Code execution state
    code_state: ComponentState,
    sync_state: SpeculativeState,
    next_branch_in_queue: Option<u32>,
    // Message/port state
    inbox: HashMap<PortIdLocal, Message>, // TODO: @temporary, remove together with fires()
    ports_delta: Vec<PortOwnershipDelta>,
}

impl Branch {
    /// Creates the non-sync branch. The code is assumed to be at its first
    /// instruction. Both the branch's own index and its parent index are the
    /// invalid ("no branch") identifier.
    fn new_initial_branch(component_state: ComponentState) -> Self {
        let no_branch = BranchId::new_invalid();

        Self{
            index: no_branch,
            parent_index: no_branch,
            code_state: component_state,
            sync_state: SpeculativeState::RunningNonSync,
            next_branch_in_queue: None,
            inbox: HashMap::new(),
            ports_delta: Vec::new(),
        }
    }

    /// Creates a sync branch at `new_index` whose parent in the execution tree
    /// is `parent_branch`. The code state, inbox and port deltas are cloned
    /// from the parent; the new branch starts out running inside the sync
    /// block.
    fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self {
        // The parent is either the initial non-sync branch, or a sync branch
        // that is halted at a branching point.
        debug_assert!(
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
        );

        let index = BranchId::new(new_index);
        let code_state = parent_branch.code_state.clone();
        let inbox = parent_branch.inbox.clone();
        let ports_delta = parent_branch.ports_delta.clone();

        Self{
            index,
            parent_index: parent_branch.index,
            code_state,
            sync_state: SpeculativeState::RunningInSync,
            next_branch_in_queue: None,
            inbox,
            ports_delta,
        }
    }
}
 

	
 
/// Bookkeeping for a single port within a single branch: whether the port has
/// been assigned, which branch was last registered on it, and how many times
/// it fired.
#[derive(Clone)]
struct PortAssignment {
    is_assigned: bool,
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
    num_times_fired: u32,
}

impl PortAssignment {
    /// Creates an assignment that is not assigned, has no registered branch
    /// and has never fired.
    fn new_unassigned() -> Self {
        PortAssignment{
            last_registered_branch_id: BranchId::new_invalid(),
            num_times_fired: 0,
            is_assigned: false,
        }
    }

    /// Marks the port as assigned with the given fire count, leaving the
    /// registered branch untouched. Debug builds assert that no valid branch
    /// has been registered yet.
    #[inline]
    fn mark_speculative(&mut self, num_times_fired: u32) {
        debug_assert!(!self.last_registered_branch_id.is_valid());
        self.num_times_fired = num_times_fired;
        self.is_assigned = true;
    }

    /// Marks the port as assigned, registering `branch_id` and the given fire
    /// count.
    #[inline]
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
        *self = PortAssignment{
            is_assigned: true,
            last_registered_branch_id: branch_id,
            num_times_fired,
        };
    }
}
 

	
 
#[derive(Clone, Eq)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
/// Error cases related to port ownership. Each variant carries a
/// `PortIdLocal`, presumably the port that triggered the error (the code that
/// constructs these is not visible here, so confirm against the callers).
enum PortOwnershipError {
    UsedInInteraction(PortIdLocal),
    AlreadyGivenAway(PortIdLocal)
}
 

	
 
/// As the name implies, this contains a description of the ports associated
/// with a connector: the list of owned ports, and a flat table holding one
/// `PortAssignment` per (branch, port) pair.
/// TODO: Extend documentation
struct ConnectorPorts {
    // Essentially a mapping from `port_index` to `port_id`.
    owned_ports: Vec<PortIdLocal>,
    // Contains P*B entries, where P is the number of ports and B is the number
    // of branches. One can find the appropriate mapping of port p at branch b
    // at linear index `b*P+p`.
    port_mapping: Vec<PortAssignment>
}
 

	
 
impl ConnectorPorts {
 
    /// Builds the initial ports object for a connector owning `owned_ports`.
    /// Only the non-sync branch (index 0) is assumed to exist, so the mapping
    /// starts out with exactly one unassigned entry per owned port.
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
        let port_mapping = vec![PortAssignment::new_unassigned(); owned_ports.len()];

        Self{ owned_ports, port_mapping }
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
        }
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
src/runtime2/inbox.rs
Show inline comments
 
new file 100644
 
use crate::common::Ordering;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::connector::{BranchId, PortIdLocal};
 

	
 
/// A message in transit from one connector to another.
///
/// `Inbox` keeps its messages ordered by `receiving_port`, then
/// `sender_prev_branch_id`, then `sender_cur_branch_id`, and treats two
/// messages that agree on all three as the same message.
#[derive(Clone)]
pub struct Message {
    pub sending_port: PortIdLocal,
    pub receiving_port: PortIdLocal,
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
    pub sender_cur_branch_id: BranchId, // always valid
    pub message: ValueGroup, // content carried by the message
}
 

	
 
/// The inbox of a connector. The owning connector (i.e. the thread that is
/// executing the connector) should be able to read all messages. Other
/// connectors (potentially executed by different threads) should be able to
/// append messages.
///
/// Note that the logic inside of the inbox is strongly connected to deciding
/// whether or not a connector has nothing to execute, and is waiting on new
/// messages in order to continue.
pub struct Inbox {
    // Always sorted according to `compare_messages`: `insert_message`
    // maintains the ordering and `get_messages` relies on it.
    messages: Vec<Message>
}

impl Default for Inbox {
    /// Equivalent to `Inbox::new()`.
    fn default() -> Self {
        Self::new()
    }
}

impl Inbox {
    /// Creates an empty inbox.
    pub fn new() -> Self {
        Self{ messages: Vec::new() }
    }

    /// Will insert the message into the inbox. Only exception is when the tuple
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
    /// nothing is inserted.
    pub fn insert_message(&mut self, message: Message) {
        // `Ok(_)` means an equivalent message is already stored; `Err(idx)` is
        // the position that keeps the vector sorted.
        if let Err(idx) = self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
            self.messages.insert(idx, message);
        }
    }

    /// Retrieves all messages that were sent to `port_id` by a sender whose
    /// previous branch ID is `prev_branch_id`. Returns an empty slice if there
    /// are none.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
        // Messages are sorted by (receiving port, prev branch id, ...), so all
        // matches form one contiguous run whose bounds can be found by binary
        // search.
        let wanted = (port_id.id, prev_branch_id.index);
        let key = |msg: &Message| (msg.receiving_port.id, msg.sender_prev_branch_id.index);

        let first_idx = self.messages.partition_point(|msg| key(msg) < wanted);
        let num_matching = self.messages[first_idx..].partition_point(|msg| key(msg) == wanted);

        &self.messages[first_idx..first_idx + num_matching]
    }

    /// Simply empties the inbox
    pub fn clear(&mut self) {
        self.messages.clear();
    }

    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
    // branch id.
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
        a.receiving_port.id.cmp(&b.receiving_port.id)
            .then_with(|| a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index))
            .then_with(|| a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index))
    }
}
src/runtime2/messages.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 
use std::cmp::Ordering;
 
use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use super::connector::{PortIdLocal, BranchId};
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::connector::{BranchId, PortIdLocal};
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
/// of speculative branch), or a message waiting to be sent.
#[derive(Clone)]
pub struct BufferedMessage {
    pub(crate) sending_port: PortId,
    pub(crate) receiving_port: PortId,
    // Together with the receiving port this forms the key under which
    // `ConnectorInbox` stores the message. `None` presumably means the peer
    // had no previous branch on this port (confirm against the sender).
    pub(crate) peer_prev_branch_id: Option<u32>,
    // `ConnectorInbox::insert_message` debug-asserts that this is unique among
    // the messages stored under the same key.
    pub(crate) peer_cur_branch_id: u32,
    pub(crate) message: ValueGroup,
}
 

	
 
/// A message as stored in an `Inbox`, identified by its receiving port and
/// the sender's previous and current branch IDs.
#[derive(Clone)]
pub struct Message {
    pub sending_port: PortIdLocal,
    pub receiving_port: PortIdLocal,
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
    pub sender_cur_branch_id: BranchId, // always valid
    pub message: ValueGroup, // content carried by the message
}
 

	
 
/// Stores the messages received by a connector, ordered such that all
/// messages for one (receiving port, sender's previous branch) pair are
/// adjacent.
pub struct Inbox {
    // Always sorted according to `compare_messages`: `insert_message`
    // maintains the ordering and `get_messages` relies on it.
    messages: Vec<Message>
}

impl Default for Inbox {
    /// Equivalent to `Inbox::new()`.
    fn default() -> Self {
        Self::new()
    }
}

impl Inbox {
    /// Creates an empty inbox.
    pub fn new() -> Self {
        Self{ messages: Vec::new() }
    }

    /// Will insert the message into the inbox. Only exception is when the tuple
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
    /// nothing is inserted.
    pub fn insert_message(&mut self, message: Message) {
        // `Ok(_)` means an equivalent message is already stored; `Err(idx)` is
        // the position that keeps the vector sorted.
        if let Err(idx) = self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
            self.messages.insert(idx, message);
        }
    }

    /// Retrieves all messages that were sent to `port_id` by a sender whose
    /// previous branch ID is `prev_branch_id`. Returns an empty slice if there
    /// are none.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
        // Messages are sorted by (receiving port, prev branch id, ...), so all
        // matches form one contiguous run whose bounds can be found by binary
        // search.
        let wanted = (port_id.id, prev_branch_id.index);
        let key = |msg: &Message| (msg.receiving_port.id, msg.sender_prev_branch_id.index);

        let first_idx = self.messages.partition_point(|msg| key(msg) < wanted);
        let num_matching = self.messages[first_idx..].partition_point(|msg| key(msg) == wanted);

        &self.messages[first_idx..first_idx + num_matching]
    }

    /// Simply empties the inbox
    pub fn clear(&mut self) {
        self.messages.clear();
    }

    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
    // branch id.
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
        a.receiving_port.id.cmp(&b.receiving_port.id)
            .then_with(|| a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index))
            .then_with(|| a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index))
    }
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
/// because a message might be received before a branch arrives at the
/// corresponding `get()` that is supposed to receive that message. Hence we
/// need to store it for all future branches that might be able to receive it.
pub struct ConnectorInbox {
    // TODO: @optimize, HashMap + Vec is a bit stupid.
    // Keyed by (receiving port, peer's previous branch ID); each entry holds
    // all messages received for that combination.
    messages: HashMap<PortAction, Vec<BufferedMessage>>
}
 

	
 

	
 
/// An action performed on a port. Unsure about this
///
/// Used as the key of `ConnectorInbox::messages`, hence the `Hash`/`Eq`
/// derives.
#[derive(PartialEq, Eq, Hash)]
struct PortAction {
    port_id: u32,
    prev_branch_id: Option<u32>,
}
 

	
 
// TODO: @remove
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
 
            messages: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.receiving_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
 

	
 
                entry.push(message);
 
            },
 
            Entry::Vacant(entry) => {
 
                entry.insert(vec![message]);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if the provided port (and the branch id mapped to that port)
 
    /// correspond to any messages in the inbox.
 
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
 
        let port_action = PortAction{
 
            port_id,
 
            prev_branch_id: prev_branch_id_at_port,
 
        };
 

	
 
        match self.messages.get(&port_action) {
 
            Some(messages) => return Some(messages.as_slice()),
 
            None => return None,
 
        }
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
/// branches performing `put`s until we're done running all branches and can
/// actually transmit the messages.
pub struct ConnectorOutbox {
    messages: Vec<BufferedMessage>,
}

impl ConnectorOutbox {
    /// Creates an outbox without any pending messages.
    pub fn new() -> Self {
        Self{ messages: Vec::new() }
    }

    /// Queues `message` for sending. Debug builds assert that no message with
    /// the same sending port and previous branch ID was queued before.
    pub fn insert_message(&mut self, message: BufferedMessage) {
        // TODO: @error - Depending on the way we implement the runtime in the
        //  future we might end up not trusting "our own code" (i.e. in case
        //  the connectors we are running are described by foreign code)
        debug_assert!(
            !self.messages.iter().any(|queued| {
                queued.sending_port == message.sending_port &&
                queued.peer_prev_branch_id == message.peer_prev_branch_id
            }),
            "messages was already registered for sending"
        );

        self.messages.push(message);
    }

    /// Takes the most recently queued message out of the outbox, or returns
    /// `None` when the outbox is empty.
    pub fn take_next_message_to_send(&mut self) -> Option<BufferedMessage> {
        let next = self.messages.pop();
        next
    }

    /// Discards all queued messages.
    pub fn clear(&mut self) {
        self.messages.clear();
    }
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod global_store;
 
mod scheduler;
 

	
 
#[cfg(test)] mod tests;
 
mod inbox;
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::time::Duration;
 
use std::thread;
 
use crate::ProtocolDescription;
 

	
 
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 
use super::global_store::GlobalStore;
 

	
 
/// Runs connectors whose keys are queued in the shared `GlobalStore`, using
/// the shared protocol description as the code to execute.
struct Scheduler {
    global: Arc<GlobalStore>,
    code: Arc<ProtocolDescription>,
}

impl Scheduler {
    /// Creates a scheduler operating on the given global store and code.
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
        Self{
            global,
            code,
        }
    }

    /// Scheduler main loop: repeatedly takes a connector key from the global
    /// store and runs that connector for as long as it asks to be rescheduled
    /// immediately. Currently never returns.
    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        // TODO: @Memory, scheme for reducing allocations if excessive.
        let mut delta_state = RunDeltaState::new();

        loop {
            // TODO: Check if we're supposed to exit

            // Retrieve a unit of work
            let connector_key = match self.global.pop_key() {
                Some(connector_key) => connector_key,
                None => {
                    // TODO: @Performance, needs condition variable for waking up
                    thread::sleep(Duration::new(1, 0));
                    continue;
                }
            };

            // We have something to do
            let connector = self.global.get_connector(&connector_key);

            let mut cur_schedule = ConnectorScheduling::Immediate;

            while cur_schedule == ConnectorScheduling::Immediate {
                let new_schedule;

                if connector.is_in_sync_mode() {
                    // In synchronous mode, so we can expect messages being sent,
                    // but we never expect the creation of connectors
                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.new_connectors.is_empty());

                    if !delta_state.outbox.is_empty() {
                        // NOTE(review): messages are drained from the outbox
                        // but nothing is done with them yet.
                        for _message in delta_state.outbox.drain(..) {

                        }
                    }
                } else {
                    // In regular running mode (not in a sync block) we cannot send
                    // messages but we can create new connectors
                    new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.outbox.is_empty());

                    if !delta_state.new_connectors.is_empty() {
                        // Push all connectors into the global state and queue them
                        // for execution

                    }
                }

                cur_schedule = new_schedule;
            }
        }
    }
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)