Changeset - eeea39f48a29
[Not reviewed]
Cargo.toml
Show inline comments
 
@@ -23,20 +23,20 @@ mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"]
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# randomness
 
rand = "0.8.4"
 
rand_pcg = "0.3.1"
 

	
 
[lib]
 
crate-type = [
 
	"rlib", # for use as a Rust dependency.
 
]
 
\ No newline at end of file
docs/runtime/sync.md
Show inline comments
 
new file 100644
 
# Synchronous Communication
 

	
 
## 
 
\ No newline at end of file
src/common.rs
Show inline comments
 
deleted file
src/lib.rs
Show inline comments
 
#[macro_use]
 
mod macros;
 

	
 
// mod common;
 
mod protocol;
 
pub mod runtime;
 
pub mod runtime2;
 
mod collections;
 
mod random;
 

	
 
pub use protocol::{ProtocolDescription, ProtocolDescriptionBuilder, ComponentCreationError};
 
\ No newline at end of file
src/protocol/eval/executor.rs
Show inline comments
 
@@ -727,43 +727,43 @@ impl Prompt {
 
                                    for element in elements {
 
                                        message.push(element.as_char());
 
                                    }
 

	
 
                                    // Drop the heap-allocated value from the
 
                                    // store
 
                                    self.store.drop_heap_pos(value_heap_pos);
 
                                    println!("{}", message);
 
                                },
 
                                Method::SelectStart => {
 
                                    let num_cases = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32();
 
                                    let num_ports = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32();
 
                                    if !ctx.select_start(num_cases, num_ports) {
 
                                        return Ok(EvalContinuation::SelectStart(num_cases, num_ports))
 
                                    }
 

	
 
                                    return Ok(EvalContinuation::SelectStart(num_cases, num_ports));
 
                                },
 
                                Method::SelectRegisterCasePort => {
 
                                    let case_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32();
 
                                    let port_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32();
 
                                    let port_value = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_port_id();
 

	
 
                                    if !ctx.performed_select_start() {
 
                                        return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value));
 
                                    }
 
                                    return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value));
 
                                },
 
                                Method::SelectWait => {
 
                                    match ctx.performed_select_wait() {
 
                                        Some(select_index) => {
 
                                            cur_frame.expr_values.push_back(Value::UInt32(select_index));
 
                                        },
 
                                        None => return Ok(EvalContinuation::SelectWait),
 
                                        None => {
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr.this.upcast()));
 
                                            return Ok(EvalContinuation::SelectWait)
 
                                        },
 
                                    }
 
                                },
 
                                Method::UserComponent => {
 
                                    // This is actually handled by the evaluation
 
                                    // of the statement.
 
                                    debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len());
 
                                    debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this)
 
                                },
 
                                Method::UserFunction => {
 
                                    // Push a new frame. Note that all expressions have
 
                                    // been pushed to the front, so they're in the order
 
                                    // of the definition.
src/protocol/mod.rs
Show inline comments
 
@@ -206,26 +206,24 @@ impl ProtocolDescription {
 
                return false;
 
            },
 
        }
 
    }
 
}
 

	
 
pub trait RunContext {
 
    fn performed_put(&mut self, port: PortId) -> bool;
 
    fn performed_get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
 
    fn fires(&mut self, port: PortId) -> Option<Value>; // None if not yet branched
 
    fn performed_fork(&mut self) -> Option<bool>; // None if not yet forked
 
    fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared
 
    fn performed_select_start(&mut self) -> bool; // true if performed
 
    fn performed_select_register_port(&mut self) -> bool; // true if registered
 
    fn performed_select_wait(&mut self) -> Option<u32>; // None if not yet notified runtime of select blocker
 
}
 

	
 
pub struct ProtocolDescriptionBuilder {
 
    parser: Parser,
 
}
 

	
 
impl ProtocolDescriptionBuilder {
 
    pub fn new() -> Self {
 
        return Self{
 
            parser: Parser::new(),
 
        }
src/protocol/parser/pass_rewriting.rs
Show inline comments
 
@@ -106,43 +106,43 @@ impl Visitor for PassRewriting {
 
    }
 

	
 
    // --- Visiting the select statement
 

	
 
    fn visit_select_stmt(&mut self, ctx: &mut Ctx, id: SelectStatementId) -> VisitorResult {
 
        // Utility for the last stage of rewriting process. Note that caller
 
        // still needs to point the end of the if-statement to the end of the
 
        // replacement statement of the select statement.
 
        fn transform_select_case_code(
 
            ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId,
 
            select_id: SelectStatementId, case_index: usize,
 
            select_var_id: VariableId, select_var_type_id: TypeIdReference
 
        ) -> (IfStatementId, EndIfStatementId) {
 
        ) -> (IfStatementId, EndIfStatementId, ScopeId) {
 
            // Retrieve statement IDs associated with case
 
            let case = &ctx.heap[select_id].cases[case_index];
 
            let case_guard_id = case.guard;
 
            let case_body_id = case.body;
 
            let case_scope_id = case.scope;
 

	
 
            // Create the if-statement for the result of the select statement
 
            let compare_expr_id = create_ast_equality_comparison_expr(ctx, containing_procedure_id, select_var_id, select_var_type_id, case_index as u64);
 
            let true_case = IfStatementCase{
 
                body: case_guard_id, // which is linked up to the body
 
                scope: case_scope_id,
 
            };
 
            let (if_stmt_id, end_if_stmt_id) = create_ast_if_stmt(ctx, compare_expr_id.upcast(), true_case, None);
 

	
 
            // Link up body statement to end-if
 
            set_ast_statement_next(ctx, case_body_id, end_if_stmt_id.upcast());
 

	
 
            return (if_stmt_id, end_if_stmt_id)
 
            return (if_stmt_id, end_if_stmt_id, case_scope_id);
 
        }
 

	
 
        // Precreate the block that will end up containing all of the
 
        // transformed statements. Also precreate the scope associated with it
 
        let (outer_block_id, outer_end_block_id, outer_scope_id) =
 
            create_ast_block_stmt(ctx, Vec::new());
 

	
 
        // The "select" and the "end select" statement will act like trampolines
 
        // that jump to the replacement block. So set the child/parent
 
        // relationship already.
 
        // --- for the statements
 
        let select_stmt = &mut ctx.heap[id];
 
@@ -197,50 +197,50 @@ impl Visitor for PassRewriting {
 
            let call_expr = &mut ctx.heap[get_call_expr_id];
 
            call_expr.arguments[0] = variable_expr_id.upcast();
 

	
 
            transformed_stmts.push(variable_decl_stmt_id.upcast().upcast());
 
            locals.push((variable_id, port_type_ref));
 
        }
 

	
 
        // Insert runtime calls that facilitate the semantics of the select
 
        // block.
 

	
 
        // Create the call that indicates the start of the select block
 
        {
 
            let num_cases_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_cases as u64);
 
            let num_ports_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_ports as u64);
 
            let num_cases_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_cases as u64, ctx.arch.uint32_type_id);
 
            let num_ports_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_ports as u64, ctx.arch.uint32_type_id);
 
            let arguments = vec![
 
                num_cases_expression_id.upcast(),
 
                num_ports_expression_id.upcast()
 
            ];
 

	
 
            let call_expression_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments);
 
            let call_statement_id = create_ast_expression_stmt(ctx, call_expression_id.upcast());
 

	
 
            transformed_stmts.push(call_statement_id.upcast());
 
        }
 

	
 
        // Create calls for each select case that will register the ports that
 
        // we are waiting on at the runtime.
 
        {
 
            let mut total_port_index = 0;
 
            for case_index in 0..total_num_cases {
 
                let case = &ctx.heap[id].cases[case_index];
 
                let case_num_ports = case.involved_ports.len();
 

	
 
                for case_port_index in 0..case_num_ports {
 
                    // Arguments to runtime call
 
                    let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions
 
                    let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64);
 
                    let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64);
 
                    let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id);
 
                    let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id);
 
                    let port_variable_expr_id = create_ast_variable_expr(ctx, self.current_procedure_id, port_variable_id, port_variable_type);
 
                    let runtime_call_arguments = vec![
 
                        case_index_expr_id.upcast(),
 
                        port_index_expr_id.upcast(),
 
                        port_variable_expr_id.upcast()
 
                    ];
 

	
 
                    // Create runtime call, then store it
 
                    let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments);
 
                    let runtime_call_stmt_id = create_ast_expression_stmt(ctx, runtime_call_expr_id.upcast());
 

	
 
                    transformed_stmts.push(runtime_call_stmt_id.upcast());
 
@@ -258,48 +258,56 @@ impl Visitor for PassRewriting {
 

	
 
        {
 
            let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new());
 
            let variable_stmt_id = create_ast_variable_declaration_stmt(ctx, self.current_procedure_id, select_variable_id, select_variable_type, runtime_call_expr_id.upcast());
 
            transformed_stmts.push(variable_stmt_id.upcast().upcast());
 
        }
 

	
 
        call_id_section.forget();
 
        expr_id_section.forget();
 

	
 
        // Now we transform each of the select block case's guard and code into
 
        // a chained if-else statement.
 
        let mut relative_pos = transformed_stmts.len() as i32;
 
        if total_num_cases > 0 {
 
            let (if_stmt_id, end_if_stmt_id) = transform_select_case_code(ctx, self.current_procedure_id, id, 0, select_variable_id, select_variable_type);
 
            let (if_stmt_id, end_if_stmt_id, scope_id) = transform_select_case_code(ctx, self.current_procedure_id, id, 0, select_variable_id, select_variable_type);
 
            link_existing_child_to_new_parent_scope(ctx, &mut self.scope_buffer, outer_scope_id, scope_id, relative_pos);
 
            let first_end_if_stmt = &mut ctx.heap[end_if_stmt_id];
 
            first_end_if_stmt.next = outer_end_block_id.upcast();
 

	
 
            let mut last_if_stmt_id = if_stmt_id;
 
            let mut last_end_if_stmt_id = end_if_stmt_id;
 
            let mut last_parent_scope_id = outer_scope_id;
 
            let mut last_relative_pos = transformed_stmts.len() as i32 + 1;
 
            transformed_stmts.push(last_if_stmt_id.upcast());
 

	
 
            for case_index in 1..total_num_cases {
 
                let (if_stmt_id, end_if_stmt_id) = transform_select_case_code(ctx, self.current_procedure_id, id, case_index, select_variable_id, select_variable_type);
 
                let (if_stmt_id, end_if_stmt_id, scope_id) = transform_select_case_code(ctx, self.current_procedure_id, id, case_index, select_variable_id, select_variable_type);
 
                let false_case_scope_id = ctx.heap.alloc_scope(|this| Scope::new(this, ScopeAssociation::If(last_if_stmt_id, false)));
 
                link_existing_child_to_new_parent_scope(ctx, &mut self.scope_buffer, false_case_scope_id, scope_id, 0);
 
                link_new_child_to_existing_parent_scope(ctx, &mut self.scope_buffer, last_parent_scope_id, false_case_scope_id, last_relative_pos);
 
                set_ast_if_statement_false_body(ctx, last_if_stmt_id, last_end_if_stmt_id, IfStatementCase{ body: if_stmt_id.upcast(), scope: false_case_scope_id });
 

	
 
                let end_if_stmt = &mut ctx.heap[end_if_stmt_id];
 
                end_if_stmt.next = last_end_if_stmt_id.upcast();
 

	
 
                last_if_stmt_id = if_stmt_id;
 
                last_end_if_stmt_id = end_if_stmt_id;
 
                last_parent_scope_id = false_case_scope_id;
 
                last_relative_pos = 0;
 
            }
 
        }
 

	
 
        // Final steps: set the statements of the replacement block statement,
 
        // and link all of those statements together
 
        // link all of those statements together, and update the scopes.
 
        let first_stmt_id = transformed_stmts[0];
 
        let mut last_stmt_id = transformed_stmts[0];
 
        for stmt_id in transformed_stmts.iter().skip(1).copied() {
 
            set_ast_statement_next(ctx, last_stmt_id, stmt_id);
 
            last_stmt_id = stmt_id;
 
        }
 

	
 
        let outer_block_stmt = &mut ctx.heap[outer_block_id];
 
        outer_block_stmt.next = first_stmt_id;
 
        outer_block_stmt.statements = transformed_stmts;
 

	
 
        return Ok(())
 
@@ -371,44 +379,44 @@ fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinit
 
        type_index: call_type_index,
 
    });
 

	
 
    for argument_index in 0..expression_ids.len() {
 
        let argument_id = expression_ids[argument_index];
 
        let argument_expr = &mut ctx.heap[argument_id];
 
        *argument_expr.parent_mut() = ExpressionParent::Expression(call_expression_id.upcast(), argument_index as u32);
 
    }
 

	
 
    return call_expression_id;
 
}
 

	
 
fn create_ast_literal_integer_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, unsigned_value: u64) -> LiteralExpressionId {
 
    let literal_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.uint64_type_id));
 
fn create_ast_literal_integer_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, unsigned_value: u64, type_id: TypeId) -> LiteralExpressionId {
 
    let literal_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(type_id));
 
    return ctx.heap.alloc_literal_expression(|this| LiteralExpression{
 
        this,
 
        span: InputSpan::new(),
 
        value: Literal::Integer(LiteralInteger{
 
            unsigned_value,
 
            negated: false,
 
        }),
 
        parent: ExpressionParent::None,
 
        type_index: literal_type_index,
 
    });
 
}
 

	
 
fn create_ast_equality_comparison_expr(
 
    ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId,
 
    variable_id: VariableId, variable_type: TypeIdReference, value: u64
 
) -> BinaryExpressionId {
 
    let var_expr_id = create_ast_variable_expr(ctx, containing_procedure_id, variable_id, variable_type);
 
    let int_expr_id = create_ast_literal_integer_expr(ctx, containing_procedure_id, value);
 
    let int_expr_id = create_ast_literal_integer_expr(ctx, containing_procedure_id, value, ctx.arch.uint32_type_id);
 
    let cmp_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.bool_type_id));
 
    let cmp_expr_id = ctx.heap.alloc_binary_expression(|this| BinaryExpression{
 
        this,
 
        operator_span: InputSpan::new(),
 
        full_span: InputSpan::new(),
 
        left: var_expr_id.upcast(),
 
        operation: BinaryOperator::Equality,
 
        right: int_expr_id.upcast(),
 
        parent: ExpressionParent::None,
 
        type_index: cmp_type_index,
 
    });
 

	
 
@@ -431,33 +439,34 @@ fn create_ast_expression_stmt(ctx: &mut Ctx, expression_id: ExpressionId) -> Exp
 
    let expression = &mut ctx.heap[expression_id];
 
    *expression.parent_mut() = ExpressionParent::ExpressionStmt(statement_id);
 

	
 
    return statement_id;
 
}
 

	
 
fn create_ast_variable_declaration_stmt(
 
    ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId,
 
    variable_id: VariableId, variable_type: TypeIdReference, initial_value_expr_id: ExpressionId
 
) -> MemoryStatementId {
 
    // Create the assignment expression, assigning the initial value to the variable
 
    let variable_expr_id = create_ast_variable_expr(ctx, containing_procedure_id, variable_id, variable_type);
 
    let void_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.void_type_id));
 
    let assignment_expr_id = ctx.heap.alloc_assignment_expression(|this| AssignmentExpression{
 
        this,
 
        operator_span: InputSpan::new(),
 
        full_span: InputSpan::new(),
 
        left: variable_expr_id.upcast(),
 
        operation: AssignmentOperator::Set,
 
        right: initial_value_expr_id,
 
        parent: ExpressionParent::None,
 
        type_index: -1,
 
        type_index: void_type_index,
 
    });
 

	
 
    // Create the memory statement
 
    let memory_stmt_id = ctx.heap.alloc_memory_statement(|this| MemoryStatement{
 
        this,
 
        span: InputSpan::new(),
 
        variable: variable_id,
 
        initial_expr: assignment_expr_id,
 
        next: StatementId::new_invalid(),
 
    });
 

	
 
    // Set all parents which we can access
 
@@ -614,27 +623,27 @@ fn link_existing_child_to_new_parent_scope(ctx: &mut Ctx, scope_buffer: &mut Sco
 
    let old_parent = &mut ctx.heap[old_parent_scope_id];
 
    let scope_index = old_parent.nested.iter()
 
        .position(|v| *v == child_scope_id)
 
        .unwrap();
 
    old_parent.nested.remove(scope_index);
 

	
 
    // Add to new parent
 
    add_child_scope_to_parent(ctx, scope_buffer, new_parent_scope_id, child_scope_id, new_relative_pos_in_parent);
 
}
 

	
 
/// Will add a child scope to a parent scope using the relative position hint.
 
fn add_child_scope_to_parent(ctx: &mut Ctx, scope_buffer: &mut ScopedBuffer<ScopeId>, parent_scope_id: ScopeId, child_scope_id: ScopeId, relative_pos_hint: i32) {
 
    let child_scope = &ctx.heap[child_scope_id];
 
    let parent_scope = &ctx.heap[parent_scope_id];
 

	
 
    let existing_scope_ids = scope_buffer.start_section_initialized(&child_scope.nested);
 
    let existing_scope_ids = scope_buffer.start_section_initialized(&parent_scope.nested);
 
    let mut insert_pos = existing_scope_ids.len();
 
    for index in 0..existing_scope_ids.len() {
 
        let existing_scope_id = existing_scope_ids[index];
 
        let existing_scope = &ctx.heap[existing_scope_id];
 
        if relative_pos_hint <= existing_scope.relative_pos_in_parent {
 
            insert_pos = index;
 
            break;
 
        }
 
    }
 
    existing_scope_ids.forget();
 

	
 
    let parent_scope = &mut ctx.heap[parent_scope_id];
 
@@ -656,13 +665,13 @@ fn add_new_procedure_expression_type(ctx: &mut Ctx, procedure_id: ProcedureDefin
 
            }
 
        },
 
        TypeIdReference::IndirectSameAsExpr(source_type_index) => {
 
            for monomorph in procedure.monomorphs.iter_mut() {
 
                debug_assert_eq!(monomorph.expr_info.len(), type_index);
 
                let copied_expr_info = monomorph.expr_info[source_type_index as usize];
 
                monomorph.expr_info.push(copied_expr_info)
 
            }
 
        }
 
    }
 

	
 
    return type_index as i32;
 
}
 
}
 
\ No newline at end of file
src/protocol/parser/pass_typing.rs
Show inline comments
 
@@ -2100,29 +2100,24 @@ impl PassTyping {
 
        }
 

	
 
        // Write the types of the arguments
 
        let procedure = &ctx.heap[self.procedure_id];
 
        for parameter_id in procedure.parameters.iter().copied() {
 
            let mut concrete = ConcreteType::default();
 
            let var_data = self.var_data.iter().find(|v| v.var_id == parameter_id).unwrap();
 
            var_data.var_type.write_concrete_type(&mut concrete);
 
            let type_id = ctx.types.add_monomorphed_type(ctx.modules, ctx.heap, ctx.arch, concrete)?;
 
            monomorph.argument_types.push(type_id)
 
        }
 

	
 
        println!("DEBUG: For procedure {} with polyargs {:#?}", ctx.heap[self.procedure_id].identifier.value.as_str(), self.poly_vars);
 
        for infer_node in self.infer_nodes.iter() {
 
            println!("DEBUG: [{:?}] has type: {}", infer_node.expr_id, infer_node.expr_type.display_name(&ctx.heap));
 
        }
 

	
 
        // Determine if we have already assigned type indices to the expressions
 
        // before (the indices that, for a monomorph, can retrieve the type of
 
        // the expression).
 
        let has_type_indices = self.reserved_monomorph_index > 0;
 
        if has_type_indices {
 
            // already have indices, so resize and then index into it
 
            debug_assert!(monomorph.expr_info.is_empty());
 
            monomorph.expr_info.resize(num_infer_nodes, ExpressionInfo::new_invalid());
 
            for infer_node in self.infer_nodes.iter() {
 
                let type_index = ctx.heap[infer_node.expr_id].type_index();
 
                monomorph.expr_info[type_index as usize] = infer_node.as_expression_info();
 
            }
src/protocol/parser/type_table.rs
Show inline comments
 
@@ -460,26 +460,26 @@ impl Hash for MonoSearchKey {
 
            // }
 
        }
 
    }
 
}
 

	
 
impl PartialEq for MonoSearchKey {
 
    fn eq(&self, other: &Self) -> bool {
 
        let mut self_index = 0;
 
        let mut other_index = 0;
 

	
 
        while self_index < self.parts.len() && other_index < other.parts.len() {
 
            // Retrieve part and flags
 
            let (self_bits, _) = self.parts[self_index];
 
            let (other_bits, _) = other.parts[other_index];
 
            let (_self_bits, _) = self.parts[self_index];
 
            let (_other_bits, _) = other.parts[other_index];
 
            let self_in_use = true; // (self_bits & Self::KEY_IN_USE) != 0; @Deduplication
 
            let other_in_use = true; // (other_bits & Self::KEY_IN_USE) != 0; @Deduplication
 

	
 
            // Determine ending indices
 
            let self_end_index = self.find_end_index(self_index);
 
            let other_end_index = other.find_end_index(other_index);
 

	
 
            if self_in_use == other_in_use {
 
                if self_in_use {
 
                    // Both are in use, so both parts should be equal
 
                    let delta_self = self_end_index - self_index;
 
                    let delta_other = other_end_index - other_index;
 
@@ -1514,33 +1514,35 @@ impl TypeTable {
 
                    CTP::SInt32 => arch.sint32_type_id,
 
                    CTP::SInt64 => arch.sint64_type_id,
 
                    CTP::Character => arch.char_type_id,
 
                    CTP::String => arch.string_type_id,
 
                    CTP::Array => arch.array_type_id,
 
                    CTP::Slice => arch.slice_type_id,
 
                    CTP::Input => arch.input_type_id,
 
                    CTP::Output => arch.output_type_id,
 
                    CTP::Pointer => arch.pointer_type_id,
 
                    _ => unreachable!(),
 
                };
 
                let base_type = &self.mono_types[base_type_id.0 as usize];
 
                let base_type_size = base_type.size;
 
                let base_type_alignment = base_type.alignment;
 

	
 
                let type_id = TypeId(self.mono_types.len() as i64);
 
                Self::set_search_key_to_type(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts);
 
                self.mono_type_lookup.insert(self.mono_search_key.clone(), type_id);
 
                self.mono_types.push(MonoType{
 
                    type_id,
 
                    concrete_type,
 
                    size: base_type.size,
 
                    alignment: base_type.alignment,
 
                    size: base_type_size,
 
                    alignment: base_type_alignment,
 
                    variant: MonoTypeVariant::Builtin
 
                });
 

	
 
                type_id
 
            },
 
            // User-defined types
 
            CTP::Tuple(num_embedded) => {
 
                debug_assert!(definition_id.is_invalid()); // because tuples do not have an associated `DefinitionId`
 
                let mut members = Vec::with_capacity(*num_embedded as usize);
 
                for section in ConcreteTypeIter::new(&concrete_type.parts, 0) {
 
                    members.push(TupleMonomorphMember{
 
                        type_id: TypeId::new_invalid(),
src/protocol/tests/utils.rs
Show inline comments
 
@@ -1263,16 +1263,14 @@ fn seek_expr_in_stmt<F: Fn(&Expression) -> bool>(heap: &Heap, start: StatementId
 
        },
 
        _ => None
 
    }
 
}
 

	
 
struct FakeRunContext{}
 
impl RunContext for FakeRunContext {
 
    fn performed_put(&mut self, _port: PortId) -> bool { unreachable!() }
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> { unreachable!() }
 
    fn fires(&mut self, _port: PortId) -> Option<Value> { unreachable!() }
 
    fn performed_fork(&mut self) -> Option<bool> { unreachable!() }
 
    fn created_channel(&mut self) -> Option<(Value, Value)> { unreachable!() }
 
    fn performed_select_start(&mut self) -> bool { unreachable!() }
 
    fn performed_select_register_port(&mut self) -> bool { unreachable!() }
 
    fn performed_select_wait(&mut self) -> Option<u32> { unreachable!() }
 
}
 
\ No newline at end of file
src/random.rs
Show inline comments
 
new file 100644
 
/**
 
 * random.rs
 
 *
 
 * Simple wrapper over a random number generator. Put here so that we can have
 
 * a feature flag for particular forms of randomness. For now we'll use pseudo-
 
 * randomness since that will help debugging.
 
 */
 

	
 
use rand::{RngCore, SeedableRng};
 
use rand_pcg;
 

	
 
pub(crate) struct Random {
 
    rng: rand_pcg::Lcg64Xsh32,
 
}
 

	
 
impl Random {
 
    pub(crate) fn new() -> Self {
 
        use std::time::SystemTime;
 

	
 
        let now = SystemTime::now();
 
        let elapsed = match now.duration_since(SystemTime::UNIX_EPOCH) {
 
            Ok(elapsed) => elapsed,
 
            Err(err) => err.duration(),
 
        };
 

	
 
        let elapsed = elapsed.as_nanos();
 
        let seed = elapsed.to_le_bytes();
 

	
 
        return Self::new_seeded(seed);
 
    }
 

	
 
    pub(crate) fn new_seeded(seed: [u8; 16]) -> Self {
 
        return Self{ rng: rand_pcg::Pcg32::from_seed(seed) }
 
    }
 

	
 
    pub(crate) fn get_u64(&mut self) -> u64 {
 
        return self.rng.next_u64();
 
    }
 
}
 
\ No newline at end of file
src/runtime/connector.rs
Show inline comments
 
@@ -114,26 +114,24 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken),
 
        };
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 

	
 
    fn performed_select_start(&mut self) -> bool { unreachable!() }
 
    fn performed_select_register_port(&mut self) -> bool { unreachable!() }
 
    fn performed_select_wait(&mut self) -> Option<u32> { unreachable!() }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
 
            return scheduling;
 
        }
 

	
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -19,25 +19,25 @@ pub struct Peer {
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone)]
 
#[derive(Copy, Clone, PartialEq, Eq)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
src/runtime2/component/component_pdl.rs
Show inline comments
 
use crate::random::Random;
 
use crate::protocol::*;
 
use crate::protocol::ast::ProcedureDefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_context::*;
 
@@ -15,26 +16,24 @@ use super::consensus::Consensus;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    PerformedSelectStart,
 
    PerformedSelectRegister,
 
    PerformedSelectWait(u32),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
@@ -73,90 +72,194 @@ impl RunContext for ExecCtx {
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_select_start(&mut self) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedSelectStart => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_select_register_port(&mut self) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedSelectRegister => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_select_wait(&mut self) -> Option<u32> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case),
 
            _ => unreachable!(),
 
            _v => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum Mode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet,
 
    BlockedPut,
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    StartExit, // temporary state: if encountered then we start the shutdown process
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
struct SelectCase {
 
    involved_ports: Vec<LocalPortHandle>,
 
}
 

	
 
// TODO: @Optimize, flatten cases into single array, have index-pointers to next case
 
struct SelectState {
 
    cases: Vec<SelectCase>,
 
    next_case: u32,
 
    num_cases: u32,
 
    random: Random,
 
    candidates_workspace: Vec<usize>,
 
}
 

	
 
enum SelectDecision {
 
    None,
 
    Case(u32), // contains case index, should be passed along to PDL code
 
}
 

	
 
type InboxMain = Vec<Option<DataMessage>>;
 

	
 
impl SelectState {
 
    fn new() -> Self {
 
        return Self{
 
            cases: Vec::new(),
 
            next_case: 0,
 
            num_cases: 0,
 
            random: Random::new(),
 
            candidates_workspace: Vec::new(),
 
        }
 
    }
 

	
 
    fn handle_select_start(&mut self, num_cases: u32) {
 
        self.cases.clear();
 
        self.next_case = 0;
 
        self.num_cases = num_cases;
 
    }
 

	
 
    /// Register a port as belonging to a particular case. As for correctness of
 
    /// PDL code one cannot register the same port twice, this function might
 
    /// return an error
 
    fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> {
 
        // Retrieve case and port handle
 
        self.ensure_at_case(case_index);
 
        let cur_case = &mut self.cases[case_index as usize];
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize);
 

	
 
        // Make sure port wasn't added before, we disallow having the same port
 
        // in the same select guard twice.
 
        if cur_case.involved_ports.contains(&port_handle) {
 
            return Err(port_id);
 
        }
 

	
 
        cur_case.involved_ports.push(port_handle);
 
        return Ok(());
 
    }
 

	
 
    /// Notification that all ports have been registered and we should now wait
 
    /// until the appropriate messages have come in.
 
    fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        if self.num_cases != self.next_case {
 
            // This happens when there are >=1 select cases written at the end
 
            // of the select block.
 
            self.ensure_at_case(self.num_cases - 1);
 
        }
 

	
 
        return self.has_decision(inbox, comp_ctx);
 
    }
 

	
 
    fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        return self.has_decision(inbox, comp_ctx);
 
    }
 

	
 
    /// Internal helper, pushes empty cases inbetween last case and provided new
 
    /// case index.
 
    fn ensure_at_case(&mut self, new_case_index: u32) {
 
        // Push an empty case for all intermediate cases that were not
 
        // registered with a port.
 
        debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases);
 
        for _ in self.next_case..new_case_index + 1 {
 
            self.cases.push(SelectCase{ involved_ports: Vec::new() });
 
        }
 
        self.next_case = new_case_index + 1;
 
    }
 

	
 
    /// Checks if a decision can be reached
 
    fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        self.candidates_workspace.clear();
 
        if self.cases.is_empty() {
 
            // If there are no cases then we can immediately reach a "bogus
 
            // decision".
 
            return SelectDecision::Case(0);
 
        }
 

	
 
        // Need to check for valid case
 
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
 
            for port_handle in case.involved_ports.iter().copied() {
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if inbox[port_index].is_none() {
 
                    // Condition not satisfied
 
                    continue 'case_loop;
 
                }
 
            }
 

	
 
            // If here then the case guard is satisfied
 
            self.candidates_workspace.push(case_index);
 
        }
 

	
 
        if self.candidates_workspace.is_empty() {
 
            return SelectDecision::None;
 
        } else {
 
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
 
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    select: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: Vec<Option<DataMessage>>,
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            select: SelectState::new(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
@@ -186,25 +289,27 @@ impl CompPDL {
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.mode {
 
            Mode::NonSync | Mode::Sync => {},
 
            Mode::NonSync | Mode::Sync | Mode::BlockedSelect => {
 
                // continue and run PDL code
 
            },
 
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            Mode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            Mode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.mode = Mode::Exit;
 
@@ -248,51 +353,66 @@ impl CompPDL {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state.is_blocked() {
 
                    self.mode = Mode::BlockedPut;
 
                    self.mode_port = port_id;
 
                    self.mode_value = value;
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return Ok(CompScheduling::Immediate);
 
                }
 
            },
 
            EC::SelectStart(num_cases, num_ports) => {
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                todo!("finish handling select start")
 
                self.select.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                todo!("finish handling register port")
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_select_wait(sched_ctx, comp_ctx);
 
                todo!("finish handling select wait")
 
                let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.mode = Mode::Sync;
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // No decision yet
 
                    self.mode = Mode::BlockedSelect;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.mode = Mode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
@@ -363,31 +483,24 @@ impl CompPDL {
 
        };
 

	
 
        // If here then we've reached a decision
 
        debug_assert_eq!(self.mode, Mode::SyncEnd);
 
        if is_success {
 
            self.mode = Mode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.mode = Mode::StartExit;
 
        }
 
    }
 

	
 
    /// Handles the moment where the PDL code has notified the runtime of all
 
    /// the ports it is waiting on.
 
    fn handle_select_wait(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component waiting for select conclusion");
 

	
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        debug_assert_eq!(self.mode, Mode::StartExit);
 
        self.mode = Mode::BusyExit;
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
@@ -427,24 +540,30 @@ impl CompPDL {
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
                self.mode_port = PortId::new_invalid();
 
            } else if self.mode == Mode::BlockedSelect {
 
                let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.mode = Mode::Sync;
 
                }
 
            }
 
            
 
            return;
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked());
 

	
 
        if port_info.state == PortState::Open {
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            let (peer_handle, message) =
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -415,24 +415,37 @@ impl Consensus {
 
                return SyncRoundDecision::Failure;
 
            }
 
        }
 
    }
 

	
 
    fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
 
        if header.highest_id.0 > self.highest_id.0 {
 
            // Sender knows of someone with a higher ID. So store highest ID,
 
            // notify all peers, and forward local solutions
 
            self.highest_id = header.highest_id;
 
            for peer in comp_ctx.iter_peers() {
 
                if peer.id == header.sending_id {
 
                    continue; // do not send to sender: it has the higher ID
 
                }
 

	
 
                // also: only send if we received a message in this round
 
                let mut performed_communication = false; // TODO: Revise, temporary fix
 
                for port in self.ports.iter() {
 
                    if port.peer_comp_id == peer.id && port.mapping.is_some() {
 
                        performed_communication = true;
 
                        break;
 
                    }
 
                }
 

	
 
                if !performed_communication {
 
                    continue;
 
                }
 

	
 
                let message = SyncMessage{
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
        } else if header.highest_id.0 < self.highest_id.0 {
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -80,55 +80,56 @@ fn test_component_communication() {
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, true, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_simple_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    func infinite_assert<T>(T val, T expected) -> () {
 
        while (val != expected) { print(\"nope!\"); }
 
        return ();
 
    }
 

	
 
    primitive receiver(in<u32> in_a, in<u32> in_b, u32 num_sends) {
 
        auto num_from_a = 0;
 
        auto num_from_b = 0;
 
        while (num_from_a + num_from_b < 2 * num_sends) {
 
            sync select {
 
                auto v = get(in_a) -> {
 
                    print(\"got something from A\");
 
                    infinite_assert(v, num_from_a);
 
                    auto _ = infinite_assert(v, num_from_a);
 
                    num_from_a += 1;
 
                }
 
                auto v = get(in_b) -> {
 
                    print(\"got something from B\");
 
                    infinite_assert(v, num_from_b);
 
                    num_from_b +=1;
 
                    auto _ = infinite_assert(v, num_from_b);
 
                    num_from_b += 1;
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive sender(out<u32> tx, u32 num_sends) {
 
        auto index = 0;
 
        while (index < num_sends) {
 
            sync {
 
                put(tx, index);
 
                index += 1;
 
            }
 
        }
 
    }
 

	
 
    composite constructor() {
 
        auto num_sends = 3;
 
        auto num_sends = 15;
 
        channel tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, num_sends);
 
        new receiver(rx_a, rx_b, num_sends);
 
        new sender(tx_b, num_sends);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd);
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)