Changeset - 77315308b8b8
[Not reviewed]
0 4 0
mh - 3 years ago 2022-03-02 11:03:16
contact@maxhenger.nl
WIP: Fixing bug with empty select statement
4 files changed with 46 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 

	
 
use std::collections::VecDeque;
 

	
 
use super::value::*;
 
use super::store::*;
 
use super::error::*;
 
use crate::protocol::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::type_table::*;
 

	
 
macro_rules! debug_enabled { () => { false }; }
 
macro_rules! debug_enabled { () => { true }; }
 
macro_rules! debug_log {
 
    ($format:literal) => {
 
        enabled_debug_print!(false, "exec", $format);
 
        enabled_debug_print!(true, "exec", $format);
 
    };
 
    ($format:literal, $($args:expr),*) => {
 
        enabled_debug_print!(false, "exec", $format, $($args),*);
 
        enabled_debug_print!(true, "exec", $format, $($args),*);
 
    };
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum ExprInstruction {
 
    EvalExpr(ExpressionId),
 
    PushValToFront,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct Frame {
 
    pub(crate) definition: ProcedureDefinitionId,
 
    pub(crate) monomorph_type_id: TypeId,
 
    pub(crate) monomorph_index: usize,
 
    pub(crate) position: StatementId,
 
    pub(crate) expr_stack: VecDeque<ExprInstruction>, // hack for expression evaluation, evaluated by popping from back
 
    pub(crate) expr_values: VecDeque<Value>, // hack for expression results, evaluated by popping from front/back
 
    pub(crate) max_stack_size: u32,
 
}
 

	
 
impl Frame {
 
    /// Creates a new execution frame. Does not modify the stack in any way.
 
    pub fn new(heap: &Heap, definition_id: ProcedureDefinitionId, monomorph_type_id: TypeId, monomorph_index: u32) -> Self {
 
        let definition = &heap[definition_id];
 
        let outer_scope_id = definition.scope;
 
        let first_statement_id = definition.body;
 

	
 
        // Another not-so-pretty thing that has to be replaced somewhere in the
 
        // future...
 
        fn determine_max_stack_size(heap: &Heap, scope_id: ScopeId, max_size: &mut u32) {
 
            let scope = &heap[scope_id];
 

	
 
            // Check current block
 
            let cur_size = scope.next_unique_id_in_scope as u32;
 
            if cur_size > *max_size { *max_size = cur_size; }
 

	
 
            // And child blocks
 
            for child_scope in &scope.nested {
 
                determine_max_stack_size(heap, *child_scope, max_size);
 
            }
 
        }
 

	
 
        let mut max_stack_size = 0;
 
        determine_max_stack_size(heap, outer_scope_id, &mut max_stack_size);
 

	
 
        Frame{
 
            definition: definition_id,
 
            monomorph_type_id,
 
            monomorph_index: monomorph_index as usize,
 
            position: first_statement_id.upcast(),
 
            expr_stack: VecDeque::with_capacity(128),
 
            expr_values: VecDeque::with_capacity(128),
 
            max_stack_size,
 
        }
 
    }
 

	
 
    /// Prepares a single expression for execution. This involves walking the
 
    /// expression tree and putting them in the `expr_stack` such that
 
    /// continuously popping from its back will evaluate the expression. The
 
    /// results of each expression will be stored by pushing onto `expr_values`.
 
    pub fn prepare_single_expression(&mut self, heap: &Heap, expr_id: ExpressionId) {
 
        debug_assert!(self.expr_stack.is_empty());
 
        self.expr_values.clear(); // May not be empty if last expression result(s) were discarded
 

	
 
        self.serialize_expression(heap, expr_id);
 
    }
 

	
 
    /// Prepares multiple expressions for execution (i.e. evaluating all
 
    /// function arguments or all elements of an array/union literal). Per
 
    /// expression this works the same as `prepare_single_expression`. However
 
    /// after each expression is evaluated we insert a `PushValToFront`
 
    /// instruction
 
    pub fn prepare_multiple_expressions(&mut self, heap: &Heap, expr_ids: &[ExpressionId]) {
 
        debug_assert!(self.expr_stack.is_empty());
 
        self.expr_values.clear();
 

	
 
        for expr_id in expr_ids {
 
            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
            self.serialize_expression(heap, *expr_id);
 
        }
 
    }
 

	
 
    /// Performs depth-first serialization of expression tree. Let's not care
 
    /// about performance for a temporary runtime implementation
 
    fn serialize_expression(&mut self, heap: &Heap, id: ExpressionId) {
 
        self.expr_stack.push_back(ExprInstruction::EvalExpr(id));
 

	
 
        match &heap[id] {
 
            Expression::Assignment(expr) => {
 
                self.serialize_expression(heap, expr.left);
 
                self.serialize_expression(heap, expr.right);
 
            },
 
            Expression::Binding(expr) => {
 
                self.serialize_expression(heap, expr.bound_to);
 
                self.serialize_expression(heap, expr.bound_from);
 
            },
 
@@ -207,193 +207,193 @@ pub enum EvalContinuation {
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, ValueGroup),
 
    SelectStart(u32, u32), // (num_cases, num_ports_total)
 
    SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id)
 
    SelectWait, // wait until select can continue
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
    SyncBlockStart,
 
    NewComponent(ProcedureDefinitionId, TypeId, ValueGroup),
 
    NewChannel,
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
// we end up with valid "pointers" to heap regions.
 
#[derive(Debug, Clone)]
 
pub struct Prompt {
 
    pub(crate) frames: Vec<Frame>,
 
    pub(crate) store: Store,
 
}
 

	
 
impl Prompt {
 
    pub fn new(types: &TypeTable, heap: &Heap, def: ProcedureDefinitionId, type_id: TypeId, args: ValueGroup) -> Self {
 
        let mut prompt = Self{
 
            frames: Vec::new(),
 
            store: Store::new(),
 
        };
 

	
 
        // Maybe do typechecking in the future?
 
        let monomorph_index = types.get_monomorph(type_id).variant.as_procedure().monomorph_index;
 
        let new_frame = Frame::new(heap, def, type_id, monomorph_index);
 
        let max_stack_size = new_frame.max_stack_size;
 
        prompt.frames.push(new_frame);
 
        args.into_store(&mut prompt.store);
 
        prompt.store.reserve_stack(max_stack_size);
 

	
 
        prompt
 
    }
 

	
 
    /// Big 'ol function right here. Didn't want to break it up unnecessarily.
 
    /// It consists of, in sequence: executing any expressions that should be
 
    /// executed before the next statement can be evaluated, then a section that
 
    /// performs debug printing, and finally a section that takes the next
 
    /// statement and executes it. If the statement requires any expressions to
 
    /// be evaluated, then they will be added such that the next time `step` is
 
    /// called, all of these expressions are indeed evaluated.
 
    pub(crate) fn step(&mut self, types: &TypeTable, heap: &Heap, modules: &[Module], ctx: &mut impl RunContext) -> EvalResult {
 
        // Helper function to transfer multiple values from the expression value
 
        // array into a heap region (e.g. constructing arrays or structs).
 
        fn transfer_expression_values_front_into_heap(cur_frame: &mut Frame, store: &mut Store, num_values: usize) -> HeapPos {
 
            let heap_pos = store.alloc_heap();
 

	
 
            // Do the transformation first (because Rust...)
 
            for val_idx in 0..num_values {
 
                cur_frame.expr_values[val_idx] = store.read_take_ownership(cur_frame.expr_values[val_idx].clone());
 
            }
 

	
 
            // And now transfer to the heap region
 
            let values = &mut store.heap_regions[heap_pos as usize].values;
 
            debug_assert!(values.is_empty());
 
            values.reserve(num_values);
 
            for _ in 0..num_values {
 
                values.push(cur_frame.expr_values.pop_front().unwrap());
 
            }
 

	
 
            heap_pos
 
        }
 

	
 
        // Helper function to make sure that an index into an aray is valid.
 
        fn array_inclusive_index_is_invalid(store: &Store, array_heap_pos: u32, idx: i64) -> bool {
 
            let array_len = store.heap_regions[array_heap_pos as usize].values.len();
 
            return idx < 0 || idx >= array_len as i64;
 
        }
 

	
 
        fn array_exclusive_index_is_invalid(store: &Store, array_heap_pos: u32, idx: i64) -> bool {
 
            let array_len = store.heap_regions[array_heap_pos as usize].values.len();
 
            return idx < 0 || idx > array_len as i64;
 
        }
 

	
 
        fn construct_array_error(prompt: &Prompt, modules: &[Module], heap: &Heap, expr_id: ExpressionId, heap_pos: u32, idx: i64) -> EvalError {
 
            let array_len = prompt.store.heap_regions[heap_pos as usize].values.len();
 
            return EvalError::new_error_at_expr(
 
                prompt, modules, heap, expr_id,
 
                format!("index {} is out of bounds: array length is {}", idx, array_len)
 
            )
 
        }
 

	
 
        // Checking if we're at the end of execution
 
        let cur_frame = self.frames.last_mut().unwrap();
 
        if cur_frame.position.is_invalid() {
 
            if heap[cur_frame.definition].kind == ProcedureKind::Function {
 
                todo!("End of function without return, return an evaluation error");
 
            }
 
            return Ok(EvalContinuation::ComponentTerminated);
 
        }
 

	
 
        debug_log!("Taking step in '{}'", heap[cur_frame.definition].identifier().value.as_str());
 
        debug_log!("Taking step in '{}'", heap[cur_frame.definition].identifier.value.as_str());
 

	
 
        // Execute all pending expressions
 
        while !cur_frame.expr_stack.is_empty() {
 
            let next = cur_frame.expr_stack.pop_back().unwrap();
 
            debug_log!("Expr stack: {:?}", next);
 
            match next {
 
                ExprInstruction::PushValToFront => {
 
                    cur_frame.expr_values.rotate_right(1);
 
                },
 
                ExprInstruction::EvalExpr(expr_id) => {
 
                    let expr = &heap[expr_id];
 
                    match expr {
 
                        Expression::Assignment(expr) => {
 
                            let to = cur_frame.expr_values.pop_back().unwrap().as_ref();
 
                            let rhs = cur_frame.expr_values.pop_back().unwrap();
 

	
 
                            // Note: although not pretty, the assignment operator takes ownership
 
                            // of the right-hand side value when possible. So we do not drop the
 
                            // rhs's optionally owned heap data.
 
                            let rhs = self.store.read_take_ownership(rhs);
 
                            apply_assignment_operator(&mut self.store, to, expr.operation, rhs);
 
                        },
 
                        Expression::Binding(_expr) => {
 
                            let bind_to = cur_frame.expr_values.pop_back().unwrap();
 
                            let bind_from = cur_frame.expr_values.pop_back().unwrap();
 
                            let bind_to_heap_pos = bind_to.get_heap_pos();
 
                            let bind_from_heap_pos = bind_from.get_heap_pos();
 

	
 
                            let result = apply_binding_operator(&mut self.store, bind_to, bind_from);
 
                            self.store.drop_value(bind_to_heap_pos);
 
                            self.store.drop_value(bind_from_heap_pos);
 
                            cur_frame.expr_values.push_back(Value::Bool(result));
 
                        },
 
                        Expression::Conditional(expr) => {
 
                            // Evaluate testing expression, then extend the
 
                            // expression stack with the appropriate expression
 
                            let test_result = cur_frame.expr_values.pop_back().unwrap().as_bool();
 
                            if test_result {
 
                                cur_frame.serialize_expression(heap, expr.true_expression);
 
                            } else {
 
                                cur_frame.serialize_expression(heap, expr.false_expression);
 
                            }
 
                        },
 
                        Expression::Binary(expr) => {
 
                            let lhs = cur_frame.expr_values.pop_back().unwrap();
 
                            let rhs = cur_frame.expr_values.pop_back().unwrap();
 
                            let result = apply_binary_operator(&mut self.store, &lhs, expr.operation, &rhs);
 
                            cur_frame.expr_values.push_back(result);
 
                            self.store.drop_value(lhs.get_heap_pos());
 
                            self.store.drop_value(rhs.get_heap_pos());
 
                        },
 
                        Expression::Unary(expr) => {
 
                            let val = cur_frame.expr_values.pop_back().unwrap();
 
                            let result = apply_unary_operator(&mut self.store, expr.operation, &val);
 
                            cur_frame.expr_values.push_back(result);
 
                            self.store.drop_value(val.get_heap_pos());
 
                        },
 
                        Expression::Indexing(_expr) => {
 
                            // Evaluate index. Never heap allocated so we do
 
                            // not have to drop it.
 
                            let index = cur_frame.expr_values.pop_back().unwrap();
 
                            let index = self.store.maybe_read_ref(&index);
 

	
 
                            debug_assert!(index.is_integer());
 
                            let index = if index.is_signed_integer() {
 
                                index.as_signed_integer() as i64
 
                            } else {
 
                                index.as_unsigned_integer() as i64
 
                            };
 

	
 
                            let subject = cur_frame.expr_values.pop_back().unwrap();
 

	
 
                            let (deallocate_heap_pos, value_to_push) = match subject {
 
                                Value::Ref(value_ref) => {
 
                                    // Our expression stack value is a reference to something that
 
                                    // exists in the normal stack/heap. We don't want to deallocate
 
                                    // this thing. Rather we want to return a reference to it.
 
                                    let subject = self.store.read_ref(value_ref);
 
                                    let subject_heap_pos = match subject {
 
                                        Value::String(v) => *v,
 
                                        Value::Array(v) => *v,
 
                                        Value::Message(v) => *v,
 
                                        _ => unreachable!(),
 
                                    };
 

	
 
                                    if array_inclusive_index_is_invalid(&self.store, subject_heap_pos, index) {
 
                                        return Err(construct_array_error(self, modules, heap, expr_id, subject_heap_pos, index));
 
                                    }
 

	
 
                                    (None, Value::Ref(ValueId::Heap(subject_heap_pos, index as u32)))
 
                                },
 
                                _ => {
 
                                    // Our value lives on the expression stack, hence we need to
 
                                    // clone whatever we're referring to. Then drop the subject.
 
                                    let subject_heap_pos = match &subject {
 
                                        Value::String(v) => *v,
src/protocol/mod.rs
Show inline comments
 
mod arena;
 
pub(crate) mod eval;
 
pub(crate) mod input_source;
 
mod parser;
 
#[cfg(test)] mod tests;
 

	
 
pub(crate) mod ast;
 
pub(crate) mod ast_printer;
 

	
 
use std::sync::Mutex;
 

	
 
use crate::collections::{StringPool, StringRef};
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::*;
 
use crate::protocol::type_table::*;
 

	
 
pub use parser::type_table::TypeId;
 

	
 
/// A protocol description module
 
pub struct Module {
 
    pub(crate) source: InputSource,
 
    pub(crate) root_id: RootId,
 
    pub(crate) name: Option<StringRef<'static>>,
 
}
 
/// Description of a protocol object, used to configure new connectors.
 
#[repr(C)]
 
pub struct ProtocolDescription {
 
    pub(crate) modules: Vec<Module>,
 
    pub(crate) heap: Heap,
 
    pub(crate) types: TypeTable,
 
    pub(crate) pool: Mutex<StringPool>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentState {
 
    pub(crate) prompt: Prompt,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ComponentCreationError {
 
    ModuleDoesntExist,
 
    DefinitionDoesntExist,
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
    UnownedPort,
 
    InSync,
 
}
 

	
 
impl ProtocolDescription {
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        let source = InputSource::new(String::new(), Vec::from(buffer));
 
        let mut parser = Parser::new();
 
        parser.feed(source).expect("failed to feed source");
 
        
 

	
 
        parser.write_ast_to = Some(String::from("hello.txt"));
 
        if let Err(err) = parser.parse() {
 
            println!("ERROR:\n{}", err);
 
            return Err(format!("{}", err))
 
        }
 

	
 
        debug_assert_eq!(parser.modules.len(), 1, "only supporting one module here for now");
 
        let modules: Vec<Module> = parser.modules.into_iter()
 
            .map(|module| Module{
 
                source: module.source,
 
                root_id: module.root_id,
 
                name: module.name.map(|(_, name)| name)
 
            })
 
            .collect();
 

	
 
        return Ok(ProtocolDescription {
 
            modules,
 
            heap: parser.heap,
 
            types: parser.type_table,
 
            pool: Mutex::new(parser.string_pool),
 
        });
 
    }
 

	
 
    pub(crate) fn new_component(
 
        &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup
 
    ) -> Result<Prompt, ComponentCreationError> {
 
        // Find the module in which the definition can be found
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(ComponentCreationError::ModuleDoesntExist);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&self.heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
        let definition_id = definition_id.unwrap();
 

	
 
        let ast_definition = &self.heap[definition_id];
 
        if !ast_definition.is_procedure() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // Make sure that the types of the provided value group matches that of
 
        // the expected types.
 
        let ast_definition = ast_definition.as_procedure();
 
        if !ast_definition.poly_vars.is_empty() || ast_definition.kind == ProcedureKind::Function {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // - check number of arguments by retrieving the one instantiated
 
        //   monomorph
 
        let concrete_type = ConcreteType{ parts: vec![ConcreteTypePart::Component(ast_definition.this, 0)] };
 
        let procedure_type_id = self.types.get_procedure_monomorph_type_id(&definition_id, &concrete_type.parts).unwrap();
 
        let procedure_monomorph_index = self.types.get_monomorph(procedure_type_id).variant.as_procedure().monomorph_index;
 
        let monomorph_info = &ast_definition.monomorphs[procedure_monomorph_index as usize];
 
        if monomorph_info.argument_types.len() != arguments.values.len() {
 
            return Err(ComponentCreationError::InvalidNumArguments);
 
        }
 

	
 
        // - for each argument try to make sure the types match
 
        for arg_idx in 0..arguments.values.len() {
 
            let expected_type_id = monomorph_info.argument_types[arg_idx];
 
            let expected_type = &self.types.get_monomorph(expected_type_id).concrete_type;
 
            let provided_value = &arguments.values[arg_idx];
 
            if !self.verify_same_type(expected_type, 0, &arguments, provided_value) {
 
                return Err(ComponentCreationError::InvalidArgumentType(arg_idx));
 
            }
 
        }
 

	
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(Prompt::new(&self.types, &self.heap, ast_definition.this, procedure_type_id, arguments));
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
                Some(name) => if name.as_bytes() == module_name {
 
                    return Some(module.root_id);
 
                },
 
                None => if module_name.is_empty() {
 
                    return Some(module.root_id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Pointer | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -287,205 +287,208 @@ impl CompPDL {
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.mode {
 
            Mode::NonSync | Mode::Sync | Mode::BlockedSelect => {
 
                // continue and run PDL code
 
            },
 
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            Mode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            Mode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.mode = Mode::Exit;
 
                    return Ok(CompScheduling::Exit);
 
                }
 
            },
 
            Mode::Exit => {
 
                return Ok(CompScheduling::Exit);
 
            }
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 

	
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state.is_blocked() {
 
                    self.mode = Mode::BlockedPut;
 
                    self.mode_port = port_id;
 
                    self.mode_value = value;
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return Ok(CompScheduling::Immediate);
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                println!("DEBUG: AAAAAAAAAA");
 
                self.select.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                println!("DEBUG: BBBBBBBBBBB");
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                println!("DEBUG: CCCCCCCCCCC");
 
                let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.mode = Mode::Sync;
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // No decision yet
 
                    self.mode = Mode::BlockedSelect;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.mode = Mode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 

	
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.mode = Mode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode));
 
        let is_success = match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a decision
 
        debug_assert_eq!(self.mode, Mode::SyncEnd);
 
        if is_success {
 
            self.mode = Mode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.mode = Mode::StartExit;
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -39,97 +39,134 @@ fn test_component_communication() {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, true, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_simple_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    func infinite_assert<T>(T val, T expected) -> () {
 
        while (val != expected) { print(\"nope!\"); }
 
        return ();
 
    }
 

	
 
    primitive receiver(in<u32> in_a, in<u32> in_b, u32 num_sends) {
 
        auto num_from_a = 0;
 
        auto num_from_b = 0;
 
        while (num_from_a + num_from_b < 2 * num_sends) {
 
            sync select {
 
                auto v = get(in_a) -> {
 
                    print(\"got something from A\");
 
                    auto _ = infinite_assert(v, num_from_a);
 
                    num_from_a += 1;
 
                }
 
                auto v = get(in_b) -> {
 
                    print(\"got something from B\");
 
                    auto _ = infinite_assert(v, num_from_b);
 
                    num_from_b += 1;
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive sender(out<u32> tx, u32 num_sends) {
 
        auto index = 0;
 
        while (index < num_sends) {
 
            sync {
 
                put(tx, index);
 
                index += 1;
 
            }
 
        }
 
    }
 

	
 
    composite constructor() {
 
        auto num_sends = 15;
 
        channel tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, num_sends);
 
        new receiver(rx_a, rx_b, num_sends);
 
        new sender(tx_b, num_sends);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_empty_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive constructor() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select { auto v = () -> print(\"hello\"); }
 
            index += 1;
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_empty_select_should_not_work() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive constructor() {
 
        u32 index = 0;
 
        print(\"before while\");
 
        while (index < 5) {
 
            print(\"before sync\");
 
            sync {
 
                print(\"before select\");
 
                select { }
 
                print(\"after select\");
 
            }
 
            print(\"after sync\");
 
        }
 
        print(\"after while\");
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)