Changeset - ff6ade8b8097
[Not reviewed]
0 6 1
MH - 4 years ago 2021-09-21 17:54:29
contact@maxhenger.nl
WIP on runtime without compiler errors
7 files changed with 759 insertions and 436 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -584,10 +584,12 @@ impl Prompt {
 
                                        unreachable!("executor calling 'get' on value {:?}", value)
 
                                    };
 

	
 
                                    match ctx.get(port_id, &mut self.store) {
 
                                    match ctx.get(port_id) {
 
                                        Some(result) => {
 
                                            // We have the result
 
                                            cur_frame.expr_values.push_back(result)
 
                                            // We have the result. Merge the `ValueGroup` with the
 
                                            // stack/heap storage.
 
                                            debug_assert_eq!(result.values.len(), 1);
 
                                            result.into_stack(&mut cur_frame.expr_values, &mut self.store);
 
                                        },
 
                                        None => {
 
                                            // Don't have the result yet, prepare the expression to
 
@@ -639,7 +641,7 @@ impl Prompt {
 
                                    let port_id = match port_value_deref {
 
                                        Value::Input(port_id) => port_id,
 
                                        Value::Output(port_id) => port_id,
 
                                        _ => unreachable!("executor calling 'fires' on value {:?}", value),
 
                                        _ => unreachable!("executor calling 'fires' on value {:?}", port_value_deref),
 
                                    };
 

	
 
                                    match ctx.fires(port_id) {
src/protocol/eval/value.rs
Show inline comments
 
use std::collections::VecDeque;
 

	
 
use super::store::*;
 
use crate::PortId;
 
@@ -160,7 +161,7 @@ impl Value {
 
///
 
/// Again: this is a temporary thing, hopefully removed once we move to a
 
/// bytecode interpreter.
 
#[derive(Clone)]
 
#[derive(Clone, Debug)]
 
pub struct ValueGroup {
 
    pub(crate) values: Vec<Value>,
 
    pub(crate) regions: Vec<Vec<Value>>
 
@@ -230,6 +231,17 @@ impl ValueGroup {
 
        }
 
    }
 

	
 
    /// Transfers the heap values into the store, but will put the stack values
 
    /// into the provided `VecDeque`. This is mainly used to merge `ValueGroup`
 
    /// instances retrieved by the code by `get` calls into the expression
 
    /// stack.
 
    pub(crate) fn into_stack(self, stack: &mut VecDeque<Value>, store: &mut Store) {
 
        for value in &self.values {
 
            let transferred = self.provide_value(value, store);
 
            stack.push_back(transferred);
 
        }
 
    }
 

	
 
    fn provide_value(&self, value: &Value, to_store: &mut Store) -> Value {
 
        if let Some(from_heap_pos) = value.get_heap_pos() {
 
            let from_heap_pos = from_heap_pos as usize;
src/protocol/mod.rs
Show inline comments
 
@@ -165,10 +165,10 @@ impl ProtocolDescription {
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&self, port: PortId) -> bool;
 
    fn get(&self, port: PortId) -> Option<Value>; // None if still waiting on message
 
    fn fires(&self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&self) -> Option<(Value, Value)>; // None if not yet prepared
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
@@ -207,7 +207,7 @@ impl ComponentState {
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, arg),
 
                        return RR::NewComponent(definition_id, monomorph_idx, args),
 
                    EC::NewChannel =>
 
                        return RR::NewChannel,
 
                    EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id),
 
@@ -271,7 +271,14 @@ impl ComponentState {
 
                        context.new_component(moved_ports, init_state);
 
                        // Continue stepping
 
                        continue;
 
                    }
 
                    },
 
                    EvalContinuation::NewChannel => {
 
                        // Because of the way we emulate the old context for now, we can safely
 
                        // assume that this will never happen. The old context thingamajig always
 
                        // creates a channel, it never bubbles a "need to create a channel" message
 
                        // to the runtime
 
                        unreachable!();
 
                    },
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(_) => unreachable!(),
 
                    EvalContinuation::BlockGet(_) => unreachable!(),
 
@@ -304,23 +311,12 @@ impl ComponentState {
 
                    EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(port) => {
 
                            return SyncBlocker::CouldntCheckFiring(port);
 
                        }
 
                        Value::Input(port) => {
 
                            return SyncBlocker::CouldntCheckFiring(port);
 
                        }
 
                        _ => unreachable!(),
 
                    EvalContinuation::NewChannel => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        return SyncBlocker::CouldntCheckFiring(port);
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(port) => {
 
                            return SyncBlocker::CouldntReadMsg(port);
 
                        }
 
                        Value::Input(port) => {
 
                            return SyncBlocker::CouldntReadMsg(port);
 
                        }
 
                        _ => unreachable!(),
 
                    EvalContinuation::BlockGet(port) => {
 
                        return SyncBlocker::CouldntReadMsg(port);
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let payload;
 
@@ -346,6 +342,72 @@ impl ComponentState {
 
        }
 
    }
 
}
 

	
 
impl RunContext for EvalContext<'_> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                ctx.did_put_or_get(port)
 
            }
 
        }
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(ctx) => {
 
                let payload = ctx.read_msg(port);
 
                if payload.is_none() {
 
                    return None;
 
                }
 

	
 
                let payload = payload.unwrap();
 
                let mut transformed = Vec::with_capacity(payload.len());
 
                for byte in payload.0.iter() {
 
                    transformed.push(Value::UInt8(*byte));
 
                }
 

	
 
                let value_group = ValueGroup{
 
                    values: vec![Value::Message(0)],
 
                    regions: vec![transformed],
 
                };
 

	
 
                return Some(value_group);
 
            }
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => {
 
                match context.is_firing(port) {
 
                    Some(did_fire) => Some(Value::Bool(did_fire)),
 
                    None => None,
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_port_pair();
 
                let from = Value::Output(from);
 
                let to = Value::Input(to);
 
                return Some((from, to));
 
            },
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
}
 

	
 
// TODO: @remove once old runtime has disappeared
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
src/runtime2/messages.rs
Show inline comments
 
@@ -85,6 +85,10 @@ impl ConnectorInbox {
 
            None => return None,
 
        }
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
 
@@ -92,14 +96,12 @@ impl ConnectorInbox {
 
/// actually transmit the messages.
 
pub struct ConnectorOutbox {
 
    messages: Vec<BufferedMessage>,
 
    sent_counter: usize,
 
}
 

	
 
impl ConnectorOutbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            sent_counter: 0,
 
        }
 
    }
 

	
 
@@ -120,18 +122,11 @@ impl ConnectorOutbox {
 
        self.messages.push(message);
 
    }
 

	
 
    pub fn take_next_message_to_send(&mut self) -> Option<&BufferedMessage> {
 
        if self.sent_counter == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let cur_index = self.sent_counter;
 
        self.sent_counter += 1;
 
        return Some(&self.messages[cur_index]);
 
    pub fn take_next_message_to_send(&mut self) -> Option<BufferedMessage> {
 
        self.messages.pop()
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
        self.sent_counter = 0;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
mod messages;
 
\ No newline at end of file
 
mod messages;
 

	
 
#[cfg(test)] mod tests;
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, HashSet, VecDeque};
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::{Polarity, PortId};
 
@@ -7,7 +7,6 @@ use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::registry::Registry;
 
use super::messages::*;
 

	
 
enum AddComponentError {
 
@@ -27,7 +26,6 @@ struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    branch_id_counter: u32,
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
@@ -47,7 +45,6 @@ impl ConnectorDesc {
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            branch_id_counter: 1,
 
            spec_branches_active: branches_active,
 
            spec_branches_pending_receive: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
@@ -58,6 +55,7 @@ impl ConnectorDesc {
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
@@ -66,15 +64,15 @@ enum BranchState {
 
    Failed, // branch that became inconsistent
 
}
 

	
 
#[derive(Clone)]
 
struct BranchPortDesc {
 
    last_registered_identifier: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    last_registered_index: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    identifier: u32,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
@@ -88,7 +86,6 @@ impl BranchDesc {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            identifier: 0,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
@@ -100,11 +97,10 @@ impl BranchDesc {
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self {
 
    fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self {
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            identifier,
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
@@ -114,75 +110,42 @@ impl BranchDesc {
 
    }
 
}
 

	
 
// Separate from Runtime for borrowing reasons
 
struct Registry {
 
    ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
#[derive(PartialEq, Eq)]
 
enum Scheduling {
 
    Immediate,
 
    Later,
 
    NotNow,
 
}
 

	
 
impl Registry {
 
    fn new() -> Self {
 
        Self{
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
        }
 
    }
 

	
 
    /// Returns (putter_port, getter_port)
 
    pub fn add_channel(&mut self, owning_connector_id: Option<u32>) -> (u32, u32) {
 
        let get_id = self.generate_port_id();
 
        let put_id = self.generate_port_id();
 

	
 
        self.ports.insert(get_id, PortDesc{
 
            id: get_id,
 
            peer_id: put_id,
 
            owning_connector_id,
 
            is_getter: true,
 
        });
 
        self.ports.insert(put_id, PortDesc{
 
            id: put_id,
 
            peer_id: get_id,
 
            owning_connector_id,
 
            is_getter: false,
 
        });
 

	
 
        return (put_id, get_id);
 
    }
 

	
 
    fn generate_port_id(&mut self) -> u32 {
 
        let id = self.port_counter;
 
        self.port_counter += 1;
 
        return id;
 
    }
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq)]
 
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
 
enum ProposedBranchConstraint {
 
    SilentPort(u32), // port id
 
    BranchNumber(u32), // branch id
 
    PortMapping(u32, u32), // particular port's mapped branch number
 
}
 

	
 
// Local solution of the connector
 
#[derive(Clone)]
 
struct ProposedConnectorSolution {
 
    final_branch_id: u32,
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    silent_ports: Vec<u32>, // port IDs of the connector itself
 
    port_mapping: HashMap<u32, Option<u32>>, // port IDs of the connector, mapped to their branch IDs (None for silent ports)
 
}
 

	
 
#[derive(Clone)]
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_propositions: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    connector_constraints: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
    registry: Registry,
 
    ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
@@ -190,7 +153,10 @@ impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            protocol: pd,
 
            registry: Registry::new(),
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
            connectors_active: VecDeque::new(),
 
        }
 
    }
 
@@ -199,7 +165,7 @@ impl Runtime {
 
    /// endpoints. The returned values are of the (putter port, getter port)
 
    /// respectively.
 
    pub fn add_channel(&mut self) -> (Value, Value) {
 
        let (put_id, get_id) = self.registry.add_channel(None);
 
        let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, None);
 
        return (
 
            port_value_from_id(None, put_id, true),
 
            port_value_from_id(None, get_id, false)
 
@@ -255,7 +221,7 @@ impl Runtime {
 
        let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);
 
        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 

	
 
        self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors_active.push_back(component_id);
 

	
 
        Ok(())
 
@@ -264,16 +230,24 @@ impl Runtime {
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            // Run a single connector
 
            // Run a single connector until it indicates we can run another
 
            // connector
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            let run_again = self.run_connector(next_id);
 
            let mut scheduling = Scheduling::Immediate;
 

	
 
            while scheduling == Scheduling::Immediate {
 
                scheduling = self.run_connector(next_id);
 
            }
 

	
 
            if run_again {
 
                self.connectors_active.push_back(next_id);
 
            match scheduling {
 
                Scheduling::Immediate => unreachable!(),
 
                Scheduling::Later => self.connectors_active.push_back(next_id),
 
                Scheduling::NotNow => {},
 
            }
 

	
 
            // Deal with any outgoing messages and potential solutions
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_solution(next_id);
 
            self.check_connector_new_solutions(next_id);
 
        }
 
    }
 

	
 
@@ -281,271 +255,327 @@ impl Runtime {
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    pub fn run_connector(&mut self, id: u32) -> bool {
 
        let desc = self.registry.connectors.get_mut(&id).unwrap();
 
        let mut run_context = Context{
 
            connector_id: id,
 
            branch_id: None,
 
            pending_channel: None,
 
        };
 
    pub fn run_connector(&mut self, connector_id: u32) -> Scheduling {
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 

	
 
        let mut call_again = false; // TODO: Come back to this, silly pattern
 
        if desc.in_sync {
 
            return self.run_connector_sync_mode(connector_id);
 
        } else {
 
            return self.run_connector_regular_mode(connector_id);
 
        }
 
    }
 

	
 
        while call_again {
 
            call_again = false; // bit of a silly pattern, maybe revise
 
    #[inline]
 
    fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve connector and branch that is supposed to be run
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(desc.in_sync);
 
        debug_assert!(!desc.spec_branches_active.is_empty());
 

	
 
            if desc.in_sync {
 
                // Running in synchronous mode, so run all branches until their
 
                // blocking point
 
                debug_assert!(!desc.spec_branches_active.is_empty());
 
                let branch_index = desc.spec_branches_active.pop_front().unwrap();
 
        let branch_index = desc.spec_branches_active.pop_front().unwrap();
 
        let branch = &mut desc.branches[branch_index as usize];
 

	
 
                let branch = &mut desc.branches[branch_index as usize];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 
        // Run this particular branch to a next blocking point
 
        // TODO: PERSISTENT RUN CTX
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            connector_id,
 
            branch_id: Some(branch_index),
 
            just_called_did_put: false,
 
            pending_channel: None,
 
        };
 

	
 
                match run_result {
 
                    RunResult::BranchInconsistent => {
 
                        // Speculative branch became inconsistent. So we don't
 
                        // run it again
 
                        branch.branch_state = BranchState::Failed;
 
                    },
 
                    RunResult::BranchMissingPortState(port_id) => {
 
                        // Branch called `fires()` on a port that did not have a
 
                        // value assigned yet. So branch and keep running
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                        let copied_index = Self::duplicate_branch(desc, branch_index);
 

	
 
                        // Need to re-borrow to assign changed port state
 
                        let original_branch = &mut desc.branches[branch_index as usize];
 
                        original_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 0,
 
                        });
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                        let copied_branch = &mut desc.branches[copied_index as usize];
 
                        copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent. So we don't
 
                // run it again
 
                branch.branch_state = BranchState::Failed;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that did not have a
 
                // value assigned yet. So branch and keep running.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                let mut copied_branch = Self::duplicate_branch(desc, branch_index);
 
                let copied_index = copied_branch.index;
 

	
 
                copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                    last_registered_index: None,
 
                    num_times_fired: 1,
 
                });
 

	
 
                let branch = &mut desc.branches[branch_index as usize]; // need to reborrow
 
                branch.port_mapping.insert(port_id, BranchPortDesc{
 
                    last_registered_index: None,
 
                    num_times_fired: 0,
 
                });
 

	
 
                // Run both again
 
                desc.branches.push(copied_branch);
 
                desc.spec_branches_active.push_back(branch_index);
 
                desc.spec_branches_active.push_back(copied_index);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch just performed a `get()` on a port that did
 
                // not yet receive a value.
 

	
 
                // First check if a port value is assigned to the
 
                // current branch. If so, check if it is consistent.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                let mut insert_in_pending_receive = false;
 

	
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet, so force to firing
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: None,
 
                            num_times_fired: 1,
 
                        });
 

	
 
                        // Run both again
 
                        desc.spec_branches_active.push_back(branch_index);
 
                        desc.spec_branches_active.push_back(copied_index);
 
                        branch.branch_state = BranchState::BranchPoint;
 
                        insert_in_pending_receive = true;
 
                    },
 
                    RunResult::BranchMissingPortValue(port_id) => {
 
                        // Branch just performed a `get()` on a port that did
 
                        // not yet receive a value.
 

	
 
                        // First check if a port value is assigned to the
 
                        // current branch. If so, check if it is consistent.
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        let mut insert_in_pending_receive = false;
 

	
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet, so force to firing
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 1,
 
                                });
 
                                branch.branch_state = BranchState::BranchPoint;
 
                                insert_in_pending_receive = true;
 
                            },
 
                            Entry::Occupied(entry) => {
 
                                // Have an entry, check if it is consistent
 
                                let entry = entry.get();
 
                                if entry.num_times_fired == 0 {
 
                                    // Inconsistent
 
                                    branch.branch_state = BranchState::Failed;
 
                                } else {
 
                                    // Perfectly fine, add to queue
 
                                    debug_assert!(entry.last_registered_identifier.is_none());
 
                                    assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                                    branch.branch_state = BranchState::BranchPoint;
 
                                    insert_in_pending_receive = true;
 
                                }
 
                            }
 
                    Entry::Occupied(entry) => {
 
                        // Have an entry, check if it is consistent
 
                        let entry = entry.get();
 
                        if entry.num_times_fired == 0 {
 
                            // Inconsistent
 
                            branch.branch_state = BranchState::Failed;
 
                        } else {
 
                            // Perfectly fine, add to queue
 
                            debug_assert!(entry.last_registered_index.is_none());
 
                            assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                            branch.branch_state = BranchState::BranchPoint;
 
                            insert_in_pending_receive = true;
 
                        }
 
                    }
 
                }
 

	
 
                        if insert_in_pending_receive {
 
                            // Perform the insert
 
                            match desc.spec_branches_pending_receive.entry(port_id) {
 
                                Entry::Vacant(entry) => {
 
                                    entry.insert(vec![branch_index]);
 
                                }
 
                                Entry::Occupied(mut entry) => {
 
                                    let entry = entry.get_mut();
 
                                    debug_assert!(!entry.contains(&branch_index));
 
                                    entry.push(branch_index);
 
                                }
 
                            }
 

	
 
                            // But also check immediately if we don't have a
 
                            // previously received message. If so, we
 
                            // immediately branch and accept the message
 
                            if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                                for message in messages {
 
                                    let new_branch_idx = Self::duplicate_branch(desc, branch_index);
 
                                    let new_branch = &mut desc.branches[new_branch_idx as usize];
 
                                    let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                                    new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id);
 
                                    new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                                    desc.spec_branches_active.push_back(new_branch_idx);
 
                                }
 
                            }
 
                if insert_in_pending_receive {
 
                    // Perform the insert
 
                    match desc.spec_branches_pending_receive.entry(port_id) {
 
                        Entry::Vacant(entry) => {
 
                            entry.insert(vec![branch_index]);
 
                        }
 
                    },
 
                    RunResult::BranchAtSyncEnd => {
 
                        // Check the branch for any ports that were not used and
 
                        // insert them in the port mapping as not having fired.
 
                        for port_index in branch.owned_ports {
 
                            let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                            if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                                entry.insert(BranchPortDesc {
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 0
 
                                });
 
                            }
 
                        Entry::Occupied(mut entry) => {
 
                            let entry = entry.get_mut();
 
                            debug_assert!(!entry.contains(&branch_index));
 
                            entry.push(branch_index);
 
                        }
 
                    }
 

	
 
                        // Mark the branch as being done
 
                        branch.branch_state = BranchState::ReachedEndSync;
 
                        desc.spec_branches_done.push(branch_index);
 
                    },
 
                    RunResult::BranchPut(port_id, value_group) => {
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                        // Branch just performed a `put()`. Check if we have
 
                        // assigned the port value and if so, if it is
 
                        // consistent.
 
                        let mut can_put = true;
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: Some(branch.identifier),
 
                                    num_times_fired: 1,
 
                                });
 
                            },
 
                            Entry::Occupied(mut entry) => {
 
                                // Pre-existing entry
 
                                let entry = entry.get_mut();
 
                                if entry.num_times_fired == 0 {
 
                                    // This is 'fine' in the sense that we have
 
                                    // a normal inconsistency in the branch.
 
                                    branch.branch_state = BranchState::Failed;
 
                                    can_put = false;
 
                                } else if entry.last_registered_identifier.is_none() {
 
                                    // A put() that follows a fires()
 
                                    entry.last_registered_identifier = Some(branch.identifier);
 
                                } else {
 
                                    // This should be fine in the future. But
 
                                    // for now we throw an error as it doesn't
 
                                    // mesh well with the 'fires()' concept.
 
                                    todo!("throw an error of some sort, then fail all related")
 
                                }
 
                            }
 
                    // But also check immediately if we don't have a
 
                    // previously received message. If so, we
 
                    // immediately branch and accept the message
 
                    if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                        for message in messages {
 
                            let mut new_branch = Self::duplicate_branch(desc, branch_index);
 
                            let new_branch_idx = new_branch.index;
 
                            let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                            new_port_desc.last_registered_index = Some(message.peer_cur_branch_id);
 
                            new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                            desc.branches.push(new_branch);
 
                            desc.spec_branches_active.push_back(new_branch_idx);
 
                        }
 

	
 
                        if can_put {
 
                            // Actually put the message in the outbox
 
                            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
                            let peer_id = port_desc.peer_id;
 
                            let peer_desc = self.registry.ports.get(&peer_id).unwrap();
 
                            debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                            let peer_id = PortId(Id{
 
                                connector_id: peer_desc.owning_connector_id.unwrap(),
 
                                u32_suffix: peer_id
 
                            });
 

	
 
                            // For now this is the one and only time we're going
 
                            // to send a message. So for now we can't send a
 
                            // branch ID.
 
                            desc.global_outbox.insert((port_id, 1), BufferedMessage{
 
                                sending_port: port_id,
 
                                receiving_port: peer_id,
 
                                peer_prev_branch_id: None,
 
                                peer_cur_branch_id: 0,
 
                                message: value_group,
 
                            });
 

	
 
                            // Finally, because we were able to put the message,
 
                            // we can run the branch again
 
                            desc.spec_branches_active.push_back(branch_index);
 
                            call_again = true;
 
                        if !messages.is_empty() {
 
                            return Scheduling::Immediate;
 
                        }
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
                    }
 
                }
 
            } else {
 
                // Running in non-synchronous mode
 
                let branch = &mut desc.branches[0];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::ComponentTerminated => return false,
 
                    RunResult::ComponentAtSyncStart => {
 
                        // Prepare for sync execution
 
                        Self::prepare_branch_for_sync(desc);
 
                        call_again = true;
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Check the branch for any ports that were not used and
 
                // insert them in the port mapping as not having fired.
 
                for port_index in branch.owned_ports.iter().copied() {
 
                    let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                    if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                        entry.insert(BranchPortDesc {
 
                            last_registered_index: None,
 
                            num_times_fired: 0
 
                        });
 
                    }
 
                }
 

	
 
                // Mark the branch as being done
 
                branch.branch_state = BranchState::ReachedEndSync;
 
                desc.spec_branches_done.push(branch_index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                // Branch just performed a `put()`. Check if we have
 
                // assigned the port value and if so, if it is
 
                // consistent.
 
                let mut can_put = true;
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: Some(branch.index),
 
                            num_times_fired: 1,
 
                        });
 
                    },
 
                    RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                        // Generate a new connector with its own state
 
                        let new_component_id = self.generate_connector_id();
 
                        let new_component_state = ComponentState {
 
                            prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                        };
 

	
 
                        // Transfer the ownership of any ports to the new connector
 
                        let mut ports = Vec::with_capacity(arguments.values.len());
 
                        find_ports_in_value_group(&arguments, &mut ports);
 
                        for port_id in &ports {
 
                            let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                            debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id);
 
                            port.owning_connector_id = Some(new_component_id)
 
                    Entry::Occupied(mut entry) => {
 
                        // Pre-existing entry
 
                        let entry = entry.get_mut();
 
                        if entry.num_times_fired == 0 {
 
                            // This is 'fine' in the sense that we have
 
                            // a normal inconsistency in the branch.
 
                            branch.branch_state = BranchState::Failed;
 
                            can_put = false;
 
                        } else if entry.last_registered_index.is_none() {
 
                            // A put() that follows a fires()
 
                            entry.last_registered_index = Some(branch.index);
 
                        } else {
 
                            // This should be fine in the future. But
 
                            // for now we throw an error as it doesn't
 
                            // mesh well with the 'fires()' concept.
 
                            todo!("throw an error of some sort, then fail all related")
 
                        }
 
                    }
 
                }
 

	
 
                        // Finally push the new connector into the registry
 
                        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                        self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                        self.connectors_active.push_back(new_component_id);
 
                    },
 
                    RunResult::NewChannel => {
 
                        // Prepare channel
 
                        debug_assert!(run_context.pending_channel.is_none());
 
                        let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id));
 
                        run_context.pending_channel = Some((
 
                            port_value_from_id(Some(run_context.connector_id), put_id, true),
 
                            port_value_from_id(Some(run_context.connector_id), get_id, false)
 
                        ));
 

	
 
                        // Call again so it is retrieved from the context
 
                        call_again = true;
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
                if can_put {
 
                    // Actually put the message in the outbox
 
                    let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap();
 
                    let peer_id = port_desc.peer_id;
 
                    let peer_desc = self.ports.get(&peer_id).unwrap();
 
                    debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                    let peer_id = PortId(Id{
 
                        connector_id: peer_desc.owning_connector_id.unwrap(),
 
                        u32_suffix: peer_id
 
                    });
 

	
 
                    // For now this is the one and only time we're going
 
                    // to send a message. So for now we can't send a
 
                    // branch ID.
 
                    desc.global_outbox.insert_message(BufferedMessage{
 
                        sending_port: port_id,
 
                        receiving_port: peer_id,
 
                        peer_prev_branch_id: None,
 
                        peer_cur_branch_id: 0,
 
                        message: value_group,
 
                    });
 

	
 
                    // Finally, because we were able to put the message,
 
                    // we can run the branch again
 
                    desc.spec_branches_active.push_back(branch_index);
 
                    return Scheduling::Immediate;
 
                }
 
            }
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
        }
 

	
 
        return true;
 
        // Did not return that we need to immediately schedule again, so
 
        // determine if we want to do so based on the current number of active
 
        // speculative branches
 
        if desc.spec_branches_active.is_empty() {
 
            return Scheduling::NotNow;
 
        } else {
 
            return Scheduling::Later;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve the connector and the branch (which is always the first one,
 
        // since we assume we're not running in sync-mode).
 
        // TODO: CONTINUE HERE, PERSEISTENT BRANCH CONTEXT
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 

	
 
        let branch = &mut desc.branches[0];
 

	
 
        // Run this branch to its blocking point
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            connector_id,
 
            branch_id: None,
 
            just_called_did_put: false,
 
            pending_channel: None,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => return Scheduling::NotNow,
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution
 
                Self::prepare_branch_for_sync(desc);
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Find all references to ports in the provided arguments, the
 
                // ownership of these ports will be transferred to the connector
 
                // we're about to create.
 
                let mut ports = Vec::with_capacity(arguments.values.len());
 
                find_ports_in_value_group(&arguments, &mut ports);
 

	
 
                // Generate a new connector with its own state
 
                let new_component_id = self.generate_connector_id();
 
                let new_component_state = ComponentState {
 
                    prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                };
 

	
 
                for port_id in &ports {
 
                    let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                    debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id);
 
                    port.owning_connector_id = Some(new_component_id)
 
                }
 

	
 
                // Finally push the new connector into the registry
 
                let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                self.connectors_active.push_back(new_component_id);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewChannel => {
 
                // Prepare channel
 
                debug_assert!(run_context.pending_channel.is_none());
 
                let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id));
 
                run_context.pending_channel = Some((
 
                    port_value_from_id(Some(connector_id), put_id, true),
 
                    port_value_from_id(Some(connector_id), get_id, false)
 
                ));
 

	
 
                return Scheduling::Immediate;
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
    /// will be created that receive those messages.
 
    fn empty_connector_outbox(&mut self, connector_index: u32) {
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() {
 
        loop {
 
            let connector = self.connectors.get_mut(&connector_index).unwrap();
 
            let message_to_send = connector.global_outbox.take_next_message_to_send();
 

	
 
            if message_to_send.is_none() {
 
                return;
 
            }
 

	
 
            // We have a message to send
 
            let message_to_send = message_to_send.unwrap();
 

	
 
            // Lookup the target connector
 
            let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap();
 
            let target_port = message_to_send.receiving_port;
 
            let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap();
 
            debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id);
 
            let target_connector_id = port_desc.owning_connector_id.unwrap();
 
            let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap();
 
            let target_connector = self.connectors.get_mut(&target_connector_id).unwrap();
 

	
 
            // In any case, always put the message in the global inbox
 
            target_connector.global_inbox.insert_message(message_to_send.clone());
 
@@ -562,7 +592,7 @@ impl Runtime {
 
                    let mut can_branch = false;
 

	
 
                    if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) {
 
                        if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
                        if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
                            can_branch = true;
 
                        }
 
                    }
 
@@ -570,14 +600,15 @@ impl Runtime {
 
                    if can_branch {
 
                        // Put the message inside a clone of the currently
 
                        // waiting branch
 
                        let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index);
 
                        let new_branch = &mut target_connector.branches[new_branch_idx as usize];
 
                        let mut new_branch = Self::duplicate_branch(target_connector, *branch_index);
 
                        let new_branch_idx = new_branch.index;
 
                        let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap();
 
                        new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id);
 
                        new_port_desc.last_registered_index = Some(message_to_send.peer_cur_branch_id);
 
                        new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone());
 

	
 
                        // And queue the branch for further execution
 
                        target_connector.spec_branches_active.push(new_branch_idx);
 
                        target_connector.branches.push(new_branch);
 
                        target_connector.spec_branches_active.push_back(new_branch_idx);
 
                        if !self.connectors_active.contains(&target_connector.id) {
 
                            self.connectors_active.push_back(target_connector.id);
 
                        }
 
@@ -590,55 +621,68 @@ impl Runtime {
 
    /// Checks a connector for the submitted solutions. After all neighbouring
 
    /// connectors have been checked all of their "last checked solution" index
 
    /// will be incremented.
 
    fn check_connector_new_solutions(&mut self, connector_index: u32) {
 
    fn check_connector_new_solutions(&mut self, connector_id: u32) {
 
        // Take connector and start processing its solutions
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        let mut considered_connectors = HashSet::new();
 
        let mut valid_solutions = Vec::new();
 
        loop {
 
            let connector = self.connectors.get_mut(&connector_id).unwrap();
 
            if connector.last_checked_done == connector.spec_branches_done.len() as u32 {
 
                // Nothing to do
 
                return;
 
            }
 

	
 
        while connector.last_checked_done != connector.spec_branches_done.len() as u32 {
 
            // We have a new solution to consider
 
            // We have a new solution
 
            let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize];
 
            connector.last_checked_done += 1;
 

	
 
            let branch = &connector.branches[start_branch_index as usize];
 
            debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync);
 

	
 
            // Clear storage for potential solutions
 
            considered_connectors.clear();
 

	
 
            // Start seeking solution among other connectors within the same
 
            // synchronous region
 
            considered_connectors.insert(connector.id);
 
            for port in branch.port_
 
            // Check the connector+branch combination to see if a global
 
            // solution has already been found
 
            if let Some(global_solution) = self.check_connector_solution(connector_id, start_branch_index) {
 
                // Found a global solution, apply it to all the connectors that
 
                // participate
 
                for (connector_id, local_solution) in global_solution.connector_mapping {
 
                    self.commit_connector_solution(connector_id, local_solution.final_branch_id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) {
 
    fn check_connector_solution(&mut self, first_connector_index: u32, first_branch_index: u32) -> Option<ProposedSolution> {
 
        // Take the connector and branch of interest
 
        let first_connector = self.registry.connectors.get(&first_connector_index).unwrap();
 
        let first_connector = self.connectors.get(&first_connector_index).unwrap();
 
        let first_branch = &first_connector.branches[first_branch_index as usize];
 
        debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync);
 

	
 
        // Setup the first solution
 
        let mut first_solution = ProposedSolution{
 
            connector_mapping: HashMap::new(),
 
            connector_propositions: HashMap::new(),
 
            connector_constraints: HashMap::new(),
 
            remaining_connectors: Vec::new(),
 
        };
 
        first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier);
 
        let mut first_local_solution = ProposedConnectorSolution{
 
            final_branch_id: first_branch.index,
 
            all_branch_ids: Vec::new(),
 
            port_mapping: first_branch.port_mapping
 
                .iter()
 
                .map(|(port_id, port_info)| {
 
                    (port_id.0.u32_suffix, port_info.last_registered_index)
 
                })
 
                .collect(),
 
        };
 
        self.determine_branch_ids(first_connector, first_branch.index, &mut first_local_solution.all_branch_ids);
 
        first_solution.connector_mapping.insert(first_connector.id, first_local_solution);
 

	
 
        for (port_id, port_mapping) in first_branch.port_mapping.iter() {
 
            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
            let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap();
 
            let peer_port_desc = self.ports.get(&peer_port_id).unwrap();
 
            let peer_connector_id = peer_port_desc.owning_connector_id.unwrap();
 

	
 
            let constraint = match port_mapping.last_registered_identifier {
 
            let constraint = match port_mapping.last_registered_index {
 
                Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id),
 
                None => ProposedBranchConstraint::SilentPort(peer_port_id),
 
            };
 

	
 
            match first_solution.connector_propositions.entry(peer_connector_id) {
 
            match first_solution.connector_constraints.entry(peer_connector_id) {
 
                Entry::Vacant(entry) => {
 
                    // Not yet encountered
 
                    entry.insert(vec![constraint]);
 
@@ -661,93 +705,269 @@ impl Runtime {
 
        while !all_solutions.is_empty() {
 
            let mut cur_solution = all_solutions.pop().unwrap();
 

	
 
            if cur_solution.remaining_connectors.is_empty() {
 
                // All connectors have been visited, so commit the solution
 
                debug_assert!(cur_solution.connector_constraints.is_empty());
 
                return Some(cur_solution);
 
            } else {
 
                // Not all connectors have been visited yet, so take one of the
 
                // connectors and visit it.
 
                let target_connector = cur_solution.remaining_connectors.pop().unwrap();
 
                self.merge_solution_with_connector(&mut cur_solution, &mut all_solutions, target_connector);
 
            }
 
        }
 

	
 
        // No satisfying solution found
 
        return None;
 
    }
 

	
 
    fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec<ProposedSolution>, target_connector: u32) {
 
        debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited
 
        debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it
 
        debug_assert!(cur_solution.connector_constraints.contains_key(&target_connector)); // but we encountered a reference to it
 

	
 
        let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap();
 
        let cur_connector = self.registry.connectors.get(&target_connector).unwrap();
 
        let branch_constraints = cur_solution.connector_constraints.get(&target_connector).unwrap();
 
        let cur_connector = self.connectors.get(&target_connector).unwrap();
 

	
 
        // Make sure all propositions are unique
 
        for i in 0..branch_propositions.len() {
 
            let proposition_i = branch_propositions[i];
 
        for i in 0..branch_constraints.len() {
 
            let proposition_i = branch_constraints[i];
 
            for j in 0..i {
 
                let proposition_j = branch_propositions[j];
 
                let proposition_j = branch_constraints[j];
 
                debug_assert_ne!(proposition_i, proposition_j);
 
            }
 
        }
 

	
 
        // Check connector for compatible branches
 
        let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len());
 
        let mut encountered_propositions = Vec::new();
 
        // Go through the current connector's branches that have finished
 
        'branch_loop: for finished_branch_idx in cur_connector.spec_branches_done.iter().copied() {
 
            let finished_branch = &cur_connector.branches[finished_branch_idx as usize];
 

	
 
        'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done {
 
            // Reset the propositions matching variables
 
            encountered_propositions.clear();
 
            encountered_propositions.resize(branch_propositions.len(), false);
 
            // Construct a list of all the parent branch numbers
 
            let mut parent_branch_ids = Vec::new();
 
            self.determine_branch_ids(cur_connector, finished_branch_idx, &mut parent_branch_ids);
 

	
 
            // First check the silent port propositions
 
            let cur_branch = &cur_connector.branches[branch_idx as usize];
 
            for (proposition_idx, proposition) in branch_propositions.iter().enumerate() {
 
                match proposition {
 
                    ProposedBranchConstraint::SilentPort(port_id) => {
 
                        let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id });
 
                        let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap();
 
                        if port_mapping.num_times_fired != 0 {
 
                            // Port did fire, so the current branch is not
 
                            // compatible
 
                            continue 'finished_branch_loop;
 
                        }
 
            // Go through all constraints and make sure they are satisfied by
 
            // the current branch
 
            let mut all_constraints_satisfied = true;
 

	
 
                        // Otherwise, the port was silent indeed
 
                        encountered_propositions[proposition_idx] = true;
 
            for constraint in branch_constraints {
 
                match constraint {
 
                    ProposedBranchConstraint::SilentPort(port_id) => {
 
                        // Specified should have remained silent
 
                        let port_id = PortId(Id{
 
                            connector_id: target_connector,
 
                            u32_suffix: *port_id,
 
                        });
 
                        debug_assert!(finished_branch.port_mapping.contains_key(&port_id));
 
                        let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap();
 
                        all_constraints_satisfied = all_constraints_satisfied && mapped_port.num_times_fired == 0;
 
                    },
 
                    ProposedBranchConstraint::BranchNumber(_) => {},
 
                }
 
            }
 

	
 
            // Then check the branch number propositions
 
            let mut parent_branch_idx = branch_idx;
 
            loop {
 
                let branch = &cur_connector.branches[parent_branch_idx as usize];
 
                for proposition_idx in 0..branch_propositions.len() {
 
                    let proposition = branch_propositions[proposition_idx];
 
                    match proposition {
 
                        ProposedBranchConstraint::SilentPort(_) => {},
 
                        ProposedBranchConstraint::BranchNumber(branch_number) => {
 
                            if branch_number == branch.identifier {
 
                                encountered_propositions[proposition_idx] = true;
 
                            }
 
                        }
 
                    ProposedBranchConstraint::BranchNumber(branch_id) => {
 
                        // Branch number should have appeared in the
 
                        // predecessor branches.
 
                        all_constraints_satisfied = all_constraints_satisfied && parent_branch_ids.contains(branch_id);
 
                    },
 
                    ProposedBranchConstraint::PortMapping(port_id, branch_id) => {
 
                        // Port should map to a particular branch number
 
                        let port_id = PortId(Id{
 
                            connector_id: target_connector,
 
                            u32_suffix: *port_id,
 
                        });
 
                        debug_assert!(finished_branch.port_mapping.contains_key(&port_id));
 
                        let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap();
 
                        all_constraints_satisfied = all_constraints_satisfied && mapped_port.last_registered_index == Some(*branch_id);
 
                    }
 
                }
 

	
 
                if branch.parent_index.is_none() {
 
                    // No more parents
 
                if !all_constraints_satisfied {
 
                    break;
 
                }
 
            }
 

	
 
            if !all_constraints_satisfied {
 
                continue;
 
            }
 

	
 
            // If here, then all constraints on the finished branch are
 
            // satisfied. But the finished branch also puts constraints on the
 
            // other connectors. So either:
 
            // 1. Add them to the list of constraints a peer connector should
 
            //  adhere to.
 
            // 2. Make sure that the provided connector solution matches the
 
            //  constraints imposed by the currently considered finished branch
 
            //
 
            // To make our lives a bit easier we already insert our proposed
 
            // local solution into a prepared global solution. This makes
 
            // looking up remote ports easier (since the channel might have its
 
            // two ends owned by the same connector).
 
            let mut new_solution = cur_solution.clone();
 
            debug_assert!(!new_solution.remaining_connectors.contains(&target_connector));
 
            new_solution.connector_constraints.remove(&target_connector);
 
            new_solution.connector_mapping.insert(target_connector, ProposedConnectorSolution{
 
                final_branch_id: finished_branch.index,
 
                all_branch_ids: parent_branch_ids,
 
                port_mapping: finished_branch.port_mapping
 
                    .iter()
 
                    .map(|(port_id, port_desc)| {
 
                        (port_id.0.u32_suffix, port_desc.last_registered_index)
 
                    })
 
                    .collect(),
 
            });
 

	
 
            for (local_port_id, port_desc) in &finished_branch.port_mapping {
 
                // Retrieve port of peer
 
                let port_info = self.ports.get(&local_port_id.0.u32_suffix).unwrap();
 
                let peer_port_id = port_info.peer_id;
 
                let peer_port_info = self.ports.get(&peer_port_id).unwrap();
 
                let peer_connector_id = peer_port_info.owning_connector_id.unwrap();
 

	
 
                // If the connector was not present in the new global solution
 
                // yet, add it now, as it simplifies the following logic
 
                if !new_solution.connector_mapping.contains_key(&peer_connector_id) && !new_solution.remaining_connectors.contains(&peer_connector_id) {
 
                    new_solution.connector_constraints.insert(peer_connector_id, Vec::new());
 
                    new_solution.remaining_connectors.push(peer_connector_id);
 
                }
 

	
 
                parent_branch_idx = branch.parent_index.unwrap();
 
                if new_solution.remaining_connectors.contains(&peer_connector_id) {
 
                    // Constraint applies to a connector that has not yet been
 
                    // visited
 
                    debug_assert!(new_solution.connector_constraints.contains_key(&peer_connector_id));
 
                    debug_assert_ne!(peer_connector_id, target_connector);
 

	
 
                    let new_constraint = if port_desc.num_times_fired == 0 {
 
                        ProposedBranchConstraint::SilentPort(peer_port_id)
 
                    } else if peer_port_info.is_getter {
 
                        // Peer port is a getter, so we want its port to map to
 
                        // the branch number in our port mapping.
 
                        debug_assert!(port_desc.last_registered_index.is_some());
 
                        ProposedBranchConstraint::PortMapping(peer_port_id, port_desc.last_registered_index.unwrap())
 
                    } else {
 
                        // Peer port is a putter, so we want to restrict the
 
                        // solution's run to contain the branch ID we received.
 
                        ProposedBranchConstraint::BranchNumber(port_desc.last_registered_index.unwrap())
 
                    };
 

	
 
                    let peer_constraints = new_solution.connector_constraints.get_mut(&peer_connector_id).unwrap();
 
                    if !peer_constraints.contains(&new_constraint) {
 
                        peer_constraints.push(new_constraint);
 
                    }
 
                } else {
 
                    // Constraint applies to an already visited connector
 
                    let peer_solution = new_solution.connector_mapping.get(&peer_connector_id).unwrap();
 
                    if port_desc.num_times_fired == 0 {
 
                        let peer_mapped_id = peer_solution.port_mapping.get(&peer_port_id).unwrap();
 
                        if peer_mapped_id.is_some() {
 
                            all_constraints_satisfied = false;
 
                            break;
 
                        }
 
                    } else if peer_port_info.is_getter {
 
                        // Peer is getter, so its port should be mapped to one
 
                        // of our branch IDs. To simplify lookup we look at the
 
                        // last message we sent to the getter.
 
                        debug_assert!(port_desc.last_registered_index.is_some());
 
                        let peer_port = peer_solution.port_mapping.get(&peer_port_id)
 
                            .map_or(None, |v| *v);
 

	
 
                        if port_desc.last_registered_index != peer_port {
 
                            // No match
 
                            all_constraints_satisfied = false;
 
                            break;
 
                        }
 
                    } else {
 
                        // Peer is putter, so we expect to find our port mapping
 
                        // to match one of the branch numbers in the peer
 
                        // connector's local solution
 
                        debug_assert!(port_desc.last_registered_index.is_some());
 
                        let expected_branch_id = port_desc.last_registered_index.unwrap();
 

	
 
                        if !peer_solution.all_branch_ids.contains(&expected_branch_id) {
 
                            all_constraints_satisfied = false;
 
                            break;
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            if !encountered_propositions.iter().all(|v| *v) {
 
                // Not all of the constraints were matched
 
                continue 'finished_branch_loop
 
            if !all_constraints_satisfied {
 
                // Final checks failed
 
                continue 'branch_loop
 
            }
 

	
 
            // All of the constraints on the branch did indeed match.
 
            // We're sure that this branch matches the provided solution, so
 
            // push it onto the list of considered solutions
 
            all_solutions.push(new_solution);
 
        }
 
    }
 

	
 
    fn commit_connector_solution(&mut self, connector_id: u32, branch_id: u32) {
 
        // Retrieve connector and branch
 
        let connector = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch
 
        debug_assert!(connector.in_sync);
 
        debug_assert!(connector.spec_branches_done.contains(&branch_id));
 

	
 
        // Put the selected solution in front, the branch at index 0 is the
 
        // "non-sync" branch.
 
        connector.branches.swap(0, branch_id as usize);
 
        connector.branches.truncate(1);
 

	
 
        // And reset the connector's state for further execution
 
        connector.in_sync = false;
 
        connector.spec_branches_active.clear();
 
        connector.spec_branches_active.push_back(0);
 
        connector.spec_branches_pending_receive.clear();
 
        connector.spec_branches_done.clear();
 
        connector.last_checked_done = 0;
 
        connector.global_inbox.clear();
 
        connector.global_outbox.clear();
 

	
 
        // Do the same thing for the final selected branch
 
        let final_branch = &mut connector.branches[0];
 
        final_branch.index = 0;
 
        final_branch.parent_index = None;
 
        debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync);
 
        final_branch.branch_state = BranchState::RunningNonSync;
 
        final_branch.message_inbox.clear();
 
        final_branch.port_mapping.clear();
 

	
 
        // Might be that the connector was no longer running, if so, put it back
 
        // in the list of connectors to run
 
        if !self.connectors_active.contains(&connector_id) {
 
            self.connectors_active.push_back(connector_id);
 
        }
 
    }
 

	
 
    fn generate_connector_id(&mut self) -> u32 {
 
        let id = self.registry.connector_counter;
 
        self.registry.connector_counter += 1;
 
        let id = self.connector_counter;
 
        self.connector_counter += 1;
 
        return id;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for port management
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    fn add_owned_channel(ports: &mut HashMap<u32, PortDesc>, port_counter: &mut u32, owning_connector_id: Option<u32>) -> (u32, u32) {
 
        let get_id = *port_counter;
 
        let put_id = *port_counter + 1;
 
        (*port_counter) += 2;
 

	
 
        ports.insert(get_id, PortDesc{
 
            id: get_id,
 
            peer_id: put_id,
 
            owning_connector_id,
 
            is_getter: true,
 
        });
 
        ports.insert(put_id, PortDesc{
 
            id: put_id,
 
            peer_id: get_id,
 
            owning_connector_id,
 
            is_getter: false,
 
        });
 

	
 
        return (put_id, get_id);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for branch management
 
    // -------------------------------------------------------------------------
 
@@ -760,29 +980,42 @@ impl Runtime {
 
        debug_assert_eq!(desc.branches.len(), 1);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        let new_branch_index = 1;
 
        let new_branch_identifier = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        // Push first speculative branch as active branch
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]);
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]);
 
        desc.branches.push(new_branch);
 
        desc.spec_branches_active.push_back(new_id);
 
        desc.spec_branches_active.push_back(new_branch_index);
 
        desc.in_sync = true;
 
    }
 

	
 
    /// Duplicates a particular (speculative) branch and returns its index.
 
    fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 {
 
    /// Duplicates a particular (speculative) branch and returns it. Due to
 
    /// borrowing rules in code that uses this helper the returned branch still
 
    /// needs to be pushed onto the member `branches`.
 
    fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc {
 
        let original_branch = &desc.branches[original_branch_idx as usize];
 
        debug_assert!(desc.in_sync);
 

	
 
        let copied_index = desc.branches.len() as u32;
 
        let copied_id = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch);
 

	
 
        return copied_branch;
 
    }
 

	
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch);
 
        desc.branches.push(copied_branch);
 
    /// Retrieves all parent IDs of a particular branch. These numbers run from
 
    /// the leaf towards the parent.
 
    fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec<u32>) {
 
        let mut next_branch_index = first_branch_index;
 
        result.clear();
 

	
 
        return copied_index;
 
        loop {
 
            result.push(next_branch_index);
 
            let branch = &desc.branches[next_branch_index as usize];
 

	
 
            match branch.parent_index {
 
                Some(index) => next_branch_index = index,
 
                None => return,
 
            }
 
        }
 
    }
 
}
 

	
 
@@ -792,24 +1025,37 @@ impl Runtime {
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Temporary references to branch related storage
 
    inbox: &'a HashMap<(PortId, u32), ValueGroup>,
 
    port_mapping: &'a HashMap<PortId, BranchPortDesc>,
 
    // Properties of currently running connector/branch
 
    connector_id: u32,
 
    branch_id: Option<u32>,
 
    just_called_did_put: bool,
 
    // Resources ready to be retrieved by running code
 
    pending_channel: Option<(Value, Value)>, // (put, get) ports
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&self, port: PortId) -> bool {
 
        todo!()
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        // Note that we want "did put" to return false if we have fired zero
 
        // times, because this implies we did a prevous
 
        return self.just_called_did_put
 
    }
 

	
 
    fn get(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let inbox_key = (port, 1);
 
        match self.inbox.get(&inbox_key) {
 
            None => None,
 
            Some(value) => Some(value.clone()),
 
        }
 
    }
 

	
 
    fn fires(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self.port_mapping.get(&port) {
 
            None => None,
 
            Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)),
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
@@ -825,7 +1071,7 @@ fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>)
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports {
 
                for prev_port in ports.iter() {
 
                    if prev_port == port_id {
 
                        // Already added
 
                        return;
src/runtime2/tests/mod.rs
Show inline comments
 
new file 100644
 
#[test]
 
fn testing_runtime2() {
 
    println!("YESH!");
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)