Changeset - daf15df0f8ca
[Not reviewed]
0 9 0
MH - 4 years ago 2021-10-11 12:24:44
contact@maxhenger.nl
scaffolding in place for scheduler/runtime
9 files changed with 641 insertions and 148 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
mod arena;
 
pub(crate) mod eval;
 
pub(crate) mod input_source;
 
mod parser;
 
#[cfg(test)] mod tests;
 

	
 
pub(crate) mod ast;
 
pub(crate) mod ast_printer;
 

	
 
use std::sync::Mutex;
 

	
 
use crate::collections::{StringPool, StringRef};
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::*;
 
use crate::protocol::type_table::*;
 

	
 
/// A protocol description module
 
pub struct Module {
 
    pub(crate) source: InputSource,
 
    pub(crate) root_id: RootId,
 
    pub(crate) name: Option<StringRef<'static>>,
 
}
 
/// Description of a protocol object, used to configure new connectors.
 
#[repr(C)]
 
pub struct ProtocolDescription {
 
    pub(crate) modules: Vec<Module>,
 
    pub(crate) heap: Heap,
 
    pub(crate) types: TypeTable,
 
    pub(crate) pool: Mutex<StringPool>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentState {
 
    pub(crate) prompt: Prompt,
 
}
 

	
 
#[allow(dead_code)]
 
pub(crate) enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    None,
 
}
 
//////////////////////////////////////////////
 

	
 
#[derive(Debug)]
 
pub enum ComponentCreationError {
 
    ModuleDoesntExist,
 
    DefinitionDoesntExist,
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(An opaque protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
    // TODO: Allow for multi-file compilation
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        // TODO: @fixme, keep code compilable, but needs support for multiple
 
        //  input files.
 
        let source = InputSource::new(String::new(), Vec::from(buffer));
 
        let mut parser = Parser::new();
 
        parser.feed(source).expect("failed to feed source");
 
        
 
        if let Err(err) = parser.parse() {
 
            println!("ERROR:\n{}", err);
 
            return Err(format!("{}", err))
 
        }
 

	
 
        debug_assert_eq!(parser.modules.len(), 1, "only supporting one module here for now");
 
        let modules: Vec<Module> = parser.modules.into_iter()
 
            .map(|module| Module{
 
                source: module.source,
 
                root_id: module.root_id,
 
                name: module.name.map(|(_, name)| name)
 
            })
 
            .collect();
 

	
 
        return Ok(ProtocolDescription {
 
            modules,
 
            heap: parser.heap,
 
            types: parser.type_table,
 
            pool: Mutex::new(parser.string_pool),
 
        });
 
    }
 

	
 
    #[deprecated]
 
    pub(crate) fn component_polarities(
 
        &self,
 
        module_name: &[u8],
 
        identifier: &[u8],
 
    ) -> Result<Vec<Polarity>, AddComponentError> {
 
        use AddComponentError::*;
 

	
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(AddComponentError::NoSuchModule);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let def = root.get_definition_ident(&self.heap, identifier);
 
        if def.is_none() {
 
            return Err(NoSuchComponent);
 
        }
 

	
 
        let def = &self.heap[def.unwrap()];
 
        if !def.is_component() {
 
            return Err(NoSuchComponent);
 
        }
 

	
 
        for &param in def.parameters().iter() {
 
            let param = &self.heap[param];
 
            let first_element = &param.parser_type.elements[0];
 

	
 
            match first_element.variant {
 
                ParserTypeVariant::Input | ParserTypeVariant::Output => continue,
 
                _ => {
 
                    return Err(NonPortTypeParameters);
 
                }
 
            }
 
        }
 

	
 
        let mut result = Vec::new();
 
        for &param in def.parameters().iter() {
 
            let param = &self.heap[param];
 
            let first_element = &param.parser_type.elements[0];
 

	
 
            if first_element.variant == ParserTypeVariant::Input {
 
                result.push(Polarity::Getter)
 
            } else if first_element.variant == ParserTypeVariant::Output {
 
                result.push(Polarity::Putter)
 
            } else {
 
                unreachable!()
 
            }
 
        }
 
        Ok(result)
 
    }
 

	
 
    // expects port polarities to be correct
 
    #[deprecated]
 
    pub(crate) fn new_component(&self, module_name: &[u8], identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(module_name, identifier).unwrap()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(x)),
 
                Polarity::Putter => args.push(Value::Output(x)),
 
            }
 
        }
 

	
 
        let module_root = self.lookup_module_root(module_name).unwrap();
 
        let root = &self.heap[module_root];
 
        let def = root.get_definition_ident(&self.heap, identifier).unwrap();
 
        // TODO: Check for polymorph
 
        ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) }
 
    }
 

	
 
    // TODO: Ofcourse, rename this at some point, perhaps even remove it in its
 
    //  entirety. Find some way to interface with the parameter's types.
 
    pub(crate) fn new_component_v2(
 
        &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup
 
    ) -> Result<ComponentState, ComponentCreationError> {
 
        // Find the module in which the definition can be found
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(ComponentCreationError::ModuleDoesntExist);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
        let definition_id = definition_id.unwrap();
 

	
 
        let definition = &self.heap[definition_id];
 
        if !definition.is_component() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // Make sure that the types of the provided value group matches that of
 
        // the expected types.
 
        let definition = definition.as_component();
 
        if !definition.poly_vars.is_empty() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // - check number of arguments
 
        let expr_data = self.types.get_procedure_expression_data(&definition_id, 0);
 
        if expr_data.arg_types.len() != arguments.values.len() {
 
            return Err(ComponentCreationError::InvalidNumArguments);
 
        }
 

	
 
        // - for each argument try to make sure the types match
 
        for arg_idx in 0..arguments.values.len() {
 
            let expected_type = &expr_data.arg_types[arg_idx];
 
            let provided_value = &arguments.values[arg_idx];
 
            if !self.verify_same_type(expected_type, 0, &arguments, provided_value) {
 
                return Err(ComponentCreationError::InvalidArgumentType(arg_idx));
 
            }
 
        }
 

	
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(ComponentState{
 
            prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments),
 
        });
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
                Some(name) => if name.as_bytes() == module_name {
 
                    return Some(module.root_id);
 
                },
 
                None => if module_name.is_empty() {
 
                    return Some(module.root_id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        macro_rules! match_variant {
 
            ($value:expr, $variant:expr) => {
 
                if let $variant(_) = $value { true } else { false }
 
            };
 
        }
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
 
            CTP::Bool => match_variant!(argument, Value::Bool),
 
            CTP::UInt8 => match_variant!(argument, Value::UInt8),
 
            CTP::UInt16 => match_variant!(argument, Value::UInt16),
 
            CTP::UInt32 => match_variant!(argument, Value::UInt32),
 
            CTP::UInt64 => match_variant!(argument, Value::UInt64),
 
            CTP::SInt8 => match_variant!(argument, Value::SInt8),
 
            CTP::SInt16 => match_variant!(argument, Value::SInt16),
 
            CTP::SInt32 => match_variant!(argument, Value::SInt32),
 
            CTP::SInt64 => match_variant!(argument, Value::SInt64),
 
            CTP::Character => match_variant!(argument, Value::Char),
 
            CTP::String => {
 
                // Match outer string type and embedded character types
 
                if let Value::String(heap_pos) = argument {
 
                    for element in &arguments.regions[*heap_pos as usize] {
 
                        if let Value::Char(_) = element {} else {
 
                            return false;
 
                        }
 
                    }
 
                } else {
 
                    return false;
 
                }
 

	
 
                return true;
 
            },
 
            CTP::Array => {
 
                if let Value::Array(heap_pos) = argument {
 
                    let heap_pos = *heap_pos;
 
                    for element in &arguments.regions[heap_pos as usize] {
 
                        if !self.verify_same_type(expected, expected_idx + 1, arguments, element) {
 
                            return false;
 
                        }
 
                    }
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            CTP::Input => match_variant!(argument, Value::Input),
 
            CTP::Output => match_variant!(argument, Value::Output),
 
            CTP::Instance(_definition_id, _num_embedded) => {
 
                todo!("implement full type checking on user-supplied arguments");
 
                return false;
 
            },
 
        }
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
}
 

	
 
#[derive(Debug)]
 
pub enum RunResult {
 
    // Can only occur outside sync blocks
 
    ComponentTerminated, // component has exited its procedure
 
    ComponentAtSyncStart,
 
    NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync
 
    NewChannel, // should also be possible inside sync
 
    // Can only occur inside sync blocks
 
    BranchInconsistent, // branch has inconsistent behaviour
 
    BranchMissingPortState(PortId), // branch doesn't know about port firing
 
    BranchMissingPortValue(PortId), // branch hasn't received message on input port yet
 
    BranchAtSyncEnd,
 
    BranchPut(PortId, ValueGroup),
 
}
 

	
 
impl ComponentState {
 
    pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult {
 
        use EvalContinuation as EC;
 
        use RunResult as RR;
 

	
 
        loop {
 
            let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            match step_result {
 
                Err(reason) => {
 
                    // TODO: @temp
 
                    println!("Evaluation error:\n{}", reason);
 
                    todo!("proper error handling/bubbling up");
 
                },
 
                Ok(continuation) => match continuation {
 
                    // TODO: Probably want to remove this translation
 
                    EC::Stepping => continue,
 
                    EC::Inconsistent => return RR::BranchInconsistent,
 
                    EC::Terminal => return RR::ComponentTerminated,
 
                    EC::SyncBlockStart => return RR::ComponentAtSyncStart,
 
                    EC::SyncBlockEnd => return RR::BranchAtSyncEnd,
 
                    EC::NewComponent(definition_id, monomorph_idx, args) =>
 
                        return RR::NewComponent(definition_id, monomorph_idx, args),
 
                    EC::NewChannel =>
src/protocol/parser/pass_typing.rs
Show inline comments
 
@@ -1505,104 +1505,121 @@ impl PassTyping {
 
                        Some(reserved_idx) => {
 
                            // Already typechecked, or already put into the resolve queue
 
                            infer_expr.field_or_monomorph_idx = reserved_idx;
 
                        },
 
                        None => {
 
                            // Not typechecked yet, so add an entry in the queue
 
                            let reserved_idx = ctx.types.reserve_procedure_monomorph_index(&definition_id, concrete_type);
 
                            infer_expr.field_or_monomorph_idx = reserved_idx;
 
                            queue.push(ResolveQueueElement{
 
                                root_id: ctx.heap[definition_id].defined_in(),
 
                                definition_id,
 
                                reserved_monomorph_idx: reserved_idx,
 
                            });
 
                        }
 
                    }
 
                },
 
                Expression::Literal(expr) => {
 
                    let definition_id = match &expr.value {
 
                        Literal::Enum(lit) => lit.definition,
 
                        Literal::Union(lit) => lit.definition,
 
                        Literal::Struct(lit) => lit.definition,
 
                        _ => unreachable!(),
 
                    };
 
                    let first_concrete_part = ConcreteTypePart::Instance(definition_id, extra_data.poly_vars.len() as u32);
 
                    let concrete_type = inference_type_to_concrete_type(
 
                        ctx, extra_data.expr_id, &extra_data.poly_vars, first_concrete_part
 
                    )?;
 
                    let mono_index = ctx.types.add_data_monomorph(ctx.modules, ctx.heap, ctx.arch, definition_id, concrete_type)?;
 
                    infer_expr.field_or_monomorph_idx = mono_index;
 
                },
 
                Expression::Select(_) => {
 
                    debug_assert!(infer_expr.field_or_monomorph_idx >= 0);
 
                },
 
                _ => {
 
                    unreachable!("handling extra data for expression {:?}", &ctx.heap[extra_data.expr_id]);
 
                }
 
            }
 
        }
 

	
 
        // If we did any implicit type forcing, then our queue isn't empty
 
        // anymore
 
        while !self.expr_queued.is_empty() {
 
            let expr_idx = self.expr_queued.pop_back().unwrap();
 
            self.progress_expr(ctx, expr_idx)?;
 
        }
 

	
 
        // Every expression checked, and new monomorphs are queued. Transfer the
 
        // expression information to the type table.
 
        let definition_id = match &self.definition_type {
 
            DefinitionType::Component(id) => id.upcast(),
 
            DefinitionType::Function(id) => id.upcast(),
 
        let (definition_id, procedure_arguments) = match &self.definition_type {
 
            DefinitionType::Component(id) => {
 
                let definition = &ctx.heap[*id];
 
                (id.upcast(), &definition.parameters)
 
            },
 
            DefinitionType::Function(id) => {
 
                let definition = &ctx.heap[*id];
 
                (id.upcast(), &definition.parameters)
 
            },
 
        };
 

	
 
        let target = ctx.types.get_procedure_expression_data_mut(&definition_id, self.reserved_idx);
 
        debug_assert!(target.expr_data.is_empty()); // makes sure we never queue something twice
 
        debug_assert!(target.arg_types.is_empty()); // makes sure we never queue a procedure's type inferencing twice
 
        debug_assert!(target.expr_data.is_empty());
 

	
 
        // - Write the arguments to the procedure
 
        target.arg_types.reserve(procedure_arguments.len());
 
        for argument_id in procedure_arguments {
 
            let mut concrete = ConcreteType::default();
 
            let argument_type = self.var_types.get(argument_id).unwrap();
 
            argument_type.var_type.write_concrete_type(&mut concrete);
 
            target.arg_types.push(concrete);
 
        }
 

	
 
        // - Write the expression data
 
        target.expr_data.reserve(self.expr_types.len());
 
        for infer_expr in self.expr_types.iter() {
 
            let mut concrete = ConcreteType::default();
 
            infer_expr.expr_type.write_concrete_type(&mut concrete);
 
            target.expr_data.push(MonomorphExpression{
 
                expr_type: concrete,
 
                field_or_monomorph_idx: infer_expr.field_or_monomorph_idx
 
            });
 
        }
 

	
 
        Ok(())
 
    }
 

	
 
    fn progress_expr(&mut self, ctx: &mut Ctx, idx: i32) -> Result<(), ParseError> {
 
        let id = self.expr_types[idx as usize].expr_id; // TODO: @Temp
 
        match &ctx.heap[id] {
 
            Expression::Assignment(expr) => {
 
                let id = expr.this;
 
                self.progress_assignment_expr(ctx, id)
 
            },
 
            Expression::Binding(expr) => {
 
                let id = expr.this;
 
                self.progress_binding_expr(ctx, id)
 
            },
 
            Expression::Conditional(expr) => {
 
                let id = expr.this;
 
                self.progress_conditional_expr(ctx, id)
 
            },
 
            Expression::Binary(expr) => {
 
                let id = expr.this;
 
                self.progress_binary_expr(ctx, id)
 
            },
 
            Expression::Unary(expr) => {
 
                let id = expr.this;
 
                self.progress_unary_expr(ctx, id)
 
            },
 
            Expression::Indexing(expr) => {
 
                let id = expr.this;
 
                self.progress_indexing_expr(ctx, id)
 
            },
 
            Expression::Slicing(expr) => {
 
                let id = expr.this;
 
                self.progress_slicing_expr(ctx, id)
 
            },
 
            Expression::Select(expr) => {
 
                let id = expr.this;
 
                self.progress_select_expr(ctx, id)
 
            },
src/protocol/parser/type_table.rs
Show inline comments
 
@@ -275,96 +275,97 @@ impl DefinedTypeVariant {
 
    }
 

	
 
    pub(crate) fn as_union(&self) -> &UnionType {
 
        match self {
 
            DefinedTypeVariant::Union(v) => v,
 
            _ => unreachable!("Cannot convert {} to union variant", self.type_class())
 
        }
 
    }
 

	
 
    pub(crate) fn as_union_mut(&mut self) -> &mut UnionType {
 
        match self {
 
            DefinedTypeVariant::Union(v) => v,
 
            _ => unreachable!("Cannot convert {} to union variant", self.type_class())
 
        }
 
    }
 

	
 
    pub(crate) fn procedure_monomorphs(&self) -> &Vec<ProcedureMonomorph> {
 
        use DefinedTypeVariant::*;
 

	
 
        match self {
 
            Function(v) => &v.monomorphs,
 
            Component(v) => &v.monomorphs,
 
            _ => unreachable!("cannot get procedure monomorphs from {}", self.type_class()),
 
        }
 
    }
 

	
 
    pub(crate) fn procedure_monomorphs_mut(&mut self) -> &mut Vec<ProcedureMonomorph> {
 
        use DefinedTypeVariant::*;
 

	
 
        match self {
 
            Function(v) => &mut v.monomorphs,
 
            Component(v) => &mut v.monomorphs,
 
            _ => unreachable!("cannot get procedure monomorphs from {}", self.type_class()),
 
        }
 
    }
 
}
 

	
 
pub struct PolymorphicVariable {
 
    identifier: Identifier,
 
    is_in_use: bool, // a polymorphic argument may be defined, but not used by the type definition
 
}
 

	
 
/// Data associated with a monomorphized procedure type. Has the wrong name,
 
/// because it will also be used to store expression data for a non-polymorphic
 
/// procedure. (in that case, there will only ever be one)
 
pub struct ProcedureMonomorph {
 
    // Expression data for one particular monomorph
 
    pub concrete_type: ConcreteType,
 
    pub arg_types: Vec<ConcreteType>,
 
    pub expr_data: Vec<MonomorphExpression>,
 
}
 

	
 
/// `EnumType` is the classical C/C++ enum type. It has various variants with
 
/// an assigned integer value. The integer values may be user-defined,
 
/// compiler-defined, or a mix of the two. If a user assigns the same enum
 
/// value multiple times, we assume the user is an expert and we consider both
 
/// variants to be equal to one another.
 
pub struct EnumType {
 
    pub variants: Vec<EnumVariant>,
 
    pub monomorphs: Vec<EnumMonomorph>,
 
    pub minimum_tag_value: i64,
 
    pub maximum_tag_value: i64,
 
    pub tag_type: ConcreteType,
 
    pub size: usize,
 
    pub alignment: usize,
 
}
 

	
 
// TODO: Also support maximum u64 value
 
pub struct EnumVariant {
 
    pub identifier: Identifier,
 
    pub value: i64,
 
}
 

	
 
pub struct EnumMonomorph {
 
    pub concrete_type: ConcreteType,
 
}
 

	
 
/// `UnionType` is the algebraic datatype (or sum type, or discriminated union).
 
/// A value is an element of the union, identified by its tag, and may contain
 
/// a single subtype.
 
/// For potentially infinite types (i.e. a tree, or a linked list) only unions
 
/// can break the infinite cycle. So when we lay out these unions in memory we
 
/// will reserve enough space on the stack for all union variants that do not
 
/// cause "type loops" (i.e. a union `A` with a variant containing a struct
 
/// `B`). And we will reserve enough space on the heap (and store a pointer in
 
/// the union) for all variants which do cause type loops (i.e. a union `A`
 
/// with a variant to a struct `B` that contains the union `A` again).
 
pub struct UnionType {
 
    pub variants: Vec<UnionVariant>,
 
    pub monomorphs: Vec<UnionMonomorph>,
 
    pub tag_type: ConcreteType,
 
    pub tag_size: usize,
 
}
 

	
 
pub struct UnionVariant {
 
    pub identifier: Identifier,
 
    pub embedded: Vec<ParserType>, // zero-length does not have embedded values
 
@@ -602,96 +603,97 @@ impl TypeTable {
 
    /// Retrieves base definition from type table. We must be able to retrieve
 
    /// it as we resolve all base types upon type table construction (for now).
 
    /// However, in the future we might do on-demand type resolving, so return
 
    /// an option anyway
 
    pub(crate) fn get_base_definition(&self, definition_id: &DefinitionId) -> Option<&DefinedType> {
 
        self.lookup.get(&definition_id)
 
    }
 

	
 
    /// Returns the index into the monomorph type array if the procedure type
 
    /// already has a (reserved) monomorph.
 
    pub(crate) fn get_procedure_monomorph_index(&self, definition_id: &DefinitionId, types: &ConcreteType) -> Option<i32> {
 
        let def = self.lookup.get(definition_id).unwrap();
 
        let monos = def.definition.procedure_monomorphs();
 
        return monos.iter()
 
            .position(|v| v.concrete_type == *types)
 
            .map(|v| v as i32);
 
    }
 

	
 
    /// Returns a mutable reference to a procedure's monomorph expression data.
 
    /// Used by typechecker to fill in previously reserved type information
 
    pub(crate) fn get_procedure_expression_data_mut(&mut self, definition_id: &DefinitionId, monomorph_idx: i32) -> &mut ProcedureMonomorph {
 
        debug_assert!(monomorph_idx >= 0);
 
        let def = self.lookup.get_mut(definition_id).unwrap();
 
        let monomorphs = def.definition.procedure_monomorphs_mut();
 
        return &mut monomorphs[monomorph_idx as usize];
 
    }
 

	
 
    pub(crate) fn get_procedure_expression_data(&self, definition_id: &DefinitionId, monomorph_idx: i32) -> &ProcedureMonomorph {
 
        debug_assert!(monomorph_idx >= 0);
 
        let def = self.lookup.get(definition_id).unwrap();
 
        let monomorphs = def.definition.procedure_monomorphs();
 
        return &monomorphs[monomorph_idx as usize];
 
    }
 

	
 
    /// Reserves space for a monomorph of a polymorphic procedure. The index
 
    /// will point into a (reserved) slot of the array of expression types. The
 
    /// monomorph may NOT exist yet (because the reservation implies that we're
 
    /// going to be performing typechecking on it, and we don't want to
 
    /// check the same monomorph twice)
 
    pub(crate) fn reserve_procedure_monomorph_index(&mut self, definition_id: &DefinitionId, concrete_type: ConcreteType) -> i32 {
 
        let def = self.lookup.get_mut(definition_id).unwrap();
 
        let mono_types = def.definition.procedure_monomorphs_mut();
 
        debug_assert!(def.is_polymorph == (concrete_type.parts.len() != 1));
 
        debug_assert!(!mono_types.iter().any(|v| v.concrete_type == concrete_type));
 

	
 
        let mono_idx = mono_types.len();
 
        mono_types.push(ProcedureMonomorph{
 
            concrete_type,
 
            arg_types: Vec::new(),
 
            expr_data: Vec::new(),
 
        });
 

	
 
        return mono_idx as i32;
 
    }
 

	
 
    /// Adds a datatype polymorph to the type table. Will not add the
 
    /// monomorph if it is already present, or if the type's polymorphic
 
    /// variables are all unused.
 
    /// TODO: Fix signature
 
    pub(crate) fn add_data_monomorph(
 
        &mut self, modules: &[Module], heap: &Heap, arch: &TargetArch, definition_id: DefinitionId, concrete_type: ConcreteType
 
    ) -> Result<i32, ParseError> {
 
        debug_assert_eq!(definition_id, get_concrete_type_definition(&concrete_type));
 

	
 
        // Check if the monomorph already exists
 
        let poly_type = self.lookup.get_mut(&definition_id).unwrap();
 
        if let Some(idx) = poly_type.get_monomorph_index(&concrete_type) {
 
            return Ok(idx as i32);
 
        }
 

	
 
        // Doesn't exist, so instantiate a monomorph and determine its memory
 
        // layout.
 
        self.detect_and_resolve_type_loops_for(modules, heap, concrete_type)?;
 
        debug_assert_eq!(self.encountered_types[0].definition_id, definition_id);
 
        let mono_idx = self.encountered_types[0].monomorph_idx;
 
        self.lay_out_memory_for_encountered_types(arch);
 

	
 
        return Ok(mono_idx as i32);
 
    }
 

	
 
    //--------------------------------------------------------------------------
 
    // Building base types
 
    //--------------------------------------------------------------------------
 

	
 
    /// Builds the base type for an enum. Will not compute byte sizes
 
    fn build_base_enum_definition(&mut self, modules: &[Module], ctx: &mut PassCtx, definition_id: DefinitionId) -> Result<(), ParseError> {
 
        debug_assert!(!self.lookup.contains_key(&definition_id), "base enum already built");
 
        let definition = ctx.heap[definition_id].as_enum();
 
        let root_id = definition.defined_in;
 

	
 
        // Determine enum variants
 
        let mut enum_value = -1;
 
        let mut variants = Vec::with_capacity(definition.variants.len());
 

	
 
        for variant in &definition.variants {
 
            if enum_value == i64::MAX {
 
                let source = &modules[definition.defined_in.index as usize].source;
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{Inbox, OutboxMessage};
 
use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
pub(crate) struct Branch {
 
    index: BranchId,
 
    parent_index: BranchId,
 
    // Code execution state
 
    code_state: ComponentState,
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    inbox: HashMap<PortIdLocal, OutboxMessage>, // TODO: @temporary, remove together with fires()
 
    inbox: HashMap<PortIdLocal, OutgoingMessage>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
impl Branch {
 
    /// Constructs a non-sync branch. It is assumed that the code is at the
 
    /// first instruction
 
    fn new_initial_branch(component_state: ComponentState) -> Self {
 
    pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self {
 
        Branch{
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_branch_in_queue: None,
 
            inbox: HashMap::new(),
 
            ports_delta: Vec::new(),
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 

	
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            inbox: parent_branch.inbox.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortAssignment {
 
    is_assigned: bool,
 
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
 
    num_times_fired: u32,
 
}
 

	
 
impl PortAssignment {
 
    fn new_unassigned() -> Self {
 
        Self{
 
            is_assigned: false,
 
            last_registered_branch_id: BranchId::new_invalid(),
 
            num_times_fired: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn mark_speculative(&mut self, num_times_fired: u32) {
 
        debug_assert!(!self.last_registered_branch_id.is_valid());
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// As the name implies, this contains a description of the ports associated
 
/// with a connector.
 
/// TODO: Extend documentation
 
struct ConnectorPorts {
 
pub(crate) struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    owned_ports: Vec<PortIdLocal>,
 
    pub owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    port_mapping: Vec<PortAssignment>
 
    pub port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
    /// Constructs the initial ports object. Assumes the presence of the
 
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
 
    /// branch.
 
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
 
        let num_ports = owned_ports.len();
 
        let mut port_mapping = Vec::with_capacity(num_ports);
 
        for _ in 0..num_ports {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
        }
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        self.owned_ports.remove(port_index);
 
        self.port_mapping.remove(port_index);
 
    }
 

	
 
    /// Retrieves the index associated with a port id. Note that the port might
 
    /// not exist (yet) if a speculative branch has just received the port.
 
    /// TODO: But then again, one cannot use that port, right?
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
@@ -201,159 +202,160 @@ impl ConnectorPorts {
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
    }
 

	
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
 

	
 
struct BranchQueue {
 
    first: u32,
 
    last: u32,
 
}
 

	
 
impl BranchQueue {
 
    fn new() -> Self {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 
}
 

	
 
/// Public fields of the connector that can be freely shared between multiple
 
/// threads. Note that this is not enforced by the compiler. The global store
 
/// allows retrieving the entire `Connector` as a mutable reference by one
 
/// thread, and this `ConnectorPublic` by any number of threads.
 
/// threads.
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: Inbox,
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new() -> Self {
 
        ConnectorPublic{
 
            inbox: Inbox::new(),
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(false),
 
        }
 
    }
 
}
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct Connector {
 
    // State and properties of connector itself
 
    id: u32,
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    // Port/message management
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
    pub public: ConnectorPublic,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
    }
 
}
 

	
 
impl Connector {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
            public: ConnectorPublic::new(),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
@@ -426,97 +428,97 @@ impl Connector {
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and modify them to be
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
                }
 

	
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particualar port.
 
                let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix };
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("handle case where port was received before (i.e. in ports_delta)")
 
                }
 
                let local_port_index = local_port_index.unwrap();
 

	
 
                // Check the port mapping for consistency
 
                // TODO: For now we can only put once, so that simplifies stuff
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 
                let is_valid_put = if port_mapping.is_assigned {
 
                    // Already assigned, so must be speculative and one time
 
                    // firing, otherwise we are `put`ing multiple times.
 
                    if port_mapping.last_registered_branch_id.is_valid() {
 
                        // Already did a `put`
 
                        todo!("handle error through RunDeltaState");
 
                    } else {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = OutboxMessage {
 
                    let message = OutgoingMessage {
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    results.outbox.push(message);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
@@ -664,113 +666,113 @@ impl Connector {
 

	
 
                    debug_assert!(to_delete_index != -1);
 
                    branch.ports_delta.remove(to_delete_index as usize);
 
                }
 
            }
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    /// Acquiring ports during a sync-session.
 
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        'port_loop: for port_id in port_ids {
 
            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                if delta.port_id == *port_id {
 
                    if delta.acquired {
 
                        // Somehow already received this port.
 
                        // TODO: @security
 
                        todo!("take care of nefarious peers");
 
                    } else {
 
                        // Sending ports to ourselves
 
                        debug_assert!(ports.get_port_index(*port_id).is_some());
 
                        branch.ports_delta.remove(delta_idx);
 
                        continue 'port_loop;
 
                    }
 
                }
 
            }
 

	
 
            // If here then we can safely acquire the new port
 
            branch.ports_delta.push(PortOwnershipDelta{
 
                acquired: true,
 
                port_id: *port_id,
 
            });
 
        }
 

	
 
        return Ok(())
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<OutboxMessage>,
 
    pub outbox: Vec<OutgoingMessage>,
 
    pub new_connectors: Vec<Connector>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
 

	
 
impl RunDeltaState {
 
    /// Constructs a new `RunDeltaState` object with the default amount of
 
    /// reserved memory
 
    pub fn new() -> Self {
 
        RunDeltaState{
 
            outbox: Vec::with_capacity(64),
 
            new_connectors: Vec::new(),
 
            ports: Vec::with_capacity(64),
 
        }
 
    }
 
}
 

	
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/global_store.rs
Show inline comments
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{Connector, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use std::ptr;
 
use std::sync::{RwLock, RwLockReadGuard};
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 

	
 
/// A kind of token that, once obtained, allows access to a container.
 
struct ConnectorKey {
 
    index: u32, // of connector
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
pub(crate) struct ConnectorId(u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: Connector,
 
    pub public: ConnectorPublic,
 
    pub router: Router
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
/// owns a `ConnectorKey`, then one has unique access to that connector.
 
/// Otherwise one has shared access.
 
///
 
/// This datastructure is built to be wrapped in a RwLock.
 
struct ConnectorStore {
 
    inner: RwLock<ConnectorStoreInner>,
 
}
 

	
 
struct ConnectorStoreInner {
 
    connectors: RawVec<*mut Connector>,
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        return Self{
 
            inner: RwLock::new(ConnectorStoreInner {
 
                connectors: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
            }),
 
        };
 
    }
 

	
 
    /// Retrieves the shared members of the connector.
 
    pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic {
 
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id as usize);
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &*connector.public;
 
        }
 
    }
 

	
 
    /// Retrieves a particular connector. Only the thread that pulled the
 
    /// associated key out of the execution queue should (be able to) call this.
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return *connector as &mut _;
 
        }
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it.
 
    pub(crate) fn create(&self, connector: Connector) -> ConnectorKey {
 
        let lock = self.inner.write().unwrap();
 
        let connector = ScheduledConnector{
 
            connector,
 
            public: ConnectorPublic::new(),
 
            router: Router::new(),
 
        };
 

	
 
        let index;
 
        if lock.free.is_empty() {
 
            let connector = Box::into_raw(Box::new(connector));
 

	
 
            unsafe {
 
                // Cheating a bit here. Anyway, move to heap, store in list
 
                index = lock.connectors.len();
 
                lock.connectors.push(connector);
 
            }
 
        } else {
 
            index = lock.free.pop().unwrap();
 

	
 
            unsafe {
 
                let target = lock.connectors.get_mut(index);
 
                debug_assert!(!target.is_null());
 
                ptr::write(*target, connector);
 
            }
 
        }
 

	
 
        return ConnectorKey{ index: index as u32 };
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        for idx in 0..lock.connectors.len() {
 
            unsafe {
 
                let memory = *lock.connectors.get_mut(idx);
 
                let _ = Box::from_raw(memory); // takes care of deallocation
 
            }
 
        }
 
    }
 
}
 

	
 
/// The registry of all ports
 
pub struct PortStore {
 
    inner: RwLock<PortStoreInner>,
 
}
 

	
 
struct PortStoreInner {
 
    ports: RawVec<Port>,
 
    free: Vec<usize>,
 
}
 

	
 
impl PortStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self{
 
            inner: RwLock::new(PortStoreInner{
 
                ports: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
            }),
 
        }
 
    }
 

	
 
    pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef {
 
        let lock = self.inner.read().unwrap();
 
        debug_assert!(port_id.is_valid());
 

	
 
        unsafe {
 
            let port = lock.ports.get_mut(port_id.index as usize);
 
            let port = &mut *port;
 
            debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing
 

	
 
            return PortRef{ lock, port };
 
        }
 
    }
 

	
 
    pub(crate) fn create_channel(&self, creating_connector: Option<u32>) -> Channel {
 
    pub(crate) fn create_channel(&self, creating_connector: Option<ConnectorId>) -> Channel {
 
        let mut lock = self.inner.write().unwrap();
 

	
 
        // Reserves a new port. Doesn't point it to its counterpart
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<u32>) -> u32 {
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<ConnectorId>) -> u32 {
 
            let index;
 
            let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned };
 
            let connector_id = creating_connector.unwrap_or(0);
 
            let (ownership, connector_id) = if creating_connector.is_some() {
 
                (PortOwnership::Owned, creating_connector.unwrap())
 
            } else {
 
                (PortOwnership::Unowned, ConnectorId::new_invalid())
 
            };
 

	
 
            if lock.free.is_empty() {
 
                index = lock.ports.len() as u32;
 
                lock.ports.push(Port{
 
                    self_id: PortIdLocal::new(index),
 
                    peer_id: PortIdLocal::new_invalid(),
 
                    kind,
 
                    ownership,
 
                    owning_connector: connector_id,
 
                    peer_connector: connector_id
 
                });
 
            } else {
 
                index = lock.free.pop().unwrap() as u32;
 
                let port = unsafe{ &mut *lock.ports.get_mut(index as usize) };
 

	
 
                port.peer_id = PortIdLocal::new_invalid();
 
                port.kind = kind;
 
                port.ownership = ownership;
 
                port.owning_connector = connector_id;
 
                port.peer_connector = connector_id;
 
            }
 

	
 
            return index;
 
        }
 

	
 
        // Create the ports
 
        let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector);
 
        let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector);
 
        debug_assert_ne!(putter_id, getter_id);
 

	
 
        // Point them to one another
 
        unsafe {
 
            let putter_port = &mut *lock.ports.get_mut(putter_id as usize);
 
            let getter_port = &mut *lock.ports.get_mut(getter_id as usize);
 
            putter_port.peer_id = getter_port.self_id;
 
            getter_port.peer_id = putter_port.self_id;
 
        }
 

	
 
        return Channel{ putter_id, getter_id }
 
    }
 
}
 

	
 
pub struct PortRef<'p> {
 
    lock: RwLockReadGuard<'p, PortStoreInner>,
 
    port: &'static mut Port,
 
}
 

	
 
impl<'p> std::ops::Deref for PortRef<'p> {
 
    type Target = Port;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        return self.port;
 
    }
 
}
 

	
 
impl<'p> std::ops::DerefMut for PortRef<'p> {
 
    fn deref_mut(&mut self) -> &mut Self::Target {
 
        return self.port;
 
    }
 
}
 

	
 
impl Drop for PortStore {
 
    fn drop(&mut self) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        // Very lazy code
 
        for idx in 0..lock.ports.len() {
 
            if lock.free.contains(&idx) {
 
                continue;
 
            }
 

	
 
            unsafe {
 
                let port = lock.ports.get_mut(idx);
 
                std::ptr::drop_in_place(port);
 
            }
 
        }
 
    }
 
}
 

	
 
/// Global store of connectors, ports and queues that are used by the sceduler
 
/// threads. The global store has the appearance of a thread-safe datatype, but
 
/// one needs to be careful using it.
 
///
 
/// TODO: @docs
 
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
 
///     This includes the `should_exit` and `did_exit` pair!
 
pub struct GlobalStore {
 
    pub connector_queue: MpmcQueue<ConnectorKey>,
 
    pub connectors: ConnectorStore,
 
    pub ports: PortStore,
 
    pub should_exit: AtomicBool,    // signal threads to exit
 
}
 

	
 
impl GlobalStore {
 
    pub fn new() -> Self {
 
        Self{
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: ConnectorStore::with_capacity(256),
 
            ports: PortStore::with_capacity(256),
 
            should_exit: AtomicBool::new(false),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/inbox.rs
Show inline comments
 
/**
 
inbox.rs
 

	
 
Contains various types of inboxes and message types for the connectors. There
 
are two kinds of inboxes:
 

	
 
The `PublicInbox` is a simple message queue. Messages are put in by various
 
threads, and they're taken out by a single thread. These messages may contain
 
control messages and may be filtered or redirected by the scheduler.
 

	
 
The `PrivateInbox` is a temporary storage for all messages that are received
 
within a certain sync-round.
 
**/
 

	
 
use std::collections::VecDeque;
 
use std::sync::{RwLock, RwLockReadGuard, Mutex};
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::connector::{BranchId, PortIdLocal};
 
use super::connector::{BranchId, PortIdLocal};
 
use super::global_store::ConnectorId;
 

	
 
/// A message prepared by a connector. Waiting to be picked up by the runtime to
 
/// be sent to another connector.
 
#[derive(Clone)]
 
pub struct OutboxMessage {
 
pub struct OutgoingMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
/// A message inserted into the inbox of a connector by the runtime.
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
#[derive(Clone)]
 
pub struct InboxMessage {
 
pub struct DataMessage {
 
    pub sending_connector: ConnectorId,
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId,
 
    pub sender_cur_branch_id: BranchId,
 
    pub message: ValueGroup,
 
}
 

	
 
/// A message sent between connectors to communicate something about their
 
/// scheduling state.
 
pub enum ControlMessage {
 
    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
 
    Ack(u32), // (control message ID)
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub sender: ConnectorId,
 
    pub content: ControlMessageVariant,
 
}
 

	
 
/// The inbox of a connector. The owning connector (i.e. the thread that is
 
/// executing the connector) should be able to read all messages. Other
 
/// connectors (potentially executed by different threads) should be able to
 
/// append messages.
 
///
 
/// If a connector has no more code to run, and its inbox does not contain any
 
/// new messages, then it may go into sleep mode.
 
///
 
// TODO: @Optimize, this is a temporary lazy implementation
 
pub struct Inbox {
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
 
/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
 
/// out and handles all control message and potential routing). The correctly
 
/// addressed `Data` variants will end up at the connector.
 
pub enum Message {
 
    Data(DataMessage),
 
    Control(ControlMessage),
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 

	
 
pub struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: RwLock<Vec<InboxMessage>>,
 
    len_read: AtomicUsize,
 
    // System messages. These are handled by the scheduler and only need to be
 
    // handled once.
 
    system_messages: Mutex<VecDeque<ControlMessage>>,
 
    messages: Vec<DataMessage>,
 
    len_read: usize,
 
}
 

	
 
impl Inbox {
 
impl PrivateInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: RwLock::new(Vec::new()),
 
            len_read: AtomicUsize::new(0),
 
            system_messages: Mutex::new(VecDeque::new()),
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&self, message: InboxMessage) {
 
        let mut messages = self.messages.write().unwrap();
 
        for existing in messages.iter() {
 
    pub fn insert_message(&mut self, message: DataMessage) {
 
        for existing in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.receiving_port == message.receiving_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 
        messages.push(message);
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
    /// speculative conditions. Note that the inbox remains read-locked until
 
    /// the returned iterator is dropped. Should only be called by the
 
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
 
    ///
 
    /// This function should only be used to check if already-received messages
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        let lock = self.messages.read().unwrap();
 
        return InboxMessageIter{
 
            lock,
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read.load(Ordering::Acquire),
 
            max_index: self.len_read,
 
            match_port_id: port_id,
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub fn next_message(&self) -> Option<InboxMessageRef> {
 
        let lock = self.messages.read().unwrap();
 
        let cur_index = self.len_read.load(Ordering::Acquire);
 
        if cur_index >= lock.len() {
 
    pub fn next_message(&mut self) -> Option<&DataMessage> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        // TODO: Accept the correctness and simply make it an add, or even
 
        //  remove the atomic altogether.
 
        if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) {
 
            panic!("multiple readers modifying number of messages read");
 
        }
 

	
 
        return Some(InboxMessageRef{
 
            lock,
 
            index: cur_index,
 
        });
 
        let to_return = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 

	
 
    pub fn insert_control_message(&self, message: ControlMessage) {
 
        let mut lock = self.system_messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_control_message(&self) -> Option<ControlMessage> {
 
        let mut lock = self.system_messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 
}
 

	
 
/// Reference to a new message
 
pub struct InboxMessageRef<'i> {
 
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
 
    index: usize,
 
}
 

	
 
impl<'i> std::ops::Deref for InboxMessageRef<'i> {
 
    type Target = InboxMessage;
 

	
 
    fn deref(&self) -> &'i Self::Target {
 
        return &self.lock[self.index];
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub struct InboxMessageIter<'i> {
 
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
 
    messages: &'i Vec<DataMessage>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'m InboxMessage;
 
    type Item = &'m DataMessage;
 

	
 
    fn next(&'m mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.lock[self.next_index];
 
            let cur_message = &self.messages[self.next_index];
 
            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let message = &self.lock[self.next_index];
 
        let message = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod port;
 
mod global_store;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 
mod inbox;
 

	
 
// Imports
 

	
 
use std::sync::Arc;
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::protocol::eval::*;
 
use crate::{common::Id, PortId, ProtocolDescription};
 

	
 
use global_store::GlobalStore;
 
use scheduler::Scheduler;
 
use crate::protocol::ComponentCreationError;
 
use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group};
 

	
 

	
 
// Runtime API
 
pub struct Runtime {
 
    global_store: Arc<GlobalStore>,
 
    protocol_description: Arc<ProtocolDescription>,
 
    schedulers: Vec<JoinHandle<()>>
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: Arc<ProtocolDescription>) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let global_store = Arc::new(GlobalStore::new());
 

	
 
        // Launch threads
 
        let mut schedulers = Vec::with_capacity(num_threads);
 
        for _ in 0..num_threads {
 
            let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone());
 
            let thread = thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            schedulers.push(thread);
 
        }
 

	
 
        // Move innards into runtime struct
 
        return Runtime{
 
            global_store,
 
            protocol_description,
 
            schedulers,
 
        }
 
    }
 

	
 
    /// Returns (putter port, getter port)
 
    pub fn create_channel(&self) -> (Value, Value) {
 
        let channel = self.global_store.ports.create_channel(None);
 
        let putter_value = Value::Output(PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: channel.putter_id,
 
        }));
 
        let getter_value = Value::Input(PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: channel.getter_id,
 
        }));
 
        return (putter_value, getter_value);
 
    }
 

	
 
    pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // TODO: Remove component creation function from PD, should not be concerned with it
 
        // Create the connector and mark the ports as now owned by the
 
        // connector
 
        let mut port_ids = Vec::new();
 
        find_ports_in_value_group(&values, &mut port_ids);
 

	
 
        let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?;
 
        let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone());
 
        let connector_key = self.global_store.connectors.create(connector);
 

	
 
        for port_id in port_ids {
 
            let port = self.global_store.ports.get(&connector_key, port_id);
 
            port.owning_connector = connector_key.downcast();
 
            port.peer_connector
 
            // TODO: Note that we immediately need to notify the other side of the connector that
 
            //  the port has moved!
 
        }
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 

	
 
    }
 
}
 
\ No newline at end of file
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ index: id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
pub enum PortOwnership {
 
    Unowned, // i.e. held by a native application
 
    Owned,
 
    InTransit,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
pub struct Port {
 
    // Once created, these values are immutable
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    // But this can be changed, but only by the connector that owns it
 
    pub ownership: PortOwnership,
 
    pub owning_connector: u32,
 
    pub peer_connector: u32, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
    pub owning_connector: ConnectorId,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: u32, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: u32, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::Ordering;
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 

	
 
use super::inbox::InboxMessage;
 
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 
use super::global_store::GlobalStore;
 
use super::port::{PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
struct Scheduler {
 
pub(crate) struct Scheduler {
 
    global: Arc<GlobalStore>,
 
    code: Arc<ProtocolDescription>,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            global,
 
            code,
 
        }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        // TODO: @Memory, scheme for reducing allocations if excessive.
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        loop {
 
            // TODO: Check if we're supposed to exit
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.global.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
                // TODO: @Performance, needs condition variable for waking up
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                thread::sleep(Duration::new(1, 0));
 
                continue
 
                if self.global.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    break 'thread_loop;
 
                }
 

	
 
                continue 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector = self.global.connectors.get_mut(&connector_key);
 
            let scheduled = self.global.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 

	
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                let new_schedule;
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    match message {
 
                        Message::Data(message) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
 
                            } else {
 
                                scheduled.connector.inbox.insert_message(message);
 
                            }
 
                        },
 
                        Message::Control(message) => {
 
                            match message.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = self.global.ports.get(&connector_key, port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                // TODO: Check inbox for new message
 
                                    // And respond with an Ack
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        message.sender,
 
                                        Message::Control(ControlMessage{
 
                                            id: message.id,
 
                                            sender: connector_key.downcast(),
 
                                            content: ControlMessageVariant::Ack,
 
                                        })
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(message.id);
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                if connector.is_in_sync_mode() {
 
                // Actually run the connector
 
                let new_schedule;
 
                if scheduled.connector.is_in_sync_mode() {
 
                    // In synchronous mode, so we can expect messages being sent,
 
                    // but we never expect the creation of connectors
 
                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                    new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.new_connectors.is_empty());
 
                } else {
 
                    // In regular running mode (not in a sync block) we cannot send
 
                    // messages but we can create new connectors
 
                    new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.outbox.is_empty());
 
                }
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut delta_state);
 

	
 
                cur_schedule = new_schedule;
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.global.connector_queue.push_back(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
 
                    scheduled.public.sleeping.store(true, Ordering::Release);
 

	
 
                    // We might have received a message in the meantime from a
 
                    // thread that did not see the sleeping flag set to `true`,
 
                    // so:
 
                    if !scheduled.public.inbox.is_empty() {
 
                        let should_reschedule_self = scheduled.public.sleeping
 
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                            .is_ok();
 

	
 
                        if should_reschedule_self {
 
                            self.global.connector_queue.push_back(connector_key);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
 
        // Handling any messages that were sent
 
        if !delta_state.outbox.is_empty() {
 
                        // There are message to send
 
            for message in delta_state.outbox.drain(..) {
 
                let (inbox_message, target_connector_id) = {
 
                                // Note: retrieving a port incurs a read lock
 
                    let sending_port = self.global.ports.get(&connector_key, message.sending_port);
 
                    (
 
                                    InboxMessage {
 
                        DataMessage {
 
                            sending_connector: connector_key.downcast(),
 
                            sending_port: sending_port.self_id,
 
                            receiving_port: sending_port.peer_id,
 
                            sender_prev_branch_id: message.sender_prev_branch_id,
 
                            sender_cur_branch_id: message.sender_cur_branch_id,
 
                            message: message.message,
 
                        },
 
                        sending_port.peer_connector,
 
                    )
 
                };
 

	
 
                            let target_connector = self.global.connectors.get_shared(target_connector_id);
 
                            target_connector.inbox.insert_message(inbox_message);
 

	
 
                            // TODO: Check silent state. Queue connector if it was silent
 
                self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
 
            }
 
        }
 
                } else {
 
                    // In regular running mode (not in a sync block) we cannot send
 
                    // messages but we can create new connectors
 
                    new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
                        // Push all connectors into the global state and queue them
 
                        // for execution
 
                        for connector in delta_state.new_connectors.drain(..) {
 
                            // Create connector, modify all of the ports that
 
                            // it now owns, then queue it for execution
 
                            let connector_key = self.global.connectors.create(connector);
 
            let cur_connector = self.global.connectors.get_mut(connector_key);
 

	
 
                            self.global.connector_queue.push_back(connector_key);
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.global.connectors.create(new_connector);
 
                let new_connector = self.global.connectors.get_mut(&new_key);
 

	
 
                // Each port should be lost by the connector that created the
 
                // new one. Note that the creator is the current owner.
 
                for port_id in &new_connector.ports.owned_ports {
 
                    debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));
 

	
 
                    // Modify ownership, retrieve peer connector
 
                    let (peer_connector_id, peer_port_id) = {
 
                        let mut port = self.global.ports.get(connector_key, *port_id);
 
                        port.owning_connector = new_key.downcast();
 

	
 
                        (port.peer_connector, port.peer_id)
 
                    };
 

	
 
                    // Send message that port has changed ownership
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast()
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
                self.global.connector_queue.push_back(new_key);
 
            }
 
        }
 
    }
 

	
 
                cur_schedule = new_schedule;
 
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.global.connectors.get_shared(connector_id);
 

	
 
        connector.inbox.insert_message(message);
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe { ConnectorKey::from_id(connector_id) };
 
            self.global.connector_queue.push_back(key);
 
        }
 
    }
 
}
 

	
 
/// Represents a rerouting entry due to a moved port
 
// TODO: Optimize
 
struct ReroutedTraffic {
 
    id: u32,                        // ID of control message
 
    port: PortIdLocal,              // targeted port
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector they should be rerouted to
 
}
 

	
 
pub(crate) struct Router {
 
    id_counter: u32,
 
    active: Vec<ReroutedTraffic>,
 
}
 

	
 
impl Router {
 
    pub fn new() -> Self {
 
        Router{
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            sender: self_connector_id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == message.sending_connector &&
 
                reroute.port == message.sending_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message
 
    pub fn handle_ack(&mut self, id: u32) {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
 

	
 
        match index {
 
            Some(index) => { self.active.remove(index); },
 
            None => { todo!("handling of nefarious ACKs"); },
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)