Changeset - 0d5a89aea247
[Not reviewed]
0 3 1
MH - 4 years ago 2021-09-13 12:22:34
contact@maxhenger.nl
halfway shared-memory new consensus algorithm
4 files changed with 498 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/value.rs
Show inline comments
 

	
 
use super::store::*;
 
use crate::PortId;
 
use crate::protocol::ast::{
 
    AssignmentOperator,
 
    BinaryOperator,
 
    UnaryOperator,
 
    ConcreteType,
 
    ConcreteTypePart,
 
};
 
use crate::protocol::parser::token_parsing::*;
 

	
 
pub type StackPos = u32;
 
pub type HeapPos = u32;
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum ValueId {
 
    Stack(StackPos), // place on stack
 
    Heap(HeapPos, u32), // allocated region + values within that region
 
}
 

	
 
/// Represents a value stored on the stack or on the heap. Some values contain
 
/// a `HeapPos`, implying that they're stored in the store's `Heap`. Clearing
 
/// a `Value` with a `HeapPos` from a stack must also clear the associated
 
/// region from the `Heap`.
 
#[derive(Debug, Clone)]
 
pub enum Value {
 
    // Special types, never encountered during evaluation if the compiler works correctly
 
    Unassigned,                 // Marker when variables are first declared, immediately followed by assignment
 
    PrevStackBoundary(isize),   // Marker for stack frame beginning, so we can pop stack values
 
    Ref(ValueId),               // Reference to a value, used by expressions producing references
 
    Binding(StackPos),          // Reference to a binding variable (reserved on the stack)
 
    // Builtin types
 
    Input(PortId),
 
    Output(PortId),
 
    Message(HeapPos),
 
    Null,
 
    Bool(bool),
 
    Char(char),
 
    String(HeapPos),
 
    UInt8(u8),
 
    UInt16(u16),
 
    UInt32(u32),
 
    UInt64(u64),
 
    SInt8(i8),
 
    SInt16(i16),
 
    SInt32(i32),
 
    SInt64(i64),
 
    Array(HeapPos),
 
    // Instances of user-defined types
 
    Enum(i64),
 
    Union(i64, HeapPos),
 
    Struct(HeapPos),
 
}
 

	
 
macro_rules! impl_union_unpack_as_value {
 
    ($func_name:ident, $variant_name:path, $return_type:ty) => {
 
        impl Value {
 
            pub(crate) fn $func_name(&self) -> $return_type {
 
                match self {
 
                    $variant_name(v) => *v,
 
                    _ => panic!(concat!("called ", stringify!($func_name()), " on {:?}"), self),
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
impl_union_unpack_as_value!(as_stack_boundary, Value::PrevStackBoundary, isize);
 
impl_union_unpack_as_value!(as_ref,     Value::Ref,     ValueId);
 
impl_union_unpack_as_value!(as_input,   Value::Input,   PortId);
 
impl_union_unpack_as_value!(as_output,  Value::Output,  PortId);
 
impl_union_unpack_as_value!(as_message, Value::Message, HeapPos);
 
impl_union_unpack_as_value!(as_bool,    Value::Bool,    bool);
 
impl_union_unpack_as_value!(as_char,    Value::Char,    char);
 
impl_union_unpack_as_value!(as_string,  Value::String,  HeapPos);
 
impl_union_unpack_as_value!(as_uint8,   Value::UInt8,   u8);
 
impl_union_unpack_as_value!(as_uint16,  Value::UInt16,  u16);
 
impl_union_unpack_as_value!(as_uint32,  Value::UInt32,  u32);
 
impl_union_unpack_as_value!(as_uint64,  Value::UInt64,  u64);
 
impl_union_unpack_as_value!(as_sint8,   Value::SInt8,   i8);
 
impl_union_unpack_as_value!(as_sint16,  Value::SInt16,  i16);
 
impl_union_unpack_as_value!(as_sint32,  Value::SInt32,  i32);
 
impl_union_unpack_as_value!(as_sint64,  Value::SInt64,  i64);
 
impl_union_unpack_as_value!(as_array,   Value::Array,   HeapPos);
 
impl_union_unpack_as_value!(as_enum,    Value::Enum,    i64);
 
impl_union_unpack_as_value!(as_struct,  Value::Struct,  HeapPos);
 

	
 
impl Value {
 
    pub(crate) fn as_union(&self) -> (i64, HeapPos) {
 
        match self {
 
            Value::Union(tag, v) => (*tag, *v),
 
            _ => panic!("called as_union on {:?}", self),
 
        }
 
    }
 

	
 
    pub(crate) fn is_integer(&self) -> bool {
 
        match self {
 
            Value::UInt8(_) | Value::UInt16(_) | Value::UInt32(_) | Value::UInt64(_) |
 
            Value::SInt8(_) | Value::SInt16(_) | Value::SInt32(_) | Value::SInt64(_) => true,
 
            _ => false
 
        }
 
    }
 

	
 
    pub(crate) fn is_unsigned_integer(&self) -> bool {
 
        match self {
 
            Value::UInt8(_) | Value::UInt16(_) | Value::UInt32(_) | Value::UInt64(_) => true,
 
            _ => false
 
        }
 
    }
 

	
 
    pub(crate) fn is_signed_integer(&self) -> bool {
 
        match self {
 
            Value::SInt8(_) | Value::SInt16(_) | Value::SInt32(_) | Value::SInt64(_) => true,
 
            _ => false
 
        }
 
    }
 

	
 
    pub(crate) fn as_unsigned_integer(&self) -> u64 {
 
        match self {
 
            Value::UInt8(v)  => *v as u64,
 
            Value::UInt16(v) => *v as u64,
 
            Value::UInt32(v) => *v as u64,
 
            Value::UInt64(v) => *v as u64,
 
            _ => unreachable!("called as_unsigned_integer on {:?}", self),
 
        }
 
    }
 

	
 
    pub(crate) fn as_signed_integer(&self) -> i64 {
 
        match self {
 
            Value::SInt8(v)  => *v as i64,
 
            Value::SInt16(v) => *v as i64,
 
            Value::SInt32(v) => *v as i64,
 
            Value::SInt64(v) => *v as i64,
 
            _ => unreachable!("called as_signed_integer on {:?}", self)
 
        }
 
    }
 

	
 
    /// Returns the heap position associated with the value. If the value
 
    /// doesn't store anything in the heap then we return `None`.
 
    pub(crate) fn get_heap_pos(&self) -> Option<HeapPos> {
 
        match self {
 
            Value::Message(v) => Some(*v),
 
            Value::String(v) => Some(*v),
 
            Value::Array(v) => Some(*v),
 
            Value::Union(_, v) => Some(*v),
 
            Value::Struct(v) => Some(*v),
 
            _ => None
 
        }
 
    }
 
}
 

	
 
/// When providing arguments to a new component, or when transferring values
 
/// from one component's store to a newly instantiated component, one has to
 
/// transfer stack and heap values. This `ValueGroup` represents such a
 
/// temporary group of values with potential heap allocations.
 
///
 
/// Constructing such a ValueGroup manually requires some extra care to make
 
/// sure all elements of `values` point to valid elements of `regions`.
 
///
 
/// Again: this is a temporary thing, hopefully removed once we move to a
 
/// bytecode interpreter.
 
#[derive(Clone)]
 
pub struct ValueGroup {
 
    pub(crate) values: Vec<Value>,
 
    pub(crate) regions: Vec<Vec<Value>>
 
}
 

	
 
impl ValueGroup {
 
    pub(crate) fn new_stack(values: Vec<Value>) -> Self {
 
        debug_assert!(values.iter().all(|v| v.get_heap_pos().is_none()));
 
        Self{
 
            values,
 
            regions: Vec::new(),
 
        }
 
    }
 
    pub(crate) fn from_store(store: &Store, values: &[Value]) -> Self {
 
        let mut group = ValueGroup{
 
            values: Vec::with_capacity(values.len()),
 
            regions: Vec::with_capacity(values.len()), // estimation
 
        };
 

	
 
        for value in values {
 
            let transferred = group.retrieve_value(value, store);
 
            group.values.push(transferred);
 
        }
 

	
 
        group
 
    }
 

	
 
    /// Transfers a provided value from a store into a local value with its
 
    /// heap allocations (if any) stored in the ValueGroup. Calling this
 
    /// function will not store the returned value in the `values` member.
 
    fn retrieve_value(&mut self, value: &Value, from_store: &Store) -> Value {
 
        let value = from_store.maybe_read_ref(value);
 
        if let Some(heap_pos) = value.get_heap_pos() {
 
            // Value points to a heap allocation, so transfer the heap values
 
            // internally.
 
            let from_region = &from_store.heap_regions[heap_pos as usize].values;
 
            let mut new_region = Vec::with_capacity(from_region.len());
 
            for value in from_region {
 
                let transferred = self.retrieve_value(value, from_store);
 
                new_region.push(transferred);
 
            }
 

	
 
            // Region is constructed, store internally and return the new value.
 
            let new_region_idx = self.regions.len() as HeapPos;
 
            self.regions.push(new_region);
 

	
 
            return match value {
 
                Value::Message(_)    => Value::Message(new_region_idx),
 
                Value::String(_)     => Value::String(new_region_idx),
 
                Value::Array(_)      => Value::Array(new_region_idx),
 
                Value::Union(tag, _) => Value::Union(*tag, new_region_idx),
 
                Value::Struct(_)     => Value::Struct(new_region_idx),
 
                _ => unreachable!(),
 
            };
 
        } else {
 
            return value.clone();
 
        }
 
    }
 

	
 
    /// Transfers the heap values and the stack values into the store. Stack
 
    /// values are pushed onto the Store's stack in the order in which they
 
    /// appear in the value group.
 
    pub(crate) fn into_store(self, store: &mut Store) {
 
        for value in &self.values {
 
            let transferred = self.provide_value(value, store);
 
            store.stack.push(transferred);
 
        }
 
    }
 

	
 
    fn provide_value(&self, value: &Value, to_store: &mut Store) -> Value {
 
        if let Some(from_heap_pos) = value.get_heap_pos() {
 
            let from_heap_pos = from_heap_pos as usize;
 
            let to_heap_pos = to_store.alloc_heap();
 
            let to_heap_pos_usize = to_heap_pos as usize;
 
            to_store.heap_regions[to_heap_pos_usize].values.reserve(self.regions[from_heap_pos].len());
 

	
 
            for value in &self.regions[from_heap_pos as usize] {
 
                let transferred = self.provide_value(value, to_store);
 
                to_store.heap_regions[to_heap_pos_usize].values.push(transferred);
 
            }
 

	
 
            return match value {
 
                Value::Message(_)    => Value::Message(to_heap_pos),
 
                Value::String(_)     => Value::String(to_heap_pos),
 
                Value::Array(_)      => Value::Array(to_heap_pos),
 
                Value::Union(tag, _) => Value::Union(*tag, to_heap_pos),
 
                Value::Struct(_)     => Value::Struct(to_heap_pos),
 
                _ => unreachable!(),
 
            };
 
        } else {
 
            return value.clone();
 
        }
 
    }
 
}
 

	
 
impl Default for ValueGroup {
 
    /// Returns an empty ValueGroup
 
    fn default() -> Self {
 
        Self { values: Vec::new(), regions: Vec::new() }
 
    }
 
}
 

	
 
enum ValueKind { Message, String, Array }
 

	
 
pub(crate) fn apply_assignment_operator(store: &mut Store, lhs: ValueId, op: AssignmentOperator, rhs: Value) {
 
    use AssignmentOperator as AO;
 

	
 
    macro_rules! apply_int_op {
 
        ($lhs:ident, $assignment_tokens:tt, $operator:ident, $rhs:ident) => {
 
            match $lhs {
 
                Value::UInt8(v)  => { *v $assignment_tokens $rhs.as_uint8();  },
 
                Value::UInt16(v) => { *v $assignment_tokens $rhs.as_uint16(); },
 
                Value::UInt32(v) => { *v $assignment_tokens $rhs.as_uint32(); },
 
                Value::UInt64(v) => { *v $assignment_tokens $rhs.as_uint64(); },
 
                Value::SInt8(v)  => { *v $assignment_tokens $rhs.as_sint8();  },
 
                Value::SInt16(v) => { *v $assignment_tokens $rhs.as_sint16(); },
 
                Value::SInt32(v) => { *v $assignment_tokens $rhs.as_sint32(); },
 
                Value::SInt64(v) => { *v $assignment_tokens $rhs.as_sint64(); },
 
                _ => unreachable!("apply_assignment_operator {:?} on lhs {:?} and rhs {:?}", $operator, $lhs, $rhs),
 
            }
 
        }
 
    }
 

	
 
    let lhs = store.read_mut_ref(lhs);
 

	
 
    let mut to_dealloc = None;
 
    match op {
 
        AO::Set => {
 
            match lhs {
 
                Value::Unassigned => { *lhs = rhs; },
 
                Value::Input(v)  => { *v = rhs.as_input(); },
 
                Value::Output(v) => { *v = rhs.as_output(); },
 
                Value::Message(v)  => { to_dealloc = Some(*v); *v = rhs.as_message(); },
 
                Value::Bool(v)    => { *v = rhs.as_bool(); },
 
                Value::Char(v) => { *v = rhs.as_char(); },
 
                Value::String(v) => { *v = rhs.as_string().clone(); },
 
                Value::UInt8(v) => { *v = rhs.as_uint8(); },
 
                Value::UInt16(v) => { *v = rhs.as_uint16(); },
 
                Value::UInt32(v) => { *v = rhs.as_uint32(); },
 
                Value::UInt64(v) => { *v = rhs.as_uint64(); },
 
                Value::SInt8(v) => { *v = rhs.as_sint8(); },
 
                Value::SInt16(v) => { *v = rhs.as_sint16(); },
 
                Value::SInt32(v) => { *v = rhs.as_sint32(); },
 
                Value::SInt64(v) => { *v = rhs.as_sint64(); },
 
                Value::Array(v) => { to_dealloc = Some(*v); *v = rhs.as_array(); },
 
                Value::Enum(v) => { *v = rhs.as_enum(); },
 
                Value::Union(lhs_tag, lhs_heap_pos) => {
 
                    to_dealloc = Some(*lhs_heap_pos);
 
                    let (rhs_tag, rhs_heap_pos) = rhs.as_union();
 
                    *lhs_tag = rhs_tag;
 
                    *lhs_heap_pos = rhs_heap_pos;
 
                }
 
                Value::Struct(v) => { to_dealloc = Some(*v); *v = rhs.as_struct(); },
 
                _ => unreachable!("apply_assignment_operator {:?} on lhs {:?} and rhs {:?}", op, lhs, rhs),
 
            }
 
        },
 
        AO::Concatenated => {
 
            let lhs_heap_pos = lhs.get_heap_pos().unwrap() as usize;
 
            let rhs_heap_pos = rhs.get_heap_pos().unwrap() as usize;
 

	
 
            // To prevent borrowing crap, swap out heap region with a temp empty array
 
            let mut total = Vec::new();
 
            std::mem::swap(&mut total, &mut store.heap_regions[lhs_heap_pos].values);
 

	
 
            // Push everything onto the swapped vector
 
            let rhs_len = store.heap_regions[rhs_heap_pos].values.len();
 
            total.reserve(rhs_len);
 
            for value_idx in 0..rhs_len {
 
                total.push(store.clone_value(store.heap_regions[rhs_heap_pos].values[value_idx].clone()));
 
            }
 

	
 
            // Swap back in place
 
            std::mem::swap(&mut total, &mut store.heap_regions[lhs_heap_pos].values);
 

	
 
            // We took ownership of the RHS, but we copied it into the LHS, so
 
            // different form assignment we need to drop the RHS heap pos.
 
            to_dealloc = Some(rhs_heap_pos as u32);
 
        },
 
        AO::Multiplied =>   { apply_int_op!(lhs, *=,  op, rhs) },
 
        AO::Divided =>      { apply_int_op!(lhs, /=,  op, rhs) },
 
        AO::Remained =>     { apply_int_op!(lhs, %=,  op, rhs) },
 
        AO::Added =>        { apply_int_op!(lhs, +=,  op, rhs) },
 
        AO::Subtracted =>   { apply_int_op!(lhs, -=,  op, rhs) },
 
        AO::ShiftedLeft =>  { apply_int_op!(lhs, <<=, op, rhs) },
 
        AO::ShiftedRight => { apply_int_op!(lhs, >>=, op, rhs) },
 
        AO::BitwiseAnded => { apply_int_op!(lhs, &=,  op, rhs) },
 
        AO::BitwiseXored => { apply_int_op!(lhs, ^=,  op, rhs) },
 
        AO::BitwiseOred =>  { apply_int_op!(lhs, |=,  op, rhs) },
 
    }
 

	
 
    if let Some(heap_pos) = to_dealloc {
 
        store.drop_heap_pos(heap_pos);
 
    }
 
}
 

	
 
pub(crate) fn apply_binary_operator(store: &mut Store, lhs: &Value, op: BinaryOperator, rhs: &Value) -> Value {
 
    use BinaryOperator as BO;
 

	
 
    macro_rules! apply_int_op_and_return_self {
 
        ($lhs:ident, $operator_tokens:tt, $operator:ident, $rhs:ident) => {
 
            return match $lhs {
 
                Value::UInt8(v)  => { Value::UInt8( *v $operator_tokens $rhs.as_uint8() ) },
 
                Value::UInt16(v) => { Value::UInt16(*v $operator_tokens $rhs.as_uint16()) },
 
                Value::UInt32(v) => { Value::UInt32(*v $operator_tokens $rhs.as_uint32()) },
 
                Value::UInt64(v) => { Value::UInt64(*v $operator_tokens $rhs.as_uint64()) },
 
                Value::SInt8(v)  => { Value::SInt8( *v $operator_tokens $rhs.as_sint8() ) },
 
                Value::SInt16(v) => { Value::SInt16(*v $operator_tokens $rhs.as_sint16()) },
 
                Value::SInt32(v) => { Value::SInt32(*v $operator_tokens $rhs.as_sint32()) },
 
                Value::SInt64(v) => { Value::SInt64(*v $operator_tokens $rhs.as_sint64()) },
 
                _ => unreachable!("apply_binary_operator {:?} on lhs {:?} and rhs {:?}", $operator, $lhs, $rhs)
 
            };
 
        }
 
    }
 

	
 
    macro_rules! apply_int_op_and_return_bool {
 
        ($lhs:ident, $operator_tokens:tt, $operator:ident, $rhs:ident) => {
 
            return match $lhs {
 
                Value::UInt8(v)  => { Value::Bool(*v $operator_tokens $rhs.as_uint8() ) },
 
                Value::UInt16(v) => { Value::Bool(*v $operator_tokens $rhs.as_uint16()) },
 
                Value::UInt32(v) => { Value::Bool(*v $operator_tokens $rhs.as_uint32()) },
 
                Value::UInt64(v) => { Value::Bool(*v $operator_tokens $rhs.as_uint64()) },
 
                Value::SInt8(v)  => { Value::Bool(*v $operator_tokens $rhs.as_sint8() ) },
 
                Value::SInt16(v) => { Value::Bool(*v $operator_tokens $rhs.as_sint16()) },
 
                Value::SInt32(v) => { Value::Bool(*v $operator_tokens $rhs.as_sint32()) },
 
                Value::SInt64(v) => { Value::Bool(*v $operator_tokens $rhs.as_sint64()) },
 
                _ => unreachable!("apply_binary_operator {:?} on lhs {:?} and rhs {:?}", $operator, $lhs, $rhs)
 
            };
 
        }
 
    }
 

	
 
    // We need to handle concatenate in a special way because it needs the store
 
    // mutably.
 
    if op == BO::Concatenate {
 
        let target_heap_pos = store.alloc_heap();
 
        let lhs_heap_pos;
 
        let rhs_heap_pos;
 

	
 
        let lhs = store.maybe_read_ref(lhs);
 
        let rhs = store.maybe_read_ref(rhs);
 

	
 
        let value_kind;
 

	
 
        match lhs {
 
            Value::Message(lhs_pos) => {
 
                lhs_heap_pos = *lhs_pos;
 
                rhs_heap_pos = rhs.as_message();
 
                value_kind = ValueKind::Message;
 
            },
 
            Value::String(lhs_pos) => {
 
                lhs_heap_pos = *lhs_pos;
 
                rhs_heap_pos = rhs.as_string();
 
                value_kind = ValueKind::String;
 
            },
 
            Value::Array(lhs_pos) => {
 
                lhs_heap_pos = *lhs_pos;
 
                rhs_heap_pos = rhs.as_array();
 
                value_kind = ValueKind::Array;
 
            },
 
            _ => unreachable!("apply_binary_operator {:?} on lhs {:?} and rhs {:?}", op, lhs, rhs)
 
        }
 

	
 
        let lhs_heap_pos = lhs_heap_pos as usize;
 
        let rhs_heap_pos = rhs_heap_pos as usize;
 

	
 
        // TODO: I hate this, but fine...
 
        let mut concatenated = Vec::new();
 
        let lhs_len = store.heap_regions[lhs_heap_pos].values.len();
 
        let rhs_len = store.heap_regions[rhs_heap_pos].values.len();
 
        concatenated.reserve(lhs_len + rhs_len);
 
        for idx in 0..lhs_len {
 
            concatenated.push(store.clone_value(store.heap_regions[lhs_heap_pos].values[idx].clone()));
 
        }
 
        for idx in 0..rhs_len {
 
            concatenated.push(store.clone_value(store.heap_regions[rhs_heap_pos].values[idx].clone()));
 
        }
 

	
 
        store.heap_regions[target_heap_pos as usize].values = concatenated;
 

	
 
        return match value_kind{
 
            ValueKind::Message => Value::Message(target_heap_pos),
 
            ValueKind::String => Value::String(target_heap_pos),
 
            ValueKind::Array => Value::Array(target_heap_pos),
 
        };
 
    }
 

	
 
    // If any of the values are references, retrieve the thing they're referring
 
    // to.
 
    let lhs = store.maybe_read_ref(lhs);
 
    let rhs = store.maybe_read_ref(rhs);
 

	
 
    match op {
 
        BO::Concatenate => unreachable!(),
 
        BO::LogicalOr => {
 
            return Value::Bool(lhs.as_bool() || rhs.as_bool());
 
        },
 
        BO::LogicalAnd => {
 
            return Value::Bool(lhs.as_bool() && rhs.as_bool());
 
        },
 
        BO::BitwiseOr        => { apply_int_op_and_return_self!(lhs, |,  op, rhs); },
 
        BO::BitwiseXor       => { apply_int_op_and_return_self!(lhs, ^,  op, rhs); },
 
        BO::BitwiseAnd       => { apply_int_op_and_return_self!(lhs, &,  op, rhs); },
 
        BO::Equality         => { Value::Bool(apply_equality_operator(store, lhs, rhs)) },
 
        BO::Inequality       => { Value::Bool(apply_inequality_operator(store, lhs, rhs)) },
 
        BO::LessThan         => { apply_int_op_and_return_bool!(lhs, <,  op, rhs); },
 
        BO::GreaterThan      => { apply_int_op_and_return_bool!(lhs, >,  op, rhs); },
 
        BO::LessThanEqual    => { apply_int_op_and_return_bool!(lhs, <=, op, rhs); },
 
        BO::GreaterThanEqual => { apply_int_op_and_return_bool!(lhs, >=, op, rhs); },
 
        BO::ShiftLeft        => { apply_int_op_and_return_self!(lhs, <<, op, rhs); },
 
        BO::ShiftRight       => { apply_int_op_and_return_self!(lhs, >>, op, rhs); },
 
        BO::Add              => { apply_int_op_and_return_self!(lhs, +,  op, rhs); },
 
        BO::Subtract         => { apply_int_op_and_return_self!(lhs, -,  op, rhs); },
 
        BO::Multiply         => { apply_int_op_and_return_self!(lhs, *,  op, rhs); },
 
        BO::Divide           => { apply_int_op_and_return_self!(lhs, /,  op, rhs); },
 
        BO::Remainder        => { apply_int_op_and_return_self!(lhs, %,  op, rhs); }
 
    }
 
}
 

	
 
pub(crate) fn apply_unary_operator(store: &mut Store, op: UnaryOperator, value: &Value) -> Value {
 
    use UnaryOperator as UO;
 

	
 
    macro_rules! apply_int_expr_and_return {
 
        ($value:ident, $apply:tt, $op:ident) => {
 
            return match $value {
 
                Value::UInt8(v)  => Value::UInt8($apply *v),
 
                Value::UInt16(v) => Value::UInt16($apply *v),
 
                Value::UInt32(v) => Value::UInt32($apply *v),
 
                Value::UInt64(v) => Value::UInt64($apply *v),
 
                Value::SInt8(v)  => Value::SInt8($apply *v),
 
                Value::SInt16(v) => Value::SInt16($apply *v),
 
                Value::SInt32(v) => Value::SInt32($apply *v),
 
                Value::SInt64(v) => Value::SInt64($apply *v),
 
                _ => unreachable!("apply_unary_operator {:?} on value {:?}", $op, $value),
 
            };
 
        }
 
    }
 

	
 
    // If the value is a reference, retrieve the thing it is referring to
 
    let value = store.maybe_read_ref(value);
 

	
 
    match op {
 
        UO::Positive => {
 
            debug_assert!(value.is_integer());
 
            return value.clone();
 
        },
 
        UO::Negative => {
 
            // TODO: Error on negating unsigned integers
 
            return match value {
 
                Value::SInt8(v) => Value::SInt8(-*v),
 
                Value::SInt16(v) => Value::SInt16(-*v),
 
                Value::SInt32(v) => Value::SInt32(-*v),
 
                Value::SInt64(v) => Value::SInt64(-*v),
 
                _ => unreachable!("apply_unary_operator {:?} on value {:?}", op, value),
 
            }
 
        },
 
        UO::BitwiseNot => { apply_int_expr_and_return!(value, !, op)},
 
        UO::LogicalNot => { return Value::Bool(!value.as_bool()); },
 
    }
 
}
 

	
 
pub(crate) fn apply_casting(store: &mut Store, output_type: &ConcreteType, subject: &Value) -> Result<Value, String> {
 
    // To simplify the casting logic: if the output type is not a simple
 
    // integer/boolean/character, then the type checker made sure that the two
 
    // types must be equal, hence we can do a simple clone.
 
    use ConcreteTypePart as CTP;
 
    let part = &output_type.parts[0];
 
    match part {
 
        CTP::Bool | CTP::Character |
 
        CTP::UInt8 | CTP::UInt16 | CTP::UInt32 | CTP::UInt64 |
 
        CTP::SInt8 | CTP::SInt16 | CTP::SInt32 | CTP::SInt64 => {
 
            // Do the checking of these below
 
            debug_assert_eq!(output_type.parts.len(), 1);
 
        },
 
        _ => {
 
            return Ok(store.clone_value(subject.clone()));
 
        },
 
    }
 

	
 
    // Note: character is not included, needs per-type checking
 
    macro_rules! unchecked_cast {
 
        ($input: expr, $output_part: expr) => {
 
            return Ok(match $output_part {
 
                CTP::UInt8 => Value::UInt8($input as u8),
 
                CTP::UInt16 => Value::UInt16($input as u16),
 
                CTP::UInt32 => Value::UInt32($input as u32),
src/runtime2/messages.rs
Show inline comments
 
new file 100644
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 

	
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
 
/// need to store it for all future branches that might be able to receive it.
 
pub struct ConnectorInbox {
 
    // TODO: @optimize, HashMap + Vec is a bit stupid.
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
 
            messages: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.sending_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
 

	
 
                entry.push(message);
 
            },
 
            Entry::Vacant(entry) => {
 
                entry.insert(vec![message]);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if the provided port (and the branch id mapped to that port)
 
    /// correspond to any messages in the inbox.
 
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
 
        let port_action = PortAction{
 
            port_id,
 
            prev_branch_id: prev_branch_id_at_port,
 
        };
 

	
 
        match self.messages.get(&port_action) {
 
            Some(messages) => return Some(messages.as_slice()),
 
            None => return None,
 
        }
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
 
/// branches performing `put`s until we're done running all branches and can
 
/// actually transmit the messages.
 
pub struct ConnectorOutbox {
 
    messages: Vec<BufferedMessage>,
 
    sent_counter: usize,
 
}
 

	
 
impl ConnectorOutbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            sent_counter: 0,
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Depending on the way we implement the runtime in the
 
        //  future we might end up not trusting "our own code" (i.e. in case
 
        //  the connectors we are running are described by foreign code)
 
        debug_assert!(
 
            self.messages.iter()
 
                .find(|v|
 
                    v.sending_port == message.sending_port &&
 
                    v.peer_prev_branch_id == message.peer_prev_branch_id
 
                )
 
                .is_none(),
 
            "messages was already registered for sending"
 
        );
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    pub fn take_next_message_to_send(&mut self) -> Option<&BufferedMessage> {
 
        if self.sent_counter == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let cur_index = self.sent_counter;
 
        self.sent_counter += 1;
 
        return Some(&self.messages[cur_index]);
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
        self.sent_counter = 0;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
\ No newline at end of file
 
mod runtime;
 
mod messages;
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::{HashMap, HashSet, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::{Polarity, PortId};
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::registry::Registry;
 
use super::messages::*;
 

	
 
enum AddComponentError {
 
    ModuleDoesNotExist,
 
    ConnectorDoesNotExist,
 
    InvalidArgumentType(usize), // value is index of (first) invalid argument
 
}
 

	
 
struct PortDesc {
 
    id: u32,
 
    peer_id: u32,
 
    owning_connector_id: Option<u32>,
 
    is_getter: bool, // otherwise one can only call `put`
 
}
 

	
 
// Message received from some kind of peer
 
struct BufferedMessage {
 
    // If in inbox, then sender is the connector's peer. If in the outbox, then
 
    // the sender is the connector itself.
 
    sending_port: PortId,
 
    receiving_port: PortId,
 
    peer_prev_branch_id: Option<u32>, // of the sender
 
    peer_cur_branch_id: u32, // of the sender
 
    message: ValueGroup,
 
}
 

	
 
struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    branch_id_counter: u32,
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, u32>, // from port_id to branch index
 
    global_inbox: HashMap<(PortId, u32), BufferedMessage>,
 
    global_outbox: HashMap<(PortId, u32), BufferedMessage>,
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
    last_checked_done: u32,
 
    global_inbox: ConnectorInbox,
 
    global_outbox: ConnectorOutbox,
 
}
 

	
 
impl ConnectorDesc {
 
    /// Creates a new connector description. Implicit assumption is that there
 
    /// is one (non-sync) branch that can be immediately executed.
 
    fn new(id: u32, component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        let mut branches_active = VecDeque::new();
 
        branches_active.push_back(0);
 

	
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            branch_id_counter: 1,
 
            spec_branches_active: branches_active,
 
            spec_branches_pending_receive: HashMap::new(),
 
            global_inbox: HashMap::new(),
 
            global_outbox: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
            last_checked_done: 0,
 
            global_inbox: ConnectorInbox::new(),
 
            global_outbox: ConnectorOutbox::new(),
 
        }
 
    }
 
}
 

	
 
enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
}
 

	
 
struct BranchPortDesc {
 
    last_registered_identifier: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    identifier: u32,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            identifier: 0,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
            message_inbox: HashMap::new(),
 
            port_mapping: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self {
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            identifier,
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
            message_inbox: branch_state.message_inbox.clone(),
 
            port_mapping: branch_state.port_mapping.clone(),
 
        }
 
    }
 
}
 

	
 
// Separate from Runtime for borrowing reasons
 
struct Registry {
 
    ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
}
 

	
 
impl Registry {
 
    fn new() -> Self {
 
        Self{
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
        }
 
    }
 

	
 
    /// Returns (putter_port, getter_port)
 
    pub fn add_channel(&mut self, owning_connector_id: Option<u32>) -> (u32, u32) {
 
        let get_id = self.generate_port_id();
 
        let put_id = self.generate_port_id();
 

	
 
        self.ports.insert(get_id, PortDesc{
 
            id: get_id,
 
            peer_id: put_id,
 
            owning_connector_id,
 
            is_getter: true,
 
        });
 
        self.ports.insert(put_id, PortDesc{
 
            id: put_id,
 
            peer_id: get_id,
 
            owning_connector_id,
 
            is_getter: false,
 
        });
 

	
 
        return (put_id, get_id);
 
    }
 

	
 
    fn generate_port_id(&mut self) -> u32 {
 
        let id = self.port_counter;
 
        self.port_counter += 1;
 
        return id;
 
    }
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq)]
 
enum ProposedBranchConstraint {
 
    SilentPort(u32), // port id
 
    BranchNumber(u32), // branch id
 
}
 

	
 
// Local solution of the connector
 
struct ProposedConnectorSolution {
 
    final_branch_id: u32,
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    silent_ports: Vec<u32>, // port IDs of the connector itself
 
}
 

	
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_propositions: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
    registry: Registry,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            protocol: pd,
 
            registry: Registry::new(),
 
            connectors_active: VecDeque::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel that is not owned by any connector and returns its
 
    /// endpoints. The returned values are of the (putter port, getter port)
 
    /// respectively.
 
    pub fn add_channel(&mut self) -> (Value, Value) {
 
        let (put_id, get_id) = self.registry.add_channel(None);
 
        return (
 
            port_value_from_id(None, put_id, true),
 
            port_value_from_id(None, get_id, false)
 
        );
 
    }
 

	
 
    pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> {
 
        use AddComponentError as ACE;
 
        use crate::runtime::error::AddComponentError as OldACE;
 

	
 
        // TODO: Allow the ValueGroup to contain any kind of value
 
        // TODO: Remove the responsibility of adding a component from the PD
 

	
 
        // Lookup module and the component
 
        // TODO: Remove this error enum translation. Note that for now this
 
        //  function forces port-only arguments
 
        let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) {
 
            Ok(polarities) => polarities,
 
            Err(reason) => match reason {
 
                OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)),
 
                OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist),
 
                OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist),
 
                _ => unreachable!(),
 
            }
 
        };
 

	
 
        // Make sure supplied values (and types) are correct
 
        let mut ports = Vec::with_capacity(values.values.len());
 
        
 
        for (value_idx, value) in values.values.iter().enumerate() {
 
            let polarity = &port_polarities[value_idx];
 

	
 
            match value {
 
                Value::Input(port_id) => {
 
                    if *polarity != Polarity::Getter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                },
 
                Value::Output(port_id) => {
 
                    if *polarity != Polarity::Putter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                },
 
                _ => return Err(ACE::InvalidArgumentType(value_idx))
 
            }
 
        }
 

	
 
        // Instantiate the component
 
        let component_id = self.generate_connector_id();
 
        let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);
 
        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 

	
 
        self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors_active.push_back(component_id);
 

	
 
        Ok(())
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            // Run a single connector
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            self.run_connector(next_id);
 
            let run_again = self.run_connector(next_id);
 

	
 
            if run_again {
 
                self.connectors_active.push_back(next_id);
 
            }
 

	
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_solution(next_id);
 
        }
 
    }
 

	
 
    /// Runs a connector for as long as sensible, then returns `true` if the
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    pub fn run_connector(&mut self, id: u32) -> bool {
 
        let desc = self.registry.connectors.get_mut(&id).unwrap();
 
        let mut run_context = Context{
 
            connector_id: id,
 
            branch_id: None,
 
            pending_channel: None,
 
        };
 

	
 
        let mut call_again = false;
 
        let mut call_again = false; // TODO: Come back to this, silly pattern
 

	
 
        while call_again {
 
            call_again = false; // bit of a silly pattern, maybe revise
 

	
 
            if desc.in_sync {
 
                // Running in synchronous mode, so run all branches until their
 
                // blocking point
 
                debug_assert!(!desc.spec_branches_active.is_empty());
 
                let branch_index = desc.spec_branches_active.pop_front().unwrap();
 

	
 
                let branch = &mut desc.branches[branch_index as usize];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::BranchInconsistent => {
 
                        // Speculative branch became inconsistent. So we don't
 
                        // run it again
 
                        branch.branch_state = BranchState::Failed;
 
                    },
 
                    RunResult::BranchMissingPortState(port_id) => {
 
                        // Branch called `fires()` on a port that did not have a
 
                        // value assigned yet. So branch and keep running
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                        let copied_index = Self::duplicate_branch(desc, branch_index);
 

	
 
                        // Need to re-borrow to assign changed port state
 
                        let original_branch = &mut desc.branches[branch_index as usize];
 
                        original_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 0,
 
                        });
 

	
 
                        let copied_branch = &mut desc.branches[copied_index as usize];
 
                        copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                            last_registered_identifier: None,
 
                            num_times_fired: 1,
 
                        });
 

	
 
                        // Run both again
 
                        desc.spec_branches_active.push_back(branch_index);
 
                        desc.spec_branches_active.push_back(copied_index);
 
                    },
 
                    RunResult::BranchMissingPortValue(port_id) => {
 
                        // Branch just performed a `get()` on a port that did
 
                        // not yet receive a value.
 

	
 
                        // First check if a port value is assigned to the
 
                        // current branch. If so, check if it is consistent.
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        let mut insert_in_pending_receive = false;
 

	
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet, so force to firing
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 1,
 
                                });
 
                                branch.branch_state = BranchState::BranchPoint;
 
                                desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                insert_in_pending_receive = true;
 
                            },
 
                            Entry::Occupied(entry) => {
 
                                // Have an entry, check if it is consistent
 
                                let entry = entry.get();
 
                                if entry.num_times_fired == 0 {
 
                                    // Inconsistent
 
                                    branch.branch_state = BranchState::Failed;
 
                                } else {
 
                                    // Perfectly fine, add to queue
 
                                    debug_assert!(entry.last_registered_identifier.is_none());
 
                                    assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                                    branch.branch_state = BranchState::BranchPoint;
 
                                    desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                    insert_in_pending_receive = true;
 
                                }
 
                            }
 
                        }
 

	
 
                        if insert_in_pending_receive {
 
                            // Perform the insert
 
                            match desc.spec_branches_pending_receive.entry(port_id) {
 
                                Entry::Vacant(entry) => {
 
                                    entry.insert(vec![branch_index]);
 
                                }
 
                                Entry::Occupied(mut entry) => {
 
                                    let entry = entry.get_mut();
 
                                    debug_assert!(!entry.contains(&branch_index));
 
                                    entry.push(branch_index);
 
                                }
 
                            }
 

	
 
                            // But also check immediately if we don't have a
 
                            // previously received message. If so, we
 
                            // immediately branch and accept the message
 
                            if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                                for message in messages {
 
                                    let new_branch_idx = Self::duplicate_branch(desc, branch_index);
 
                                    let new_branch = &mut desc.branches[new_branch_idx as usize];
 
                                    let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                                    new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id);
 
                                    new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                                    desc.spec_branches_active.push_back(new_branch_idx);
 
                                }
 
                            }
 
                        }
 
                    },
 
                    RunResult::BranchAtSyncEnd => {
 
                        // Check the branch for any ports that were not used and
 
                        // insert them in the port mapping as not having fired.
 
                        for port_index in branch.owned_ports {
 
                            let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                            if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                                entry.insert(BranchPortDesc {
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 0
 
                                });
 
                            }
 
                        }
 

	
 
                        // Mark the branch as being done
 
                        branch.branch_state = BranchState::ReachedEndSync;
 
                        todo!("somehow propose solution");
 
                        desc.spec_branches_done.push(branch_index);
 
                    },
 
                    RunResult::BranchPut(port_id, value_group) => {
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert_eq!(value_group.values.len(), 1)
 
                        debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                        // Branch just performed a `put()`. Check if we have
 
                        // assigned the port value and if so, if it is
 
                        // consistent.
 
                        let mut can_put = true;
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: Some(branch.identifier),
 
                                    num_times_fired: 1,
 
                                });
 
                            },
 
                            Entry::Occupied(mut entry) => {
 
                                // Pre-existing entry
 
                                let entry = entry.get_mut();
 
                                if entry.num_times_fired == 0 {
 
                                    // This is 'fine' in the sense that we have
 
                                    // a normal inconsistency in the branch.
 
                                    branch.branch_state = BranchState::Failed;
 
                                    can_put = false;
 
                                } else if entry.last_registered_identifier.is_none() {
 
                                    // A put() that follows a fires()
 
                                    entry.last_registered_identifier = Some(branch.identifier);
 
                                } else {
 
                                    // This should be fine in the future. But
 
                                    // for now we throw an error as it doesn't
 
                                    // mesh well with the 'fires()' concept.
 
                                    todo!("throw an error of some sort, then fail all related")
 
                                }
 
                            }
 
                        }
 

	
 
                        if can_put {
 
                            // Actually put the message in the outbox
 
                            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
                            let peer_id = port_desc.peer_id;
 
                            let peer_desc = self.registry.ports.get(&peer_id).unwrap();
 
                            debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                            let peer_id = PortId(Id{
 
                                connector_id: peer_desc.owning_connector_id.unwrap(),
 
                                u32_suffix: peer_id
 
                            });
 

	
 
                            // For now this is the one and only time we're going
 
                            // to send a message. So for now we can't send a
 
                            // branch ID.
 
                            desc.global_outbox.insert((port_id, 1), BufferedMessage{
 
                                sending_port: port_id,
 
                                receiving_port: peer_id,
 
                                peer_prev_branch_id: None,
 
                                peer_cur_branch_id: 0,
 
                                message: value_group,
 
                            });
 

	
 
                            // Finally, because we were able to put the message,
 
                            // we can run the branch again
 
                            desc.spec_branches_active.push_back(branch_index);
 
                            call_again = true;
 
                        }
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
                }
 
            } else {
 
                // Running in non-synchronous mode
 
                let branch = &mut desc.branches[0];
 
                let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
                match run_result {
 
                    RunResult::ComponentTerminated => return false,
 
                    RunResult::ComponentAtSyncStart => {
 
                        // Prepare for sync execution
 
                        Self::prepare_branch_for_sync(desc);
 
                        call_again = true;
 
                    },
 
                    RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                        // Generate a new connector with its own state
 
                        let new_component_id = self.generate_connector_id();
 
                        let new_component_state = ComponentState {
 
                            prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                        };
 

	
 
                        // Transfer the ownership of any ports to the new connector
 
                        let mut ports = Vec::with_capacity(arguments.values.len());
 
                        find_ports_in_value_group(&arguments, &mut ports);
 
                        for port_id in &ports {
 
                            let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                            debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id);
 
                            port.owning_connector_id = Some(new_component_id)
 
                        }
 

	
 
                        // Finally push the new connector into the registry
 
                        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                        self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                        self.connectors_active.push_back(new_component_id);
 
                    },
 
                    RunResult::NewChannel => {
 
                        // Prepare channel
 
                        debug_assert!(run_context.pending_channel.is_none());
 
                        let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id));
 
                        run_context.pending_channel = Some((
 
                            port_value_from_id(Some(run_context.connector_id), put_id, true),
 
                            port_value_from_id(Some(run_context.connector_id), get_id, false)
 
                        ));
 

	
 
                        // Call again so it is retrieved from the context
 
                        call_again = true;
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
    /// will be created that receive those messages.
 
    fn empty_connector_outbox(&mut self, connector_index: u32) {
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() {
 
            // Lookup the target connector
 
            let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap();
 
            debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id);
 
            let target_connector_id = port_desc.owning_connector_id.unwrap();
 
            let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap();
 

	
 
            // In any case, always put the message in the global inbox
 
            target_connector.global_inbox.insert_message(message_to_send.clone());
 

	
 
            // Check if there are any branches that are waiting on
 
            // receives
 
            if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) {
 
                // Check each of the branches for a port mapping that
 
                // matches the one on the message header
 
                for branch_index in branch_indices {
 
                    let branch = &mut target_connector.branches[*branch_index as usize];
 
                    debug_assert_eq!(branch.branch_state, BranchState::BranchPoint);
 

	
 
                    let mut can_branch = false;
 

	
 
                    if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) {
 
                        if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
                            can_branch = true;
 
                        }
 
                    }
 

	
 
                    if can_branch {
 
                        // Put the message inside a clone of the currently
 
                        // waiting branch
 
                        let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index);
 
                        let new_branch = &mut target_connector.branches[new_branch_idx as usize];
 
                        let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap();
 
                        new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id);
 
                        new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone());
 

	
 
                        // And queue the branch for further execution
 
                        target_connector.spec_branches_active.push(new_branch_idx);
 
                        if !self.connectors_active.contains(&target_connector.id) {
 
                            self.connectors_active.push_back(target_connector.id);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Checks a connector for the submitted solutions. After all neighbouring
 
    /// connectors have been checked all of their "last checked solution" index
 
    /// will be incremented.
 
    fn check_connector_new_solutions(&mut self, connector_index: u32) {
 
        // Take connector and start processing its solutions
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        let mut considered_connectors = HashSet::new();
 
        let mut valid_solutions = Vec::new();
 

	
 
        while connector.last_checked_done != connector.spec_branches_done.len() as u32 {
 
            // We have a new solution to consider
 
            let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize];
 
            connector.last_checked_done += 1;
 

	
 
            let branch = &connector.branches[start_branch_index as usize];
 
            debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync);
 

	
 
            // Clear storage for potential solutions
 
            considered_connectors.clear();
 

	
 
            // Start seeking solution among other connectors within the same
 
            // synchronous region
 
            considered_connectors.insert(connector.id);
 
            for port in branch.port_
 
        }
 
    }
 

	
 
    fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) {
 
        // Take the connector and branch of interest
 
        let first_connector = self.registry.connectors.get(&first_connector_index).unwrap();
 
        let first_branch = &first_connector.branches[first_branch_index as usize];
 
        debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync);
 

	
 
        // Setup the first solution
 
        let mut first_solution = ProposedSolution{
 
            connector_mapping: HashMap::new(),
 
            connector_propositions: HashMap::new(),
 
            remaining_connectors: Vec::new(),
 
        };
 
        first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier);
 
        for (port_id, port_mapping) in first_branch.port_mapping.iter() {
 
            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap();
 
            let peer_connector_id = peer_port_desc.owning_connector_id.unwrap();
 

	
 
            let constraint = match port_mapping.last_registered_identifier {
 
                Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id),
 
                None => ProposedBranchConstraint::SilentPort(peer_port_id),
 
            };
 

	
 
            match first_solution.connector_propositions.entry(peer_connector_id) {
 
                Entry::Vacant(entry) => {
 
                    // Not yet encountered
 
                    entry.insert(vec![constraint]);
 
                    first_solution.remaining_connectors.push(peer_connector_id);
 
                },
 
                Entry::Occupied(mut entry) => {
 
                    // Already encountered
 
                    let entry = entry.get_mut();
 
                    if !entry.contains(&constraint) {
 
                        entry.push(constraint);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Setup storage for all possible solutions
 
        let mut all_solutions = Vec::new();
 
        all_solutions.push(first_solution);
 

	
 
        while !all_solutions.is_empty() {
 
            let mut cur_solution = all_solutions.pop().unwrap();
 

	
 
        }
 
    }
 

	
 
    fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec<ProposedSolution>, target_connector: u32) {
 
        debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited
 
        debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it
 

	
 
        let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap();
 
        let cur_connector = self.registry.connectors.get(&target_connector).unwrap();
 

	
 
        // Make sure all propositions are unique
 
        for i in 0..branch_propositions.len() {
 
            let proposition_i = branch_propositions[i];
 
            for j in 0..i {
 
                let proposition_j = branch_propositions[j];
 
                debug_assert_ne!(proposition_i, proposition_j);
 
            }
 
        }
 

	
 
        // Check connector for compatible branches
 
        let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len());
 
        let mut encountered_propositions = Vec::new();
 

	
 
        'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done {
 
            // Reset the propositions matching variables
 
            encountered_propositions.clear();
 
            encountered_propositions.resize(branch_propositions.len(), false);
 

	
 
            // First check the silent port propositions
 
            let cur_branch = &cur_connector.branches[branch_idx as usize];
 
            for (proposition_idx, proposition) in branch_propositions.iter().enumerate() {
 
                match proposition {
 
                    ProposedBranchConstraint::SilentPort(port_id) => {
 
                        let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id });
 
                        let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap();
 
                        if port_mapping.num_times_fired != 0 {
 
                            // Port did fire, so the current branch is not
 
                            // compatible
 
                            continue 'finished_branch_loop;
 
                        }
 

	
 
                        // Otherwise, the port was silent indeed
 
                        encountered_propositions[proposition_idx] = true;
 
                    },
 
                    ProposedBranchConstraint::BranchNumber(_) => {},
 
                }
 
            }
 

	
 
            // Then check the branch number propositions
 
            let mut parent_branch_idx = branch_idx;
 
            loop {
 
                let branch = &cur_connector.branches[parent_branch_idx as usize];
 
                for proposition_idx in 0..branch_propositions.len() {
 
                    let proposition = branch_propositions[proposition_idx];
 
                    match proposition {
 
                        ProposedBranchConstraint::SilentPort(_) => {},
 
                        ProposedBranchConstraint::BranchNumber(branch_number) => {
 
                            if branch_number == branch.identifier {
 
                                encountered_propositions[proposition_idx] = true;
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                if branch.parent_index.is_none() {
 
                    // No more parents
 
                    break;
 
                }
 

	
 
                parent_branch_idx = branch.parent_index.unwrap();
 
            }
 

	
 
            if !encountered_propositions.iter().all(|v| *v) {
 
                // Not all of the constraints were matched
 
                continue 'finished_branch_loop
 
            }
 

	
 
            // All of the constraints on the branch did indeed match.
 
        }
 
    }
 

	
 
    fn generate_connector_id(&mut self) -> u32 {
 
        let id = self.registry.connector_counter;
 
        self.registry.connector_counter += 1;
 
        return id;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Helpers for branch management
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a speculative branch for further execution from the connector's
 
    /// non-speculative base branch.
 
    fn prepare_branch_for_sync(desc: &mut ConnectorDesc) {
 
        // Ensure only one branch is active, the non-sync branch
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        let new_branch_index = 1;
 
        let new_branch_identifier = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        // Push first speculative branch as active branch
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]);
 
        desc.branches.push(new_branch);
 
        desc.spec_branches_active.push_back(new_id);
 
        desc.in_sync = true;
 
    }
 

	
 
    /// Duplicates a particular (speculative) branch and returns its index.
 
    fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 {
 
        let original_branch = &desc.branches[original_branch_idx as usize];
 
        debug_assert!(desc.in_sync);
 

	
 
        let copied_index = desc.branches.len() as u32;
 
        let copied_id = desc.branch_id_counter;
 
        desc.branch_id_counter += 1;
 

	
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch);
 
        desc.branches.push(copied_branch);
 

	
 
        return copied_index;
 
    }
 
}
 

	
 
/// Context accessible by the code while being executed by the runtime. When the
 
/// code is being executed by the runtime it sometimes needs to interact with 
 
/// the runtime. This is achieved by the "code throwing an error code", after 
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Properties of currently running connector/branch
 
    connector_id: u32,
 
    branch_id: Option<u32>,
 
    // Resources ready to be retrieved by running code
 
    pending_channel: Option<(Value, Value)>, // (put, get) ports
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn fires(&self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.pending_channel.take()
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports. 
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports {
 
                    if prev_port == port_id {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 
                
 
                ports.push(*port_id);
 
            },
 
            Value::Array(heap_pos) | 
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 
    
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
fn port_value_from_id(connector_id: Option<u32>, port_id: u32, is_output: bool) -> Value {
 
    let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system
 
    if is_output {
 
        return Value::Output(PortId(Id{
 
            connector_id,
 
            u32_suffix: port_id
 
        }));
 
    } else {
 
        return Value::Input(PortId(Id{
 
            connector_id,
 
            u32_suffix: port_id,
 
        }));
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)