Changeset - f4f12a71e2e2
[Not reviewed]
0 4 1
mh - 4 years ago 2021-09-27 11:13:10
contact@maxhenger.nl
WIP on runtime with error handling and multithreading
5 files changed with 781 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
new file 100644
 
use std::collections::HashMap;
 

	
 
use super::messages::{Message, Inbox};
 

	
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::eval::{ValueGroup, Value};
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
 
    pub id: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ id: u32::MAX }
 
    }
 
}
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
pub(crate) struct Branch {
 
    index: BranchId,
 
    parent_index: BranchId,
 
    // Code execution state
 
    code_state: ComponentState,
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    inbox: HashMap<PortIdLocal, Message>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
impl Branch {
 
    /// Constructs a non-sync branch. It is assumed that the code is at the
 
    /// first instruction
 
    fn new_initial_branch(component_state: ComponentState) -> Self {
 
        Branch{
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_branch_in_queue: None,
 
            inbox: HashMap::new(),
 
            ports_delta: Vec::new(),
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 

	
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            inbox: parent_branch.inbox.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortAssignment {
 
    is_assigned: bool,
 
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
 
    num_times_fired: u32,
 
}
 

	
 
impl PortAssignment {
 
    fn new_unassigned() -> Self {
 
        Self{
 
            is_assigned: false,
 
            last_registered_branch_id: BranchId::new_invalid(),
 
            num_times_fired: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn mark_speculative(&mut self, num_times_fired: u32) {
 
        debug_assert!(!self.last_registered_branch_id.is_valid());
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
enum PortOwnershipDelta {
 
    TakeOwnership(PortIdLocal),
 
    GiveAwayOwnership(PortIdLocal),
 
}
 

	
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// As the name implies, this contains a description of the ports associated
 
/// with a connector.
 
/// TODO: Extend documentation
 
struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
    /// Constructs the initial ports object. Assumes the presence of the
 
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
 
    /// branch.
 
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
 
        let num_ports = owned_ports.len();
 
        let mut port_mapping = Vec::with_capacity(num_ports);
 
        for _ in 0..num_ports {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
        }
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        self.owned_ports.remove(port_index);
 
        self.port_mapping.remove(port_index);
 
    }
 

	
 
    /// Retrieves the index associated with a port id. Note that the port might
 
    /// not exist (yet) if a speculative branch has just received the port.
 
    /// TODO: But then again, one cannot use that port, right?
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
            if port == port_id {
 
                return Some(idx)
 
            }
 
        }
 

	
 
        return None
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
    }
 

	
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
 

	
 
struct BranchQueue {
 
    first: u32,
 
    last: u32,
 
}
 

	
 
impl BranchQueue {
 
    fn new() -> Self {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 
}
 

	
 
pub(crate) struct Connector {
 
    // State and properties of connector itself
 
    id: u32,
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    // Port/message management
 
    ports: ConnectorPorts,
 
    inbox: Inbox,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
    }
 
}
 

	
 
impl Connector {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
            inbox: Inbox::new(),
 
        }
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Assign the old branch a silent value
 
                let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                // Run both branches again
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index);
 
                self.branches.push(firing_branch);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get` on a port that has not yet received
 
                // a value in its inbox.
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    if !messages.is_empty() {
 
                        // TODO: If message contains ports, transfer ownership of port.
 
                        for message in messages {
 
                            // For each message, for the execution and feed it
 
                            // the provided message
 
                            let new_branch_index = self.branches.len() as u32;
 
                            let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                            self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 

	
 
                            let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                            port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                            debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                            new_branch.inbox.insert(local_port_id, message.clone());
 

	
 
                            // Schedule the new branch
 
                            debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                            Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index);
 
                            self.branches.push(new_branch);
 
                        }
 

	
 
                        // Because we have new branches to run, schedule
 
                        // immediately
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and modify them to be
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
                }
 

	
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particualar port.
 
                let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix };
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                // Check the port mapping for consistency
 
                // TODO: For now we can only put once, so that simplifies stuff
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 
                let is_valid_put = if port_mapping.is_assigned {
 
                    // Already assigned, so must be speculative and one time
 
                    // firing, otherwise we are `put`ing multiple times.
 
                    if port_mapping.last_registered_branch_id.is_valid() {
 
                        // Already did a `put`
 
                        todo!("handle error through RunDeltaState");
 
                    } else {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = Message{
 
                        sending_port: local_port_id,
 
                        receiving_port: PortIdLocal::new_invalid(),
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    results.outbox.push(message);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                self.branches.push(first_sync_branch);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                let first_port_idx = results.ports.len();
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 

	
 
                for port
 
            }
 
        }
 

	
 
        ConnectorScheduling::NotNow // TODO: @Temp
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal helpers
 
    // -------------------------------------------------------------------------
 

	
 
    // Helpers for management of the branches and their internally stored
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    #[inline]
 
    fn pop_branch(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        branch.next_branch_in_queue = None;
 

	
 
        if *queue.first == 0 {
 
            // No more entries in queue
 
            debug_assert_eq!(*queue.last, branch.index.index);
 
            *queue.last = 0;
 
        }
 

	
 
        return branch;
 
    }
 

	
 
    #[inline]
 
    fn push_branch_into_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId) {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
        if *queue.last == 0 {
 
            // No branches in the queue at all
 
            debug_assert_eq!(*queue.first, 0);
 
            branches[to_push as usize].next_branch_in_queue = None;
 
            *queue.first = to_push;
 
            *queue.last = to_push;
 
        } else {
 
            // Pre-existing branch in the queue
 
            debug_assert_ne!(*queue.first, 0);
 
            branches[*queue.last as usize].next_branch_in_queue = Some(to_push);
 
            *queue.last = to_push;
 
        }
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports
 

	
 
    /// Marks the ports as being "given away" (e.g. by sending a message over a
 
    /// channel, or by constructing a connector). Will return an error if the
 
    /// connector doesn't own the port in the first place.
 
    fn give_away_ports(ports: &mut ConnectorPorts, in_sync: bool, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(in_sync == !branch.index.is_valid());
 

	
 
        for port_id in port_ids {
 
            match ports.get_port_index(*port_id) {
 
                Some(port_index) => {
 
                    // We (used to) own the port
 
                    let port_mapping = ports.get_port(branch.index.index, port_index);
 
                    if port_mapping.is_assigned {
 
                        // Port is used in some kind of interaction. Cannot both
 
                        // give away the port and use it in an interaction
 
                        return Err(PortOwnershipError::UsedInInteraction(*port_id))
 
                    }
 

	
 
                    // Make sure it is not already given away
 
                    for delta in &branch.ports_delta {
 
                        match delta {
 
                            PortOwnershipDelta::TakeOwnership(_) => unreachable!(), // because we had a port mapping
 
                            PortOwnershipDelta::GiveAwayOwnership(given_away_port_id) => {
 
                                if port_id == given_away_port_id {
 
                                    return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                                }
 
                            }
 
                        }
 
                    }
 

	
 
                    // We're fine, the port will be given away. Note that if we
 
                    // are not in sync mode, then we can simply remove the
 
                    // ownership immediately.
 
                    if in_sync {
 
                        branch.ports_delta.push(PortOwnershipDelta::GiveAwayOwnership(*port_id));
 
                    } else {
 

	
 
                    }
 
                },
 
                None => {
 
                    // We did not yet own the port, so we must have received it
 
                    // this round, and we're going to give it away again.
 
                    debug_assert!(branch.ports_delta.contains(&PortOwnershipDelta::TakeOwnership(*port_id)));
 
                    let delta_to_find = PortOwnershipDelta::TakeOwnership(*port_id);
 
                    for delta_idx in 0..branch.ports_delta.len() {
 
                        if branch.ports_delta[delta_idx] == delta_to_find {
 
                            branch.ports_delta.remove(delta_idx);
 
                            break;
 
                        }
 
                    }
 

	
 
                    // Note for programmers: the fact that the message that
 
                    // contains this port will end up at another connector will
 
                    // take care of its new ownership.
 
                }
 
            }
 
        }
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Adopt ownership of the ports
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
struct RunDeltaState {
 
    outbox: Vec<Message>,
 
    ports: Vec<PortIdLocal>,
 
}
 

	
 
enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/messages.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 
use std::cmp::Ordering;
 

	
 
use super::connector::{PortIdLocal, BranchId};
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
#[derive(Clone)]
 
pub struct Message {
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
pub struct Inbox {
 
    messages: Vec<Message>
 
}
 

	
 
impl Inbox {
 
    pub fn new() -> Self {
 
        Self{ messages: Vec::new() }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: Message) {
 
        match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
 
            Ok(_) => {} // message already exists
 
            Err(idx) => self.messages.insert(idx, message)
 
        }
 
    }
 

	
 
    /// Retrieves all messages for the provided conditions
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
 
        // Seek the first message with the appropriate port ID and branch ID
 
        let num_messages = self.messages.len();
 

	
 
        for first_idx in 0..num_messages {
 
            let msg = &self.messages[first_idx];
 
            if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
 
                // Found a match, seek ahead until the condition is no longer true
 
                let mut last_idx = first_idx + 1;
 
                while last_idx < num_messages {
 
                    let msg = &self.messages[last_idx];
 
                    if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
 
                        // No longer matching
 
                        break;
 
                    }
 
                    last_idx += 1;
 
                }
 

	
 
                // Return all the matching messages
 
                return &self.messages[first_idx..last_idx];
 
            } else if msg.receiving_port.id > port_id.id {
 
                // Because messages are ordered, this implies we couldn't find
 
                // any message
 
                break;
 
            }
 
        }
 

	
 
        return &self.messages[0..0];
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 

	
 
    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
 
    // branch id.
 
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
 
        let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
 
    }
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
 
/// need to store it for all future branches that might be able to receive it.
 
pub struct ConnectorInbox {
 
    // TODO: @optimize, HashMap + Vec is a bit stupid.
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
}
 

	
 
// TODO: @remove
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
 
            messages: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.receiving_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
 

	
 
                entry.push(message);
 
            },
 
            Entry::Vacant(entry) => {
 
                entry.insert(vec![message]);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if the provided port (and the branch id mapped to that port)
 
    /// correspond to any messages in the inbox.
 
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
 
        let port_action = PortAction{
 
            port_id,
 
            prev_branch_id: prev_branch_id_at_port,
 
        };
 

	
 
        match self.messages.get(&port_action) {
 
            Some(messages) => return Some(messages.as_slice()),
 
            None => return None,
 
        }
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
 
/// branches performing `put`s until we're done running all branches and can
 
/// actually transmit the messages.
 
pub struct ConnectorOutbox {
 
    messages: Vec<BufferedMessage>,
 
}
 

	
 
impl ConnectorOutbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Depending on the way we implement the runtime in the
 
        //  future we might end up not trusting "our own code" (i.e. in case
 
        //  the connectors we are running are described by foreign code)
 
        debug_assert!(
 
            self.messages.iter()
 
                .find(|v|
 
                    v.sending_port == message.sending_port &&
 
                    v.peer_prev_branch_id == message.peer_prev_branch_id
 
                )
 
                .is_none(),
 
            "messages was already registered for sending"
 
        );
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    pub fn take_next_message_to_send(&mut self) -> Option<BufferedMessage> {
 
        self.messages.pop()
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
mod messages;
 
mod connector;
 

	
 
#[cfg(test)] mod tests;
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::{Polarity, PortId};
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::messages::*;
 

	
 
#[derive(Debug)]
 
pub enum AddComponentError {
 
    ModuleDoesNotExist,
 
    ConnectorDoesNotExist,
 
    InvalidArgumentType(usize), // value is index of (first) invalid argument
 
}
 

	
 
pub(crate) struct PortDesc {
 
    id: u32,
 
    peer_id: u32,
 
    owning_connector_id: Option<u32>,
 
    is_getter: bool, // otherwise one can only call `put`
 
}
 

	
 
pub(crate) struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    pub(crate) branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
    last_checked_done: u32,
 
    global_inbox: ConnectorInbox,
 
    global_outbox: ConnectorOutbox,
 
}
 

	
 
impl ConnectorDesc {
 
    /// Creates a new connector description. Implicit assumption is that there
 
    /// is one (non-sync) branch that can be immediately executed.
 
    fn new(id: u32, component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            spec_branches_active: VecDeque::new(),
 
            spec_branches_pending_receive: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
            last_checked_done: 0,
 
            global_inbox: ConnectorInbox::new(),
 
            global_outbox: ConnectorOutbox::new(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
    Finished, // branch (necessarily non-sync) that reached end of code
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct BranchPortDesc {
 
    last_registered_index: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchContext {
 
    just_called_did_put: bool,
 
    pending_channel: Option<(Value, Value)>,
 
}
 

	
 
pub(crate) struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    pub(crate) code_state: ComponentState,
 
    pub(crate) branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
    branch_context: BranchContext,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
            message_inbox: HashMap::new(),
 
            port_mapping: HashMap::new(),
 
            branch_context: BranchContext{
 
                just_called_did_put: false,
 
                pending_channel: None,
 
            }
 
        }
 
    }
 

	
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self {
 
        // We expect that the given branche's context is not halfway handling a
 
        // `put(...)` or a `channel x -> y` statement.
 
        debug_assert!(!branch_state.branch_context.just_called_did_put);
 
        debug_assert!(branch_state.branch_context.pending_channel.is_none());
 

	
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
            message_inbox: branch_state.message_inbox.clone(),
 
            port_mapping: branch_state.port_mapping.clone(),
 
            branch_context: BranchContext{
 
                just_called_did_put: false,
 
                pending_channel: None,
 
            }
 
        }
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
enum Scheduling {
 
    Immediate,
 
    Later,
 
    NotNow,
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
 
enum ProposedBranchConstraint {
 
    SilentPort(u32), // port id
 
    BranchNumber(u32), // branch id
 
    PortMapping(u32, u32), // particular port's mapped branch number
 
}
 

	
 
// Local solution of the connector
 
#[derive(Clone)]
 
struct ProposedConnectorSolution {
 
    final_branch_id: u32,
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    port_mapping: HashMap<u32, Option<u32>>, // port IDs of the connector, mapped to their branch IDs (None for silent ports)
 
}
 

	
 
#[derive(Clone)]
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_constraints: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
pub struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
    pub(crate) ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    pub(crate) connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            protocol: pd,
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
            connectors_active: VecDeque::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel that is not owned by any connector and returns its
 
    /// endpoints. The returned values are of the (putter port, getter port)
 
    /// respectively.
 
    pub fn add_channel(&mut self) -> (Value, Value) {
 
        let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, None);
 
        return (
 
            port_value_from_id(None, put_id, true),
 
            port_value_from_id(None, get_id, false)
 
        );
 
    }
 

	
 
    pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> {
 
        use AddComponentError as ACE;
 
        use crate::runtime::error::AddComponentError as OldACE;
 

	
 
        // TODO: Remove the responsibility of adding a component from the PD
 

	
 
        // Lookup module and the component
 
        // TODO: Remove this error enum translation. Note that for now this
 
        //  function forces port-only arguments
 
        let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) {
 
            Ok(polarities) => polarities,
 
            Err(reason) => match reason {
 
                OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)),
 
                OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist),
 
                OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist),
 
                _ => unreachable!(),
 
            }
 
        };
 

	
 
        // Make sure supplied values (and types) are correct. At the same time
 
        // modify the port IDs such that they contain the ID of the connector
 
        // we're about the create.
 
        let component_id = self.generate_connector_id();
 
        let mut ports = Vec::with_capacity(values.values.len());
 
        
 
        for (value_idx, value) in values.values.iter().enumerate() {
 
            let polarity = &port_polarities[value_idx];
 

	
 
            match value {
 
                Value::Input(port_id) => {
 
                    if *polarity != Polarity::Getter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(PortId(Id{
 
                        connector_id: component_id,
 
                        u32_suffix: port_id.0.u32_suffix,
 
                    }));
 
                },
 
                Value::Output(port_id) => {
 
                    if *polarity != Polarity::Putter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(PortId(Id{
 
                        connector_id: component_id,
 
                        u32_suffix: port_id.0.u32_suffix
 
                    }));
 
                },
 
                _ => return Err(ACE::InvalidArgumentType(value_idx))
 
            }
 
        }
 

	
 
        // Instantiate the component, and mark the ports as being owned by the
 
        // newly instantiated component
 
        let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);
 
        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 

	
 
        for port in &ports {
 
            let desc = self.ports.get_mut(port).unwrap();
 
            desc.owning_connector_id = Some(component_id);
 
        }
 

	
 
        self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors_active.push_back(component_id);
 

	
 
        Ok(())
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            // Run a single connector until it indicates we can run another
 
            // connector
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            let mut scheduling = Scheduling::Immediate;
 

	
 
            while scheduling == Scheduling::Immediate {
 
                scheduling = self.run_connector(next_id);
 
            }
 

	
 
            match scheduling {
 
                Scheduling::Immediate => unreachable!(),
 
                Scheduling::Later => self.connectors_active.push_back(next_id),
 
                Scheduling::NotNow => {},
 
            }
 

	
 
            // Deal with any outgoing messages and potential solutions
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_new_solutions(next_id);
 
        }
 
    }
 

	
 
    /// Runs a connector for as long as sensible, then returns `true` if the
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    fn run_connector(&mut self, connector_id: u32) -> Scheduling {
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 

	
 
        if desc.in_sync {
 
            return self.run_connector_sync_mode(connector_id);
 
        } else {
 
            return self.run_connector_regular_mode(connector_id);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve connector and branch that is supposed to be run
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(desc.in_sync);
 
        debug_assert!(!desc.spec_branches_active.is_empty());
 

	
 
        let branch_index = desc.spec_branches_active.pop_front().unwrap();
 
        let branch = &mut desc.branches[branch_index as usize];
 
        debug_assert_eq!(branch_index, branch.index);
 

	
 
        // Run this particular branch to a next blocking point
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 

	
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent. So we don't
 
                // run it again
 
                branch.branch_state = BranchState::Failed;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that did not have a
 
                // value assigned yet. So branch and keep running.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                let mut copied_branch = Self::duplicate_branch(desc, branch_index);
 
                let copied_index = copied_branch.index;
 

	
 
                copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                    last_registered_index: None,
 
                    num_times_fired: 1,
 
                });
 

	
 
                let branch = &mut desc.branches[branch_index as usize]; // need to reborrow
 
                branch.port_mapping.insert(port_id, BranchPortDesc{
 
                    last_registered_index: None,
 
                    num_times_fired: 0,
 
                });
 

	
 
                // Run both again
 
                desc.branches.push(copied_branch);
 
                desc.spec_branches_active.push_back(branch_index);
 
                desc.spec_branches_active.push_back(copied_index);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch just performed a `get()` on a port that did
 
                // not yet receive a value.
 

	
 
                // First check if a port value is assigned to the
 
                // current branch. If so, check if it is consistent.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                let mut insert_in_pending_receive = false;
 

	
 
                println!("DEBUG: Connector {} performing get on {:#?}", connector_id, port_id);
 

	
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet, so force to firing
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: None,
 
                            num_times_fired: 1,
 
                        });
 
                        branch.branch_state = BranchState::BranchPoint;
 
                        insert_in_pending_receive = true;
 
                    },
 
                    Entry::Occupied(entry) => {
 
                        // Have an entry, check if it is consistent
 
                        let entry = entry.get();
 
                        if entry.num_times_fired == 0 {
 
                            // Inconsistent
 
                            branch.branch_state = BranchState::Failed;
 
                        } else {
 
                            // Perfectly fine, add to queue
 
                            debug_assert!(entry.last_registered_index.is_none());
 
                            assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                            branch.branch_state = BranchState::BranchPoint;
 
                            insert_in_pending_receive = true;
 
                        }
 
                    }
 
                }
 

	
 
                println!("DEBUG: insert = {}, port mapping is now {:#?}", insert_in_pending_receive, &branch.port_mapping);
 

	
 
                if insert_in_pending_receive {
 
                    // Perform the insert
 
                    match desc.spec_branches_pending_receive.entry(port_id) {
 
                        Entry::Vacant(entry) => {
 
                            entry.insert(vec![branch_index]);
 
                        }
 
                        Entry::Occupied(mut entry) => {
 
                            let entry = entry.get_mut();
 
                            debug_assert!(!entry.contains(&branch_index));
 
                            entry.push(branch_index);
 
                        }
 
                    }
 

	
 
                    // But also check immediately if we don't have a
 
                    // previously received message. If so, we
 
                    // immediately branch and accept the message
 
                    if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                        for message in messages {
 
                            let mut new_branch = Self::duplicate_branch(desc, branch_index);
 
                            let new_branch_idx = new_branch.index;
 
                            let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                            new_port_desc.last_registered_index = Some(message.peer_cur_branch_id);
 
                            new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                            desc.branches.push(new_branch);
 
                            desc.spec_branches_active.push_back(new_branch_idx);
 
                        }
 

	
 
                        if !messages.is_empty() {
 
                            return Scheduling::Immediate;
 
                        }
 
                    }
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Check the branch for any ports that were not used and
 
                // insert them in the port mapping as not having fired.
 
                for port_id in branch.owned_ports.iter().copied() {
 
                    let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_id });
 
                    if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                        entry.insert(BranchPortDesc {
 
                            last_registered_index: None,
 
                            num_times_fired: 0
 
                        });
 
                    }
 
                }
 

	
 
                // Mark the branch as being done
 
                branch.branch_state = BranchState::ReachedEndSync;
 
                desc.spec_branches_done.push(branch_index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                // Branch just performed a `put()`. Check if we have
 
                // assigned the port value and if so, if it is
 
                // consistent.
 
                println!("DEBUG: Connector {} performing put on {:#?}", connector_id, port_id);
 
                let mut can_put = true;
 
                branch.branch_context.just_called_did_put = true;
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: Some(branch.index),
 
                            num_times_fired: 1,
 
                        });
 
                    },
 
                    Entry::Occupied(mut entry) => {
 
                        // Pre-existing entry
 
                        let entry = entry.get_mut();
 
                        if entry.num_times_fired == 0 {
 
                            // This is 'fine' in the sense that we have
 
                            // a normal inconsistency in the branch.
 
                            branch.branch_state = BranchState::Failed;
 
                            can_put = false;
 
                        } else if entry.last_registered_index.is_none() {
 
                            // A put() that follows a fires()
 
                            entry.last_registered_index = Some(branch.index);
 
                        } else {
 
                            // This should be fine in the future. But
 
                            // for now we throw an error as it doesn't
 
                            // mesh well with the 'fires()' concept.
 
                            todo!("throw an error of some sort, then fail all related")
 
                        }
 
                    }
 
                }
 
                println!("DEBUG: can_put = {}, port mapping is now {:#?}", can_put, &branch.port_mapping);
 

	
 
                if can_put {
 
                    // Actually put the message in the outbox
 
                    let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap();
 
                    debug_assert_eq!(port_desc.owning_connector_id.unwrap(), connector_id);
 
                    let peer_id = port_desc.peer_id;
 
                    let peer_desc = self.ports.get(&peer_id).unwrap();
 
                    debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                    let peer_id = PortId(Id{
 
                        connector_id: peer_desc.owning_connector_id.unwrap(),
 
                        u32_suffix: peer_id
 
                    });
 

	
 
                    // For now this is the one and only time we're going
 
                    // to send a message. So for now we can't send a
 
                    // branch ID.
 
                    desc.global_outbox.insert_message(BufferedMessage{
 
                        sending_port: port_id,
 
                        receiving_port: peer_id,
 
                        peer_prev_branch_id: None,
 
                        peer_cur_branch_id: branch_index,
 
                        message: value_group,
 
                    });
 

	
 
                    // Finally, because we were able to put the message,
 
                    // we can run the branch again
 
                    desc.spec_branches_active.push_back(branch_index);
 
                    return Scheduling::Immediate;
 
                }
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
        }
 

	
 
        // Did not return that we need to immediately schedule again, so
 
        // determine if we want to do so based on the current number of active
 
        // speculative branches
 
        if desc.spec_branches_active.is_empty() {
 
            return Scheduling::NotNow;
 
        } else {
 
            return Scheduling::Later;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve the connector and the branch (which is always the first one,
 
        // since we assume we're not running in sync-mode).
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(!desc.in_sync);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        debug_assert_eq!(desc.branches.len(), 1);
 

	
 
        let branch = &mut desc.branches[0];
 

	
 
        // Run this branch to its blocking point
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                branch.branch_state = BranchState::Finished;
 
                return Scheduling::NotNow
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution
 
                Self::prepare_branch_for_sync(desc);
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Find all references to ports in the provided arguments, the
 
                // ownership of these ports will be transferred to the connector
 
                // we're about to create.
 
                let mut ports = Vec::with_capacity(arguments.values.len());
 
                find_ports_in_value_group(&arguments, &mut ports);
 

	
 
                // Generate a new connector with its own state
 
                let new_component_id = self.generate_connector_id();
 
                let new_component_state = ComponentState {
 
                    prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                };
 

	
 
                for port_id in &ports {
 
                    let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                    debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id);
 
                    port.owning_connector_id = Some(new_component_id)
 
                }
 

	
 
                // Finally push the new connector into the registry
 
                let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                self.connectors_active.push_back(new_component_id);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewChannel => {
 
                // Prepare channel
 
                debug_assert!(run_context.branch_ctx.pending_channel.is_none());
 
                let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id));
 
                run_context.branch_ctx.pending_channel = Some((
 
                    port_value_from_id(Some(connector_id), put_id, true),
 
                    port_value_from_id(Some(connector_id), get_id, false)
 
                ));
 

	
 
                return Scheduling::Immediate;
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
    /// will be created that receive those messages.
 
    fn empty_connector_outbox(&mut self, connector_index: u32) {
 
        loop {
 
            let connector = self.connectors.get_mut(&connector_index).unwrap();
 
            let message_to_send = connector.global_outbox.take_next_message_to_send();
 

	
 
            if message_to_send.is_none() {
 
                return;
 
            }
 

	
 
            // We have a message to send
 
            let message_to_send = message_to_send.unwrap();
 

	
 
            // Lookup the target connector
 
            let target_port = message_to_send.receiving_port;
 
            let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap();
 
            debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id);
 
            let target_connector_id = port_desc.owning_connector_id.unwrap();
 
            let target_connector = self.connectors.get_mut(&target_connector_id).unwrap();
 

	
 
            // In any case, always put the message in the global inbox
 
            target_connector.global_inbox.insert_message(message_to_send.clone());
 

	
 
            // Check if there are any branches that are waiting on
 
            // receives
 
            if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) {
 
                // Check each of the branches for a port mapping that
 
                // matches the one on the message header
 
                for branch_index in branch_indices {
 
                    let branch = &mut target_connector.branches[*branch_index as usize];
 
                    debug_assert_eq!(branch.branch_state, BranchState::BranchPoint);
 

	
 
                    let mut can_branch = false;
 

	
 
                    if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) {
 
                        if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
                            can_branch = true;
 
                        }
 
                    }
 

	
 
                    if can_branch {
 
                        // Put the message inside a clone of the currently
 
                        // waiting branch
 
                        let mut new_branch = Self::duplicate_branch(target_connector, *branch_index);
 
                        let new_branch_idx = new_branch.index;
 
                        let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap();
 
                        new_port_desc.last_registered_index = Some(message_to_send.peer_cur_branch_id);
 
                        new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone());
 

	
 
                        // And queue the branch for further execution
 
                        target_connector.branches.push(new_branch);
 
                        target_connector.spec_branches_active.push_back(new_branch_idx);
 
                        if !self.connectors_active.contains(&target_connector.id) {
 
                            self.connectors_active.push_back(target_connector.id);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Checks a connector for the submitted solutions. After all neighbouring
 
    /// connectors have been checked all of their "last checked solution" index
 
    /// will be incremented.
 
    fn check_connector_new_solutions(&mut self, connector_id: u32) {
 
        // Take connector and start processing its solutions
 
        loop {
 
            let connector = self.connectors.get_mut(&connector_id).unwrap();
 
            if connector.last_checked_done == connector.spec_branches_done.len() as u32 {
 
                // Nothing to do
 
                return;
 
            }
 

	
 
            // We have a new solution
 
            let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize];
 
            connector.last_checked_done += 1;
 

	
 
            // Check the connector+branch combination to see if a global
 
            // solution has already been found
 
            if let Some(global_solution) = self.check_connector_solution(connector_id, start_branch_index) {
 
                // Found a global solution, apply it to all the connectors that
 
                // participate
 
                for (connector_id, local_solution) in global_solution.connector_mapping {
 
                    self.commit_connector_solution(connector_id, local_solution.final_branch_id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn check_connector_solution(&mut self, first_connector_index: u32, first_branch_index: u32) -> Option<ProposedSolution> {
 
        // Take the connector and branch of interest
 
        let first_connector = self.connectors.get(&first_connector_index).unwrap();
 
        let first_branch = &first_connector.branches[first_branch_index as usize];
 
        debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync);
 

	
 
        // Setup the first solution
 
        let mut first_solution = ProposedSolution{
 
            connector_mapping: HashMap::new(),
 
            connector_constraints: HashMap::new(),
 
            remaining_connectors: Vec::new(),
 
        };
 
        let mut first_local_solution = ProposedConnectorSolution{
 
            final_branch_id: first_branch.index,
 
            all_branch_ids: Vec::new(),
 
            port_mapping: first_branch.port_mapping
 
                .iter()
 
                .map(|(port_id, port_info)| {
 
                    (port_id.0.u32_suffix, port_info.last_registered_index)
 
                })
 
                .collect(),
 
        };
 
        self.determine_branch_ids(first_connector, first_branch.index, &mut first_local_solution.all_branch_ids);
 
        first_solution.connector_mapping.insert(first_connector.id, first_local_solution);
 

	
 
        for (port_id, port_mapping) in first_branch.port_mapping.iter() {
 
            let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let peer_port_desc = self.ports.get(&peer_port_id).unwrap();
 
            let peer_connector_id = peer_port_desc.owning_connector_id.unwrap();
 

	
 
            let constraint = match port_mapping.last_registered_index {
 
                Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id),
 
                None => ProposedBranchConstraint::SilentPort(peer_port_id),
 
            };
 

	
 
            match first_solution.connector_constraints.entry(peer_connector_id) {
 
                Entry::Vacant(entry) => {
 
                    // Not yet encountered
 
                    entry.insert(vec![constraint]);
 
                    first_solution.remaining_connectors.push(peer_connector_id);
 
                },
 
                Entry::Occupied(mut entry) => {
 
                    // Already encountered
 
                    let entry = entry.get_mut();
 
                    if !entry.contains(&constraint) {
 
                        entry.push(constraint);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Setup storage for all possible solutions
 
        let mut all_solutions = Vec::new();
 
        all_solutions.push(first_solution);
 

	
 
        while !all_solutions.is_empty() {
 
            let mut cur_solution = all_solutions.pop().unwrap();
 

	
 
            if cur_solution.remaining_connectors.is_empty() {
 
                // All connectors have been visited, so commit the solution
 
                debug_assert!(cur_solution.connector_constraints.is_empty());
 
                return Some(cur_solution);
 
            } else {
 
                // Not all connectors have been visited yet, so take one of the
 
                // connectors and visit it.
 
                let target_connector = cur_solution.remaining_connectors.pop().unwrap();
 
                self.merge_solution_with_connector(&mut cur_solution, &mut all_solutions, target_connector);
 
            }
 
        }
 

	
 
        // No satisfying solution found
 
        return None;
 
    }
 

	
 
    fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec<ProposedSolution>, target_connector: u32) {
 
        debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited
 
        debug_assert!(cur_solution.connector_constraints.contains_key(&target_connector)); // but we encountered a reference to it
 

	
 
        let branch_constraints = cur_solution.connector_constraints.get(&target_connector).unwrap();
 
        let cur_connector = self.connectors.get(&target_connector).unwrap();
 

	
 
        // Make sure all propositions are unique
 
        for i in 0..branch_constraints.len() {
 
            let proposition_i = branch_constraints[i];
 
            for j in 0..i {
 
                let proposition_j = branch_constraints[j];
 
                debug_assert_ne!(proposition_i, proposition_j);
 
            }
 
        }
 

	
 
        // Go through the current connector's branches that have finished
 
        'branch_loop: for finished_branch_idx in cur_connector.spec_branches_done.iter().copied() {
 
            let finished_branch = &cur_connector.branches[finished_branch_idx as usize];
 

	
 
            // Construct a list of all the parent branch numbers
 
            let mut parent_branch_ids = Vec::new();
 
            self.determine_branch_ids(cur_connector, finished_branch_idx, &mut parent_branch_ids);
 

	
 
            // Go through all constraints and make sure they are satisfied by
 
            // the current branch
 
            let mut all_constraints_satisfied = true;
 

	
 
            for constraint in branch_constraints {
 
                match constraint {
 
                    ProposedBranchConstraint::SilentPort(port_id) => {
 
                        // Specified should have remained silent
 
                        let port_id = PortId(Id{
 
                            connector_id: target_connector,
 
                            u32_suffix: *port_id,
 
                        });
 
                        debug_assert!(finished_branch.port_mapping.contains_key(&port_id));
 
                        let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap();
 
                        all_constraints_satisfied = all_constraints_satisfied && mapped_port.num_times_fired == 0;
 
                    },
 
                    ProposedBranchConstraint::BranchNumber(branch_id) => {
 
                        // Branch number should have appeared in the
 
                        // predecessor branches.
 
                        all_constraints_satisfied = all_constraints_satisfied && parent_branch_ids.contains(branch_id);
 
                    },
 
                    ProposedBranchConstraint::PortMapping(port_id, branch_id) => {
 
                        // Port should map to a particular branch number
 
                        let port_id = PortId(Id{
 
                            connector_id: target_connector,
 
                            u32_suffix: *port_id,
 
                        });
 
                        debug_assert!(finished_branch.port_mapping.contains_key(&port_id));
 
                        let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap();
 
                        all_constraints_satisfied = all_constraints_satisfied && mapped_port.last_registered_index == Some(*branch_id);
 
                    }
 
                }
 

	
 
                if !all_constraints_satisfied {
 
                    break;
 
                }
 
            }
 

	
 
            if !all_constraints_satisfied {
 
                continue;
 
            }
 

	
 
            // If here, then all constraints on the finished branch are
 
            // satisfied. But the finished branch also puts constraints on the
 
            // other connectors. So either:
 
            // 1. Add them to the list of constraints a peer connector should
 
            //  adhere to.
 
            // 2. Make sure that the provided connector solution matches the
 
            //  constraints imposed by the currently considered finished branch
 
            //
 
            // To make our lives a bit easier we already insert our proposed
 
            // local solution into a prepared global solution. This makes
 
            // looking up remote ports easier (since the channel might have its
 
            // two ends owned by the same connector).
 
            let mut new_solution = cur_solution.clone();
 
            debug_assert!(!new_solution.remaining_connectors.contains(&target_connector));
 
            new_solution.connector_constraints.remove(&target_connector);
 
            new_solution.connector_mapping.insert(target_connector, ProposedConnectorSolution{
 
                final_branch_id: finished_branch.index,
 
                all_branch_ids: parent_branch_ids,
 
                port_mapping: finished_branch.port_mapping
 
                    .iter()
 
                    .map(|(port_id, port_desc)| {
 
                        (port_id.0.u32_suffix, port_desc.last_registered_index)
 
                    })
 
                    .collect(),
 
            });
 

	
 
            for (local_port_id, port_desc) in &finished_branch.port_mapping {
 
                // Retrieve port of peer
 
                let port_info = self.ports.get(&local_port_id.0.u32_suffix).unwrap();
 
                let peer_port_id = port_info.peer_id;
 
                let peer_port_info = self.ports.get(&peer_port_id).unwrap();
 
                let peer_connector_id = peer_port_info.owning_connector_id.unwrap();
 

	
 
                // If the connector was not present in the new global solution
 
                // yet, add it now, as it simplifies the following logic
 
                if !new_solution.connector_mapping.contains_key(&peer_connector_id) && !new_solution.remaining_connectors.contains(&peer_connector_id) {
 
                    new_solution.connector_constraints.insert(peer_connector_id, Vec::new());
 
                    new_solution.remaining_connectors.push(peer_connector_id);
 
                }
 

	
 
                if new_solution.remaining_connectors.contains(&peer_connector_id) {
 
                    // Constraint applies to a connector that has not yet been
 
                    // visited
 
                    debug_assert!(new_solution.connector_constraints.contains_key(&peer_connector_id));
 
                    debug_assert_ne!(peer_connector_id, target_connector);
 

	
 
                    let new_constraint = if port_desc.num_times_fired == 0 {
 
                        ProposedBranchConstraint::SilentPort(peer_port_id)
 
                    } else if peer_port_info.is_getter {
 
                        // Peer port is a getter, so we want its port to map to
 
                        // the branch number in our port mapping.
 
                        debug_assert!(port_desc.last_registered_index.is_some());
 
                        ProposedBranchConstraint::PortMapping(peer_port_id, port_desc.last_registered_index.unwrap())
 
                    } else {
 
                        // Peer port is a putter, so we want to restrict the
 
                        // solution's run to contain the branch ID we received.
 
                        ProposedBranchConstraint::BranchNumber(port_desc.last_registered_index.unwrap())
 
                    };
 

	
 
                    let peer_constraints = new_solution.connector_constraints.get_mut(&peer_connector_id).unwrap();
 
                    if !peer_constraints.contains(&new_constraint) {
 
                        peer_constraints.push(new_constraint);
 
                    }
 
                } else {
 
                    // Constraint applies to an already visited connector
 
                    let peer_solution = new_solution.connector_mapping.get(&peer_connector_id).unwrap();
 
                    if port_desc.num_times_fired == 0 {
 
                        let peer_mapped_id = peer_solution.port_mapping.get(&peer_port_id).unwrap();
 
                        if peer_mapped_id.is_some() {
 
                            all_constraints_satisfied = false;
 
                            break;
 
                        }
 
                    } else if peer_port_info.is_getter {
 
                        // Peer is getter, so its port should be mapped to one
 
                        // of our branch IDs. To simplify lookup we look at the
 
                        // last message we sent to the getter.
 
                        debug_assert!(port_desc.last_registered_index.is_some());
 
                        let peer_port = peer_solution.port_mapping.get(&peer_port_id)
 
                            .map_or(None, |v| *v);
 

	
 
                        if port_desc.last_registered_index != peer_port {
 
                            // No match
 
                            all_constraints_satisfied = false;
 
                            break;
 
                        }
 
                    } else {
 
                        // Peer is putter, so we expect to find our port mapping
 
                        // to match one of the branch numbers in the peer
 
                        // connector's local solution
 
                        debug_assert!(port_desc.last_registered_index.is_some());
 
                        let expected_branch_id = port_desc.last_registered_index.unwrap();
 

	
 
                        if !peer_solution.all_branch_ids.contains(&expected_branch_id) {
 
                            all_constraints_satisfied = false;
 
                            break;
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            if !all_constraints_satisfied {
 
                // Final checks failed
 
                continue 'branch_loop
 
            }
 

	
 
            // We're sure that this branch matches the provided solution, so
 
            // push it onto the list of considered solutions
 
            all_solutions.push(new_solution);
 
        }
 
    }
 

	
 
    fn commit_connector_solution(&mut self, connector_id: u32, branch_id: u32) {
 
        // Retrieve connector and branch
 
        let connector = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch
 
        debug_assert!(connector.in_sync);
 
        debug_assert!(connector.spec_branches_done.contains(&branch_id));
 

	
 
        // Put the selected solution in front, the branch at index 0 is the
 
        // "non-sync" branch.
 
        connector.branches.swap(0, branch_id as usize);
 
        connector.branches.truncate(1);
 

	
 
        // And reset the connector's state for further execution
 
        connector.in_sync = false;
 
        connector.spec_branches_active.clear();
 
        connector.spec_branches_pending_receive.clear();
 
        connector.spec_branches_done.clear();
 
        connector.last_checked_done = 0;
 
        connector.global_inbox.clear();
 
        connector.global_outbox.clear();
 

	
 
        // Do the same thing for the final selected branch
 
        let final_branch = &mut connector.branches[0];
 
        final_branch.index = 0;
 
        final_branch.parent_index = None;
 
        debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync);
 
        final_branch.branch_state = BranchState::RunningNonSync;
 
        final_branch.message_inbox.clear();
 
        final_branch.port_mapping.clear();
 

	
 
        // Might be that the connector was no longer running, if so, put it back
 
        // in the list of connectors to run
 
        if !self.connectors_active.contains(&connector_id) {
 
            self.connectors_active.push_back(connector_id);
 
        }
 
    }
 

	
 
    fn generate_connector_id(&mut self) -> u32 {
 
        let id = self.connector_counter;
 
        self.connector_counter += 1;
 
        return id;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for port management
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    fn add_owned_channel(ports: &mut HashMap<u32, PortDesc>, port_counter: &mut u32, owning_connector_id: Option<u32>) -> (u32, u32) {
 
        let get_id = *port_counter;
 
        let put_id = *port_counter + 1;
 
        (*port_counter) += 2;
 

	
 
        ports.insert(get_id, PortDesc{
 
            id: get_id,
 
            peer_id: put_id,
 
            owning_connector_id,
 
            is_getter: true,
 
        });
 
        ports.insert(put_id, PortDesc{
 
            id: put_id,
 
            peer_id: get_id,
 
            owning_connector_id,
 
            is_getter: false,
 
        });
 

	
 
        return (put_id, get_id);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for branch management
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a speculative branch for further execution from the connector's
 
    /// non-speculative base branch.
 
    fn prepare_branch_for_sync(desc: &mut ConnectorDesc) {
 
        // Ensure only one branch is active, the non-sync branch
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        let new_branch_index = 1;
 

	
 
        // Push first speculative branch as active branch
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]);
 
        desc.branches.push(new_branch);
 
        desc.spec_branches_active.push_back(new_branch_index);
 
        desc.in_sync = true;
 
    }
 

	
 
    /// Duplicates a particular (speculative) branch and returns it. Due to
 
    /// borrowing rules in code that uses this helper the returned branch still
 
    /// needs to be pushed onto the member `branches`.
 
    fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc {
 
        let original_branch = &desc.branches[original_branch_idx as usize];
 
        debug_assert!(desc.in_sync);
 

	
 
        let copied_index = desc.branches.len() as u32;
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch);
 

	
 
        return copied_branch;
 
    }
 

	
 
    /// Retrieves all parent IDs of a particular branch. These numbers run from
 
    /// the leaf towards the parent.
 
    fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec<u32>) {
 
        let mut next_branch_index = first_branch_index;
 
        result.clear();
 

	
 
        loop {
 
            result.push(next_branch_index);
 
            let branch = &desc.branches[next_branch_index as usize];
 

	
 
            match branch.parent_index {
 
                Some(index) => next_branch_index = index,
 
                None => return,
 
            }
 
        }
 
    }
 
}
 

	
 
/// Context accessible by the code while being executed by the runtime. When the
 
/// code is being executed by the runtime it sometimes needs to interact with 
 
/// the runtime. This is achieved by the "code throwing an error code", after 
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Temporary references to branch related storage
 
    inbox: &'a HashMap<(PortId, u32), ValueGroup>,
 
    port_mapping: &'a HashMap<PortId, BranchPortDesc>,
 
    branch_ctx: &'a mut BranchContext,
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        // Note that we want "did put" to return false if we have fired zero
 
        // times, because this implies we did a prevous
 
        let old_value = self.branch_ctx.just_called_did_put;
 
        self.branch_ctx.just_called_did_put = false;
 
        return old_value;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let inbox_key = (port, 1);
 
        match self.inbox.get(&inbox_key) {
 
            None => None,
 
            Some(value) => Some(value.clone()),
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self.port_mapping.get(&port) {
 
            None => None,
 
            Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)),
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.branch_ctx.pending_channel.take()
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports. 
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports.iter() {
 
                    if prev_port == port_id {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 
                
 
                ports.push(*port_id);
 
            },
 
            Value::Array(heap_pos) | 
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 
    
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
fn port_value_from_id(connector_id: Option<u32>, port_id: u32, is_output: bool) -> Value {
 
    let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system
 
    if is_output {
 
        return Value::Output(PortId(Id{
 
            connector_id,
 
            u32_suffix: port_id
 
        }));
 
    } else {
 
        return Value::Input(PortId(Id{
 
            connector_id,
 
            u32_suffix: port_id,
 
        }));
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
use std::sync::Arc;
 

	
 
use super::runtime::*;
 
use crate::ProtocolDescription;
 
use crate::protocol::eval::*;
 

	
 
#[test]
 
fn test_single_message() {
 
    // Simple test were we have a `putter` component, which will simply send a
 
    // single message (a boolean), and a `getter` component, which will receive
 
    // that message.
 
    // We will write this behaviour in the various ways that the language
 
    // currently allows. We will cheat a bit by peeking into the runtime to make
 
    // sure that the getter actually received the message.
 
    // TODO: Expose ports to a "native application"
 

	
 
    fn check_store_bool(value: &Value, expected: bool) {
 
        if let Value::Bool(value) = value {
 
            assert_eq!(*value, expected);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 

	
 
    fn run_putter_getter(code: &[u8]) {
 
        // Compile code
 
        let pd = ProtocolDescription::parse(code)
 
            .expect("code successfully compiles");
 
        let pd = Arc::new(pd);
 

	
 
        // Construct runtime and the appropriate ports and connectors
 
        let mut rt = Runtime::new(pd);
 
        let (put_port, get_port) = rt.add_channel();
 

	
 
        let mut put_args = ValueGroup::new_stack(vec![
 
            put_port,
 
        ]);
 
        rt.add_component("", "putter", put_args)
 
            .expect("'putter' component created");
 

	
 
        let mut get_args = ValueGroup::new_stack(vec![
 
            get_port,
 
        ]);
 
        rt.add_component("", "getter", get_args)
 
            .expect("'getter' component created");
 

	
 
        // Run until completion
 
        rt.run();
 

	
 
        // Check for success (the 'received' and 'did_receive" flags)
 
        let getter_component = rt.connectors.get(&1).unwrap();
 
        let branch = &getter_component.branches[0];
 
        assert_eq!(branch.branch_state, BranchState::Finished);
 

	
 
        // Note: with the stack structure of the store, the first entry is the
 
        // "previous stack pos" and the second one is the input port passed to
 
        // the procedure. Hence the third/fourth entries are the boolean
 
        // variables on the stack.
 
        check_store_bool(&branch.code_state.prompt.store.stack[2], true);
 
        check_store_bool(&branch.code_state.prompt.store.stack[3], true);
 
    }
 

	
 
    // Without `fires()`, just a single valid behaviour
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                put(put_here, true);
 
            }
 
        }
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false;
 
            bool did_receive = false;
 

	
 
            synchronous {
 
                received = get(get_here);
 
                if (received) {
 
                    print(\"value was 'true'\");
 
                } else {
 
                    print(\"value was 'false'\");
 
                }
 
                did_receive = true;
 
            }
 
        }");
 

	
 
    // With `fires()`, but eliminating on the putter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (!fires(put_here)) {
 
                    assert(false);
 
                } else {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                }
 
            }
 
        }");
 

	
 
    // With `fires()`, but eliminating on the getter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (fires(put_here)) {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                } else {
 
                    assert(false);
 
                }
 
            }
 
        }"
 
    );
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)