Changeset - 0d5a89aea247
[Not reviewed]
0 3 1
MH - 4 years ago 2021-09-13 12:22:34
contact@maxhenger.nl
halfway shared-memory new consensus algorithm
4 files changed with 498 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/value.rs
Show inline comments
 
@@ -67,192 +67,193 @@ macro_rules! impl_union_unpack_as_value {
 
}
 

	
 
impl_union_unpack_as_value!(as_stack_boundary, Value::PrevStackBoundary, isize);
 
impl_union_unpack_as_value!(as_ref,     Value::Ref,     ValueId);
 
impl_union_unpack_as_value!(as_input,   Value::Input,   PortId);
 
impl_union_unpack_as_value!(as_output,  Value::Output,  PortId);
 
impl_union_unpack_as_value!(as_message, Value::Message, HeapPos);
 
impl_union_unpack_as_value!(as_bool,    Value::Bool,    bool);
 
impl_union_unpack_as_value!(as_char,    Value::Char,    char);
 
impl_union_unpack_as_value!(as_string,  Value::String,  HeapPos);
 
impl_union_unpack_as_value!(as_uint8,   Value::UInt8,   u8);
 
impl_union_unpack_as_value!(as_uint16,  Value::UInt16,  u16);
 
impl_union_unpack_as_value!(as_uint32,  Value::UInt32,  u32);
 
impl_union_unpack_as_value!(as_uint64,  Value::UInt64,  u64);
 
impl_union_unpack_as_value!(as_sint8,   Value::SInt8,   i8);
 
impl_union_unpack_as_value!(as_sint16,  Value::SInt16,  i16);
 
impl_union_unpack_as_value!(as_sint32,  Value::SInt32,  i32);
 
impl_union_unpack_as_value!(as_sint64,  Value::SInt64,  i64);
 
impl_union_unpack_as_value!(as_array,   Value::Array,   HeapPos);
 
impl_union_unpack_as_value!(as_enum,    Value::Enum,    i64);
 
impl_union_unpack_as_value!(as_struct,  Value::Struct,  HeapPos);
 

	
 
impl Value {
 
    pub(crate) fn as_union(&self) -> (i64, HeapPos) {
 
        match self {
 
            Value::Union(tag, v) => (*tag, *v),
 
            _ => panic!("called as_union on {:?}", self),
 
        }
 
    }
 

	
 
    pub(crate) fn is_integer(&self) -> bool {
 
        match self {
 
            Value::UInt8(_) | Value::UInt16(_) | Value::UInt32(_) | Value::UInt64(_) |
 
            Value::SInt8(_) | Value::SInt16(_) | Value::SInt32(_) | Value::SInt64(_) => true,
 
            _ => false
 
        }
 
    }
 

	
 
    pub(crate) fn is_unsigned_integer(&self) -> bool {
 
        match self {
 
            Value::UInt8(_) | Value::UInt16(_) | Value::UInt32(_) | Value::UInt64(_) => true,
 
            _ => false
 
        }
 
    }
 

	
 
    pub(crate) fn is_signed_integer(&self) -> bool {
 
        match self {
 
            Value::SInt8(_) | Value::SInt16(_) | Value::SInt32(_) | Value::SInt64(_) => true,
 
            _ => false
 
        }
 
    }
 

	
 
    pub(crate) fn as_unsigned_integer(&self) -> u64 {
 
        match self {
 
            Value::UInt8(v)  => *v as u64,
 
            Value::UInt16(v) => *v as u64,
 
            Value::UInt32(v) => *v as u64,
 
            Value::UInt64(v) => *v as u64,
 
            _ => unreachable!("called as_unsigned_integer on {:?}", self),
 
        }
 
    }
 

	
 
    pub(crate) fn as_signed_integer(&self) -> i64 {
 
        match self {
 
            Value::SInt8(v)  => *v as i64,
 
            Value::SInt16(v) => *v as i64,
 
            Value::SInt32(v) => *v as i64,
 
            Value::SInt64(v) => *v as i64,
 
            _ => unreachable!("called as_signed_integer on {:?}", self)
 
        }
 
    }
 

	
 
    /// Returns the heap position associated with the value. If the value
 
    /// doesn't store anything in the heap then we return `None`.
 
    pub(crate) fn get_heap_pos(&self) -> Option<HeapPos> {
 
        match self {
 
            Value::Message(v) => Some(*v),
 
            Value::String(v) => Some(*v),
 
            Value::Array(v) => Some(*v),
 
            Value::Union(_, v) => Some(*v),
 
            Value::Struct(v) => Some(*v),
 
            _ => None
 
        }
 
    }
 
}
 

	
 
/// When providing arguments to a new component, or when transferring values
 
/// from one component's store to a newly instantiated component, one has to
 
/// transfer stack and heap values. This `ValueGroup` represents such a
 
/// temporary group of values with potential heap allocations.
 
///
 
/// Constructing such a ValueGroup manually requires some extra care to make
 
/// sure all elements of `values` point to valid elements of `regions`.
 
///
 
/// Again: this is a temporary thing, hopefully removed once we move to a
 
/// bytecode interpreter.
 
#[derive(Clone)]
 
pub struct ValueGroup {
 
    pub(crate) values: Vec<Value>,
 
    pub(crate) regions: Vec<Vec<Value>>
 
}
 

	
 
impl ValueGroup {
 
    pub(crate) fn new_stack(values: Vec<Value>) -> Self {
 
        debug_assert!(values.iter().all(|v| v.get_heap_pos().is_none()));
 
        Self{
 
            values,
 
            regions: Vec::new(),
 
        }
 
    }
 
    pub(crate) fn from_store(store: &Store, values: &[Value]) -> Self {
 
        let mut group = ValueGroup{
 
            values: Vec::with_capacity(values.len()),
 
            regions: Vec::with_capacity(values.len()), // estimation
 
        };
 

	
 
        for value in values {
 
            let transferred = group.retrieve_value(value, store);
 
            group.values.push(transferred);
 
        }
 

	
 
        group
 
    }
 

	
 
    /// Transfers a provided value from a store into a local value with its
 
    /// heap allocations (if any) stored in the ValueGroup. Calling this
 
    /// function will not store the returned value in the `values` member.
 
    fn retrieve_value(&mut self, value: &Value, from_store: &Store) -> Value {
 
        let value = from_store.maybe_read_ref(value);
 
        if let Some(heap_pos) = value.get_heap_pos() {
 
            // Value points to a heap allocation, so transfer the heap values
 
            // internally.
 
            let from_region = &from_store.heap_regions[heap_pos as usize].values;
 
            let mut new_region = Vec::with_capacity(from_region.len());
 
            for value in from_region {
 
                let transferred = self.retrieve_value(value, from_store);
 
                new_region.push(transferred);
 
            }
 

	
 
            // Region is constructed, store internally and return the new value.
 
            let new_region_idx = self.regions.len() as HeapPos;
 
            self.regions.push(new_region);
 

	
 
            return match value {
 
                Value::Message(_)    => Value::Message(new_region_idx),
 
                Value::String(_)     => Value::String(new_region_idx),
 
                Value::Array(_)      => Value::Array(new_region_idx),
 
                Value::Union(tag, _) => Value::Union(*tag, new_region_idx),
 
                Value::Struct(_)     => Value::Struct(new_region_idx),
 
                _ => unreachable!(),
 
            };
 
        } else {
 
            return value.clone();
 
        }
 
    }
 

	
 
    /// Transfers the heap values and the stack values into the store. Stack
 
    /// values are pushed onto the Store's stack in the order in which they
 
    /// appear in the value group.
 
    pub(crate) fn into_store(self, store: &mut Store) {
 
        for value in &self.values {
 
            let transferred = self.provide_value(value, store);
 
            store.stack.push(transferred);
 
        }
 
    }
 

	
 
    fn provide_value(&self, value: &Value, to_store: &mut Store) -> Value {
 
        if let Some(from_heap_pos) = value.get_heap_pos() {
 
            let from_heap_pos = from_heap_pos as usize;
 
            let to_heap_pos = to_store.alloc_heap();
 
            let to_heap_pos_usize = to_heap_pos as usize;
 
            to_store.heap_regions[to_heap_pos_usize].values.reserve(self.regions[from_heap_pos].len());
 

	
 
            for value in &self.regions[from_heap_pos as usize] {
 
                let transferred = self.provide_value(value, to_store);
 
                to_store.heap_regions[to_heap_pos_usize].values.push(transferred);
 
            }
 

	
 
            return match value {
 
                Value::Message(_)    => Value::Message(to_heap_pos),
 
                Value::String(_)     => Value::String(to_heap_pos),
 
                Value::Array(_)      => Value::Array(to_heap_pos),
 
                Value::Union(tag, _) => Value::Union(*tag, to_heap_pos),
 
                Value::Struct(_)     => Value::Struct(to_heap_pos),
 
                _ => unreachable!(),
 
            };
 
        } else {
 
            return value.clone();
 
        }
 
    }
 
}
 

	
 
impl Default for ValueGroup {
src/runtime2/messages.rs
Show inline comments
 
new file 100644
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 

	
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
 
/// need to store it for all future branches that might be able to receive it.
 
pub struct ConnectorInbox {
 
    // TODO: @optimize, HashMap + Vec is a bit stupid.
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
 
            messages: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.sending_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
 

	
 
                entry.push(message);
 
            },
 
            Entry::Vacant(entry) => {
 
                entry.insert(vec![message]);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if the provided port (and the branch id mapped to that port)
 
    /// correspond to any messages in the inbox.
 
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
 
        let port_action = PortAction{
 
            port_id,
 
            prev_branch_id: prev_branch_id_at_port,
 
        };
 

	
 
        match self.messages.get(&port_action) {
 
            Some(messages) => return Some(messages.as_slice()),
 
            None => return None,
 
        }
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
 
/// branches performing `put`s until we're done running all branches and can
 
/// actually transmit the messages.
 
pub struct ConnectorOutbox {
 
    messages: Vec<BufferedMessage>,
 
    sent_counter: usize,
 
}
 

	
 
impl ConnectorOutbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            sent_counter: 0,
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Depending on the way we implement the runtime in the
 
        //  future we might end up not trusting "our own code" (i.e. in case
 
        //  the connectors we are running are described by foreign code)
 
        debug_assert!(
 
            self.messages.iter()
 
                .find(|v|
 
                    v.sending_port == message.sending_port &&
 
                    v.peer_prev_branch_id == message.peer_prev_branch_id
 
                )
 
                .is_none(),
 
            "messages was already registered for sending"
 
        );
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    pub fn take_next_message_to_send(&mut self) -> Option<&BufferedMessage> {
 
        if self.sent_counter == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let cur_index = self.sent_counter;
 
        self.sent_counter += 1;
 
        return Some(&self.messages[cur_index]);
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
        self.sent_counter = 0;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
\ No newline at end of file
 
mod runtime;
 
mod messages;
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::{HashMap, HashSet, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::{Polarity, PortId};
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::registry::Registry;
 
use super::messages::*;
 

	
 
enum AddComponentError {
 
    ModuleDoesNotExist,
 
    ConnectorDoesNotExist,
 
    InvalidArgumentType(usize), // value is index of (first) invalid argument
 
}
 

	
 
struct PortDesc {
 
    id: u32,
 
    peer_id: u32,
 
    owning_connector_id: Option<u32>,
 
    is_getter: bool, // otherwise one can only call `put`
 
}
 

	
 
// Message received from some kind of peer
 
struct BufferedMessage {
 
    // If in inbox, then sender is the connector's peer. If in the outbox, then
 
    // the sender is the connector itself.
 
    sending_port: PortId,
 
    receiving_port: PortId,
 
    peer_prev_branch_id: Option<u32>, // of the sender
 
    peer_cur_branch_id: u32, // of the sender
 
    message: ValueGroup,
 
}
 

	
 
struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    branch_id_counter: u32,
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, u32>, // from port_id to branch index
 
    global_inbox: HashMap<(PortId, u32), BufferedMessage>,
 
    global_outbox: HashMap<(PortId, u32), BufferedMessage>,
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
    last_checked_done: u32,
 
    global_inbox: ConnectorInbox,
 
    global_outbox: ConnectorOutbox,
 
}
 

	
 
impl ConnectorDesc {
 
    /// Creates a new connector description. Implicit assumption is that there
 
    /// is one (non-sync) branch that can be immediately executed.
 
    fn new(id: u32, component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        let mut branches_active = VecDeque::new();
 
        branches_active.push_back(0);
 

	
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            branch_id_counter: 1,
 
            spec_branches_active: branches_active,
 
            spec_branches_pending_receive: HashMap::new(),
 
            global_inbox: HashMap::new(),
 
            global_outbox: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
            last_checked_done: 0,
 
            global_inbox: ConnectorInbox::new(),
 
            global_outbox: ConnectorOutbox::new(),
 
        }
 
    }
 
}
 

	
 
enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
}
 

	
 
struct BranchPortDesc {
 
    last_registered_identifier: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    identifier: u32,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            identifier: 0,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
            message_inbox: HashMap::new(),
 
            port_mapping: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self {
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            identifier,
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
            message_inbox: branch_state.message_inbox.clone(),
 
            port_mapping: branch_state.port_mapping.clone(),
 
        }
 
    }
 
}
 

	
 
// Separate from Runtime for borrowing reasons
 
struct Registry {
 
    ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
}
 

	
 
impl Registry {
 
    fn new() -> Self {
 
        Self{
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
        }
 
    }
 

	
 
    /// Returns (putter_port, getter_port)
 
    pub fn add_channel(&mut self, owning_connector_id: Option<u32>) -> (u32, u32) {
 
        let get_id = self.generate_port_id();
 
        let put_id = self.generate_port_id();
 

	
 
        self.ports.insert(get_id, PortDesc{
 
            id: get_id,
 
            peer_id: put_id,
 
            owning_connector_id,
 
            is_getter: true,
 
        });
 
        self.ports.insert(put_id, PortDesc{
 
            id: put_id,
 
            peer_id: get_id,
 
            owning_connector_id,
 
            is_getter: false,
 
        });
 

	
 
        return (put_id, get_id);
 
    }
 

	
 
    fn generate_port_id(&mut self) -> u32 {
 
        let id = self.port_counter;
 
        self.port_counter += 1;
 
        return id;
 
    }
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq)]
 
enum ProposedBranchConstraint {
 
    SilentPort(u32), // port id
 
    BranchNumber(u32), // branch id
 
}
 

	
 
// Local solution of the connector
 
struct ProposedConnectorSolution {
 
    final_branch_id: u32,
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    silent_ports: Vec<u32>, // port IDs of the connector itself
 
}
 

	
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_propositions: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
    registry: Registry,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            protocol: pd,
 
            registry: Registry::new(),
 
            connectors_active: VecDeque::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel that is not owned by any connector and returns its
 
    /// endpoints. The returned values are of the (putter port, getter port)
 
    /// respectively.
 
    pub fn add_channel(&mut self) -> (Value, Value) {
 
        let (put_id, get_id) = self.registry.add_channel(None);
 
        return (
 
            port_value_from_id(None, put_id, true),
 
            port_value_from_id(None, get_id, false)
 
        );
 
    }
 

	
 
    pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> {
 
        use AddComponentError as ACE;
 
        use crate::runtime::error::AddComponentError as OldACE;
 

	
 
        // TODO: Allow the ValueGroup to contain any kind of value
 
        // TODO: Remove the responsibility of adding a component from the PD
 

	
 
        // Lookup module and the component
 
        // TODO: Remove this error enum translation. Note that for now this
 
        //  function forces port-only arguments
 
        let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) {
 
            Ok(polarities) => polarities,
 
            Err(reason) => match reason {
 
                OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)),
 
                OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist),
 
                OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist),
 
                _ => unreachable!(),
 
            }
 
        };
 

	
 
        // Make sure supplied values (and types) are correct
 
        let mut ports = Vec::with_capacity(values.values.len());
 
        
 
        for (value_idx, value) in values.values.iter().enumerate() {
 
            let polarity = &port_polarities[value_idx];
 

	
 
            match value {
 
                Value::Input(port_id) => {
 
                    if *polarity != Polarity::Getter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                },
 
                Value::Output(port_id) => {
 
                    if *polarity != Polarity::Putter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                },
 
                _ => return Err(ACE::InvalidArgumentType(value_idx))
 
            }
 
        }
 

	
 
        // Instantiate the component
 
        let component_id = self.generate_connector_id();
 
        let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);
 
        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 

	
 
        self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors_active.push_back(component_id);
 

	
 
        Ok(())
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            // Run a single connector
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            self.run_connector(next_id);
 
            let run_again = self.run_connector(next_id);
 

	
 
            if run_again {
 
                self.connectors_active.push_back(next_id);
 
            }
 

	
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_solution(next_id);
 
        }
 
    }
 

	
 
    /// Runs a connector for as long as sensible, then returns `true` if the
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    pub fn run_connector(&mut self, id: u32) -> bool {
 
        let desc = self.registry.connectors.get_mut(&id).unwrap();
 
        let mut run_context = Context{
 
            connector_id: id,
 
            branch_id: None,
 
            pending_channel: None,
 
        };
 

	
 
        let mut call_again = false;
 
        let mut call_again = false; // TODO: Come back to this, silly pattern
 

	
 
        while call_again {
 
            call_again = false; // bit of a silly pattern, maybe revise
 

	
 
            if desc.in_sync {
 
                // Running in synchronous mode, so run all branches until their
 
                // blocking point
 
                debug_assert!(!desc.spec_branches_active.is_empty());
 
                let branch_index = desc.spec_branches_active.pop_front().unwrap();
 

	
 
                let branch = &mut desc.branches[branch_index as usize];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::BranchInconsistent => {
 
                        // Speculative branch became inconsistent. So we don't
 
                        // run it again
 
                        branch.branch_state = BranchState::Failed;
 
                    },
 
                    RunResult::BranchMissingPortState(port_id) => {
 
                        // Branch called `fires()` on a port that did not have a
 
                        // value assigned yet. So branch and keep running
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                        let copied_index = Self::duplicate_branch(desc, branch_index);
 

	
 
                        // Need to re-borrow to assign changed port state
 
                        let original_branch = &mut desc.branches[branch_index as usize];
 
                        original_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 0,
 
                        });
 

	
 
                        let copied_branch = &mut desc.branches[copied_index as usize];
 
                        copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 1,
 
                        });
 

	
 
                        // Run both again
 
                        desc.spec_branches_active.push_back(branch_index);
 
                        desc.spec_branches_active.push_back(copied_index);
 
                    },
 
                    RunResult::BranchMissingPortValue(port_id) => {
 
                        // Branch just performed a `get()` on a port that did
 
                        // not yet receive a value.
 

	
 
                        // First check if a port value is assigned to the
 
                        // current branch. If so, check if it is consistent.
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        let mut insert_in_pending_receive = false;
 

	
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet, so force to firing
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 1,
 
                                });
 
                                branch.branch_state = BranchState::BranchPoint;
 
                                desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                insert_in_pending_receive = true;
 
                            },
 
                            Entry::Occupied(entry) => {
 
                                // Have an entry, check if it is consistent
 
                                let entry = entry.get();
 
                                if entry.num_times_fired == 0 {
 
                                    // Inconsistent
 
                                    branch.branch_state = BranchState::Failed;
 
                                } else {
 
                                    // Perfectly fine, add to queue
 
                                    debug_assert!(entry.last_registered_identifier.is_none());
 
                                    assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                                    branch.branch_state = BranchState::BranchPoint;
 
                                    desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                    insert_in_pending_receive = true;
 
                                }
 
                            }
 
                        }
 

	
 
                        if insert_in_pending_receive {
 
                            // Perform the insert
 
                            match desc.spec_branches_pending_receive.entry(port_id) {
 
                                Entry::Vacant(entry) => {
 
                                    entry.insert(vec![branch_index]);
 
                                }
 
                                Entry::Occupied(mut entry) => {
 
                                    let entry = entry.get_mut();
 
                                    debug_assert!(!entry.contains(&branch_index));
 
                                    entry.push(branch_index);
 
                                }
 
                            }
 

	
 
                            // But also check immediately if we don't have a
 
                            // previously received message. If so, we
 
                            // immediately branch and accept the message
 
                            if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                                for message in messages {
 
                                    let new_branch_idx = Self::duplicate_branch(desc, branch_index);
 
                                    let new_branch = &mut desc.branches[new_branch_idx as usize];
 
                                    let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                                    new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id);
 
                                    new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                                    desc.spec_branches_active.push_back(new_branch_idx);
 
                                }
 
                            }
 
                        }
 
                    },
 
                    RunResult::BranchAtSyncEnd => {
 
                        // Check the branch for any ports that were not used and
 
                        // insert them in the port mapping as not having fired.
 
                        for port_index in branch.owned_ports {
 
                            let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                            if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                                entry.insert(BranchPortDesc {
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 0
 
                                });
 
                            }
 
                        }
 

	
 
                        // Mark the branch as being done
 
                        branch.branch_state = BranchState::ReachedEndSync;
 
                        todo!("somehow propose solution");
 
                        desc.spec_branches_done.push(branch_index);
 
                    },
 
                    RunResult::BranchPut(port_id, value_group) => {
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert_eq!(value_group.values.len(), 1)
 
                        debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                        // Branch just performed a `put()`. Check if we have
 
                        // assigned the port value and if so, if it is
 
                        // consistent.
 
                        let mut can_put = true;
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: Some(branch.identifier),
 
                                    num_times_fired: 1,
 
                                });
 
                            },
 
                            Entry::Occupied(mut entry) => {
 
                                // Pre-existing entry
 
                                let entry = entry.get_mut();
 
                                if entry.num_times_fired == 0 {
 
                                    // This is 'fine' in the sense that we have
 
                                    // a normal inconsistency in the branch.
 
                                    branch.branch_state = BranchState::Failed;
 
                                    can_put = false;
 
                                } else if entry.last_registered_identifier.is_none() {
 
                                    // A put() that follows a fires()
 
                                    entry.last_registered_identifier = Some(branch.identifier);
 
                                } else {
 
                                    // This should be fine in the future. But
 
                                    // for now we throw an error as it doesn't
 
                                    // mesh well with the 'fires()' concept.
 
                                    todo!("throw an error of some sort, then fail all related")
 
                                }
 
                            }
 
                        }
 

	
 
                        if can_put {
 
                            // Actually put the message in the outbox
 
                            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
                            let peer_id = port_desc.peer_id;
 
                            let peer_desc = self.registry.ports.get(&peer_id).unwrap();
 
                            debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                            let peer_id = PortId(Id{
 
                                connector_id: peer_desc.owning_connector_id.unwrap(),
 
                                u32_suffix: peer_id
 
                            });
 

	
 
                            // For now this is the one and only time we're going
 
                            // to send a message. So for now we can't send a
 
                            // branch ID.
 
                            desc.global_outbox.insert((port_id, 1), BufferedMessage{
 
                                sending_port: port_id,
 
                                receiving_port: peer_id,
 
                                peer_prev_branch_id: None,
 
                                peer_cur_branch_id: 0,
 
                                message: value_group,
 
                            });
 

	
 
                            // Finally, because we were able to put the message,
 
                            // we can run the branch again
 
                            desc.spec_branches_active.push_back(branch_index);
 
                            call_again = true;
 
                        }
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
                }
 
            } else {
 
                // Running in non-synchronous mode
 
                let branch = &mut desc.branches[0];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::ComponentTerminated => return false,
 
                    RunResult::ComponentAtSyncStart => {
 
                        // Prepare for sync execution
 
                        Self::prepare_branch_for_sync(desc);
 
                        call_again = true;
 
                    },
 
                    RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                        // Generate a new connector with its own state
 
                        let new_component_id = self.generate_connector_id();
 
                        let new_component_state = ComponentState {
 
                            prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                        };
 

	
 
                        // Transfer the ownership of any ports to the new connector
 
                        let mut ports = Vec::with_capacity(arguments.values.len());
 
                        find_ports_in_value_group(&arguments, &mut ports);
 
                        for port_id in &ports {
 
                            let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                            debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id);
 
                            port.owning_connector_id = Some(new_component_id)
 
                        }
 

	
 
                        // Finally push the new connector into the registry
 
                        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                        self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                        self.connectors_active.push_back(new_component_id);
 
                    },
 
                    RunResult::NewChannel => {
 
                        // Prepare channel
 
                        debug_assert!(run_context.pending_channel.is_none());
 
                        let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id));
 
                        run_context.pending_channel = Some((
 
                            port_value_from_id(Some(run_context.connector_id), put_id, true),
 
                            port_value_from_id(Some(run_context.connector_id), get_id, false)
 
                        ));
 

	
 
                        // Call again so it is retrieved from the context
 
                        call_again = true;
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
    /// will be created that receive those messages.
 
    fn empty_connector_outbox(&mut self, connector_index: u32) {
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() {
 
            // Lookup the target connector
 
            let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap();
 
            debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id);
 
            let target_connector_id = port_desc.owning_connector_id.unwrap();
 
            let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap();
 

	
 
            // In any case, always put the message in the global inbox
 
            target_connector.global_inbox.insert_message(message_to_send.clone());
 

	
 
            // Check if there are any branches that are waiting on
 
            // receives
 
            if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) {
 
                // Check each of the branches for a port mapping that
 
                // matches the one on the message header
 
                for branch_index in branch_indices {
 
                    let branch = &mut target_connector.branches[*branch_index as usize];
 
                    debug_assert_eq!(branch.branch_state, BranchState::BranchPoint);
 

	
 
                    let mut can_branch = false;
 

	
 
                    if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) {
 
                        if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
                            can_branch = true;
 
                        }
 
                    }
 

	
 
                    if can_branch {
 
                        // Put the message inside a clone of the currently
 
                        // waiting branch
 
                        let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index);
 
                        let new_branch = &mut target_connector.branches[new_branch_idx as usize];
 
                        let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap();
 
                        new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id);
 
                        new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone());
 

	
 
                        // And queue the branch for further execution
 
                        target_connector.spec_branches_active.push(new_branch_idx);
 
                        if !self.connectors_active.contains(&target_connector.id) {
 
                            self.connectors_active.push_back(target_connector.id);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Checks a connector for the submitted solutions. After all neighbouring
 
    /// connectors have been checked all of their "last checked solution" index
 
    /// will be incremented.
 
    fn check_connector_new_solutions(&mut self, connector_index: u32) {
 
        // Take connector and start processing its solutions
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        let mut considered_connectors = HashSet::new();
 
        let mut valid_solutions = Vec::new();
 

	
 
        while connector.last_checked_done != connector.spec_branches_done.len() as u32 {
 
            // We have a new solution to consider
 
            let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize];
 
            connector.last_checked_done += 1;
 

	
 
            let branch = &connector.branches[start_branch_index as usize];
 
            debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync);
 

	
 
            // Clear storage for potential solutions
 
            considered_connectors.clear();
 

	
 
            // Start seeking solution among other connectors within the same
 
            // synchronous region
 
            considered_connectors.insert(connector.id);
 
            for port in branch.port_
 
        }
 
    }
 

	
 
    fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) {
 
        // Take the connector and branch of interest
 
        let first_connector = self.registry.connectors.get(&first_connector_index).unwrap();
 
        let first_branch = &first_connector.branches[first_branch_index as usize];
 
        debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync);
 

	
 
        // Setup the first solution
 
        let mut first_solution = ProposedSolution{
 
            connector_mapping: HashMap::new(),
 
            connector_propositions: HashMap::new(),
 
            remaining_connectors: Vec::new(),
 
        };
 
        first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier);
 
        for (port_id, port_mapping) in first_branch.port_mapping.iter() {
 
            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap();
 
            let peer_connector_id = peer_port_desc.owning_connector_id.unwrap();
 

	
 
            let constraint = match port_mapping.last_registered_identifier {
 
                Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id),
 
                None => ProposedBranchConstraint::SilentPort(peer_port_id),
 
            };
 

	
 
            match first_solution.connector_propositions.entry(peer_connector_id) {
 
                Entry::Vacant(entry) => {
 
                    // Not yet encountered
 
                    entry.insert(vec![constraint]);
 
                    first_solution.remaining_connectors.push(peer_connector_id);
 
                },
 
                Entry::Occupied(mut entry) => {
 
                    // Already encountered
 
                    let entry = entry.get_mut();
 
                    if !entry.contains(&constraint) {
 
                        entry.push(constraint);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Setup storage for all possible solutions
 
        let mut all_solutions = Vec::new();
 
        all_solutions.push(first_solution);
 

	
 
        while !all_solutions.is_empty() {
 
            let mut cur_solution = all_solutions.pop().unwrap();
 

	
 
        }
 
    }
 

	
 
    fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec<ProposedSolution>, target_connector: u32) {
 
        debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited
 
        debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it
 

	
 
        let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap();
 
        let cur_connector = self.registry.connectors.get(&target_connector).unwrap();
 

	
 
        // Make sure all propositions are unique
 
        for i in 0..branch_propositions.len() {
 
            let proposition_i = branch_propositions[i];
 
            for j in 0..i {
 
                let proposition_j = branch_propositions[j];
 
                debug_assert_ne!(proposition_i, proposition_j);
 
            }
 
        }
 

	
 
        // Check connector for compatible branches
 
        let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len());
 
        let mut encountered_propositions = Vec::new();
 

	
 
        'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done {
 
            // Reset the propositions matching variables
 
            encountered_propositions.clear();
 
            encountered_propositions.resize(branch_propositions.len(), false);
 

	
 
            // First check the silent port propositions
 
            let cur_branch = &cur_connector.branches[branch_idx as usize];
 
            for (proposition_idx, proposition) in branch_propositions.iter().enumerate() {
 
                match proposition {
 
                    ProposedBranchConstraint::SilentPort(port_id) => {
 
                        let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id });
 
                        let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap();
 
                        if port_mapping.num_times_fired != 0 {
 
                            // Port did fire, so the current branch is not
 
                            // compatible
 
                            continue 'finished_branch_loop;
 
                        }
 

	
 
                        // Otherwise, the port was silent indeed
 
                        encountered_propositions[proposition_idx] = true;
 
                    },
 
                    ProposedBranchConstraint::BranchNumber(_) => {},
 
                }
 
            }
 

	
 
            // Then check the branch number propositions
 
            let mut parent_branch_idx = branch_idx;
 
            loop {
 
                let branch = &cur_connector.branches[parent_branch_idx as usize];
 
                for proposition_idx in 0..branch_propositions.len() {
 
                    let proposition = branch_propositions[proposition_idx];
 
                    match proposition {
 
                        ProposedBranchConstraint::SilentPort(_) => {},
 
                        ProposedBranchConstraint::BranchNumber(branch_number) => {
 
                            if branch_number == branch.identifier {
 
                                encountered_propositions[proposition_idx] = true;
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                if branch.parent_index.is_none() {
 
                    // No more parents
 
                    break;
 
                }
 

	
 
                parent_branch_idx = branch.parent_index.unwrap();
 
            }
 

	
 
            if !encountered_propositions.iter().all(|v| *v) {
 
                // Not all of the constraints were matched
 
                continue 'finished_branch_loop
 
            }
 

	
 
            // All of the constraints on the branch did indeed match.
 
        }
 
    }
 

	
 
    fn generate_connector_id(&mut self) -> u32 {
 
        let id = self.registry.connector_counter;
 
        self.registry.connector_counter += 1;
 
        return id;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for branch management
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a speculative branch for further execution from the connector's
 
    /// non-speculative base branch.
 
    fn prepare_branch_for_sync(desc: &mut ConnectorDesc) {
 
        // Ensure only one branch is active, the non-sync branch
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        let new_branch_index = 1;
 
        let new_branch_identifier = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        // Push first speculative branch as active branch
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]);
 
        desc.branches.push(new_branch);
 
        desc.spec_branches_active.push_back(new_id);
 
        desc.in_sync = true;
 
    }
 

	
 
    /// Duplicates a particular (speculative) branch and returns its index.
 
    fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 {
 
        let original_branch = &desc.branches[original_branch_idx as usize];
 
        debug_assert!(desc.in_sync);
 

	
 
        let copied_index = desc.branches.len() as u32;
 
        let copied_id = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch);
 
        desc.branches.push(copied_branch);
 

	
 
        return copied_index;
 
    }
 
}
 

	
 
/// Context accessible by the code while being executed by the runtime. When the
 
/// code is being executed by the runtime it sometimes needs to interact with 
 
/// the runtime. This is achieved by the "code throwing an error code", after 
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Properties of currently running connector/branch
 
    connector_id: u32,
 
    branch_id: Option<u32>,
 
    // Resources ready to be retrieved by running code
 
    pending_channel: Option<(Value, Value)>, // (put, get) ports
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn fires(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.pending_channel.take()
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports. 
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports {
 
                    if prev_port == port_id {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 
                
 
                ports.push(*port_id);
 
            },
 
            Value::Array(heap_pos) | 
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
0 comments (0 inline, 0 general)