Changeset - 665aa326769e
[Not reviewed]
0 10 0
mh - 4 years ago 2021-09-22 11:04:26
contact@maxhenger.nl
add 'print' fn for testing, first test for new runtime
10 files changed with 221 insertions and 51 deletions:
0 comments (0 inline, 0 general)
src/protocol/ast.rs
Show inline comments
 
@@ -1637,24 +1637,25 @@ pub struct CallExpression {
 
    pub unique_id_in_definition: i32,
 
}
 

	
 
#[derive(Debug, Clone, PartialEq, Eq)]
 
pub enum Method {
 
    // Builtin
 
    Get,
 
    Put,
 
    Fires,
 
    Create,
 
    Length,
 
    Assert,
 
    Print,
 
    UserFunction,
 
    UserComponent,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MethodSymbolic {
 
    pub(crate) parser_type: ParserType,
 
    pub(crate) definition: DefinitionId
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct LiteralExpression {
src/protocol/eval/executor.rs
Show inline comments
 
@@ -604,34 +604,24 @@ impl Prompt {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_port_value = self.store.maybe_read_ref(&port_value).clone();
 

	
 
                                    let port_id = if let Value::Output(port_id) = deref_port_value {
 
                                        port_id
 
                                    } else {
 
                                        unreachable!("executor calling 'put' on value {:?}", deref_port_value)
 
                                    };
 

	
 
                                    let msg_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone();
 

	
 
                                    match deref_msg_value {
 
                                        Value::Message(_) => {},
 
                                        _ => {
 
                                            return Err(EvalError::new_error_at_expr(
 
                                                self, modules, heap, expr_id,
 
                                                String::from("Calls to `put` are currently restricted to only send instances of `msg` types. This will change in the future")
 
                                            ));
 
                                        }
 
                                    }
 

	
 
                                    if ctx.did_put(port_id) {
 
                                        // We're fine, deallocate in case the expression value stack
 
                                        // held an owned value
 
                                        self.store.drop_value(msg_value.get_heap_pos());
 
                                    } else {
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        return Ok(EvalContinuation::Put(port_id, deref_msg_value));
 
                                    }
 
                                },
 
                                Method::Fires => {
 
@@ -694,24 +684,41 @@ impl Prompt {
 

	
 
                                    // TODO: @PtrInt
 
                                    cur_frame.expr_values.push_back(Value::UInt32(len as u32));
 
                                    self.store.drop_value(value_heap_pos);
 
                                },
 
                                Method::Assert => {
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value = self.store.maybe_read_ref(&value).clone();
 
                                    if !value.as_bool() {
 
                                        return Ok(EvalContinuation::Inconsistent)
 
                                    }
 
                                },
 
                                Method::Print => {
 
                                    // Convert the runtime-variant of a string
 
                                    // into an actual string.
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let value_heap_pos = value.as_string();
 
                                    let elements = &self.store.heap_regions[value_heap_pos as usize].values;
 

	
 
                                    let mut message = String::with_capacity(elements.len());
 
                                    for element in elements {
 
                                        message.push(element.as_char());
 
                                    }
 

	
 
                                    // Drop the heap-allocated value from the
 
                                    // store
 
                                    self.store.drop_heap_pos(value_heap_pos);
 
                                    println!("{}", message);
 
                                },
 
                                Method::UserComponent => {
 
                                    // This is actually handled by the evaluation
 
                                    // of the statement.
 
                                    debug_assert_eq!(heap[expr.definition].parameters().len(), cur_frame.expr_values.len());
 
                                    debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this)
 
                                },
 
                                Method::UserFunction => {
 
                                    // Push a new frame. Note that all expressions have
 
                                    // been pushed to the front, so they're in the order
 
                                    // of the definition.
 
                                    let num_args = expr.arguments.len();
 

	
src/protocol/eval/store.rs
Show inline comments
 
@@ -43,28 +43,34 @@ impl Store {
 
        let new_size = self.cur_stack_boundary + unique_stack_idx as usize + 1;
 
        if new_size > self.stack.len() {
 
            self.stack.resize(new_size, Value::Unassigned);
 
        }
 
    }
 

	
 
    /// Clears values on the stack and removes their heap allocations when
 
    /// applicable. The specified index itself will also be cleared (so if you
 
    /// specify 0 all values in the frame will be destroyed)
 
    pub(crate) fn clear_stack(&mut self, unique_stack_idx: usize) {
 
        let new_size = self.cur_stack_boundary + unique_stack_idx + 1;
 
        for idx in new_size..self.stack.len() {
 
            self.drop_value(self.stack[idx].get_heap_pos());
 
            let heap_pos = self.stack[idx].get_heap_pos();
 
            self.drop_value(heap_pos);
 

	
 
            // TODO: @remove, somewhat temporarily not clearing pure stack
 
            //  values for testing purposes.
 
            if heap_pos.is_some() {
 
                self.stack[idx] = Value::Unassigned;
 
            }
 
        }
 
    }
 

	
 
    /// Reads a value and takes ownership. This is different from a move because
 
    /// the value might indirectly reference stack/heap values. For these kinds
 
    /// values we will actually return a cloned value.
 
    pub(crate) fn read_take_ownership(&mut self, value: Value) -> Value {
 
        match value {
 
            Value::Ref(ValueId::Stack(pos)) => {
 
                let abs_pos = self.cur_stack_boundary + 1 + pos as usize;
 
                return self.clone_value(self.stack[abs_pos].clone());
 
            },
 
            Value::Ref(ValueId::Heap(heap_pos, value_idx)) => {
 
                let heap_pos = heap_pos as usize;
src/protocol/parser/mod.rs
Show inline comments
 
@@ -152,24 +152,30 @@ impl Parser {
 
        insert_builtin_function(&mut parser, "length", &["T"], |id| (
 
            vec![
 
                ("array", quick_type(&[PTV::ArrayLike, PTV::PolymorphicArgument(id.upcast(), 0)]))
 
            ],
 
            quick_type(&[PTV::UInt32]) // TODO: @PtrInt
 
        ));
 
        insert_builtin_function(&mut parser, "assert", &[], |_id| (
 
            vec![
 
                ("condition", quick_type(&[PTV::Bool])),
 
            ],
 
            quick_type(&[PTV::Void])
 
        ));
 
        insert_builtin_function(&mut parser, "print", &[], |_id| (
 
            vec![
 
                ("message", quick_type(&[PTV::String])),
 
            ],
 
            quick_type(&[PTV::Void])
 
        ));
 

	
 
        parser
 
    }
 

	
 
    pub fn feed(&mut self, mut source: InputSource) -> Result<(), ParseError> {
 
        // TODO: @Optimize
 
        let mut token_buffer = TokenBuffer::new();
 
        self.pass_tokenizer.tokenize(&mut source, &mut token_buffer)?;
 

	
 
        let module = Module{
 
            source,
 
            tokens: token_buffer,
src/protocol/parser/pass_definitions.rs
Show inline comments
 
@@ -1442,31 +1442,32 @@ impl PassDefinitions {
 
                                    this, func_span, full_span,
 
                                    parser_type,
 
                                    method: Method::UserComponent,
 
                                    arguments,
 
                                    definition: target_definition_id,
 
                                    parent: ExpressionParent::None,
 
                                    unique_id_in_definition: -1,
 
                                }).upcast()
 
                            },
 
                            Definition::Function(function_definition) => {
 
                                // Check whether it is a builtin function
 
                                let method = if function_definition.builtin {
 
                                    match function_definition.identifier.value.as_str() {
 
                                        "get" => Method::Get,
 
                                        "put" => Method::Put,
 
                                        "fires" => Method::Fires,
 
                                        "create" => Method::Create,
 
                                        "length" => Method::Length,
 
                                        "assert" => Method::Assert,
 
                                    match function_definition.identifier.value.as_bytes() {
 
                                        KW_FUNC_GET => Method::Get,
 
                                        KW_FUNC_PUT => Method::Put,
 
                                        KW_FUNC_FIRES => Method::Fires,
 
                                        KW_FUNC_CREATE => Method::Create,
 
                                        KW_FUNC_LENGTH => Method::Length,
 
                                        KW_FUNC_ASSERT => Method::Assert,
 
                                        KW_FUNC_PRINT => Method::Print,
 
                                        _ => unreachable!(),
 
                                    }
 
                                } else {
 
                                    Method::UserFunction
 
                                };
 

	
 
                                // Function call: consume the arguments
 
                                let func_span = parser_type.full_span;
 
                                let mut full_span = func_span;
 
                                let arguments = self.consume_expression_list(
 
                                    module, iter, ctx, Some(&mut full_span.end)
 
                                )?;
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
@@ -1047,24 +1047,25 @@ impl Visitor for PassValidationLinking {
 
                        &ctx.module().source, call_span,
 
                        "assert statement may only occur in components"
 
                    ));
 
                }
 
                if self.in_sync.is_invalid() {
 
                    let call_span = call_expr.func_span;
 
                    return Err(ParseError::new_error_str_at_span(
 
                        &ctx.module().source, call_span,
 
                        "assert statements may only occur inside synchronous blocks"
 
                    ));
 
                }
 
            },
 
            Method::Print => {},
 
            Method::UserFunction => {},
 
            Method::UserComponent => {
 
                expected_wrapping_new_stmt = true;
 
            },
 
        }
 

	
 
        if expected_wrapping_new_stmt {
 
            if !self.expr_parent.is_new() {
 
                let call_span = call_expr.func_span;
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, call_span,
 
                    "cannot call a component, it can only be instantiated by using 'new'"
src/protocol/parser/token_parsing.rs
Show inline comments
 
@@ -25,24 +25,25 @@ pub(crate) const KW_IMPORT:    &'static [u8] = b"import";
 
pub(crate) const KW_LIT_TRUE:  &'static [u8] = b"true";
 
pub(crate) const KW_LIT_FALSE: &'static [u8] = b"false";
 
pub(crate) const KW_LIT_NULL:  &'static [u8] = b"null";
 

	
 
// Keywords - function(like)s
 
pub(crate) const KW_CAST:        &'static [u8] = b"cast";
 
pub(crate) const KW_FUNC_GET:    &'static [u8] = b"get";
 
pub(crate) const KW_FUNC_PUT:    &'static [u8] = b"put";
 
pub(crate) const KW_FUNC_FIRES:  &'static [u8] = b"fires";
 
pub(crate) const KW_FUNC_CREATE: &'static [u8] = b"create";
 
pub(crate) const KW_FUNC_LENGTH: &'static [u8] = b"length";
 
pub(crate) const KW_FUNC_ASSERT: &'static [u8] = b"assert";
 
pub(crate) const KW_FUNC_PRINT:  &'static [u8] = b"print";
 

	
 
// Keywords - statements
 
pub(crate) const KW_STMT_CHANNEL:  &'static [u8] = b"channel";
 
pub(crate) const KW_STMT_IF:       &'static [u8] = b"if";
 
pub(crate) const KW_STMT_ELSE:     &'static [u8] = b"else";
 
pub(crate) const KW_STMT_WHILE:    &'static [u8] = b"while";
 
pub(crate) const KW_STMT_BREAK:    &'static [u8] = b"break";
 
pub(crate) const KW_STMT_CONTINUE: &'static [u8] = b"continue";
 
pub(crate) const KW_STMT_GOTO:     &'static [u8] = b"goto";
 
pub(crate) const KW_STMT_RETURN:   &'static [u8] = b"return";
 
pub(crate) const KW_STMT_SYNC:     &'static [u8] = b"synchronous";
 
pub(crate) const KW_STMT_NEW:      &'static [u8] = b"new";
 
@@ -526,25 +527,25 @@ fn is_reserved_statement_keyword(text: &[u8]) -> bool {
 
        KW_IMPORT | KW_AS |
 
        KW_STMT_CHANNEL | KW_STMT_IF | KW_STMT_WHILE |
 
        KW_STMT_BREAK | KW_STMT_CONTINUE | KW_STMT_GOTO | KW_STMT_RETURN |
 
        KW_STMT_SYNC | KW_STMT_NEW => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_expression_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_LET | KW_CAST |
 
        KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL |
 
        KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH => true,
 
        KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true,
 
        _ => false,
 
    }
 
}
 

	
 
fn is_reserved_type_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_TYPE_IN_PORT | KW_TYPE_OUT_PORT | KW_TYPE_MESSAGE | KW_TYPE_BOOL |
 
        KW_TYPE_UINT8 | KW_TYPE_UINT16 | KW_TYPE_UINT32 | KW_TYPE_UINT64 |
 
        KW_TYPE_SINT8 | KW_TYPE_SINT16 | KW_TYPE_SINT32 | KW_TYPE_SINT64 |
 
        KW_TYPE_CHAR | KW_TYPE_STRING |
 
        KW_TYPE_INFERRED => true,
 
        _ => false,
src/runtime2/messages.rs
Show inline comments
 
@@ -41,25 +41,25 @@ impl ConnectorInbox {
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.sending_port.0.u32_suffix,
 
            port_id: message.receiving_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::{Polarity, PortId};
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::messages::*;
 

	
 
enum AddComponentError {
 
#[derive(Debug)]
 
pub enum AddComponentError {
 
    ModuleDoesNotExist,
 
    ConnectorDoesNotExist,
 
    InvalidArgumentType(usize), // value is index of (first) invalid argument
 
}
 

	
 
struct PortDesc {
 
pub(crate) struct PortDesc {
 
    id: u32,
 
    peer_id: u32,
 
    owning_connector_id: Option<u32>,
 
    is_getter: bool, // otherwise one can only call `put`
 
}
 

	
 
struct ConnectorDesc {
 
pub(crate) struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    pub(crate) branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
    last_checked_done: u32,
 
    global_inbox: ConnectorInbox,
 
    global_outbox: ConnectorOutbox,
 
}
 

	
 
impl ConnectorDesc {
 
    /// Creates a new connector description. Implicit assumption is that there
 
    /// is one (non-sync) branch that can be immediately executed.
 
    fn new(id: u32, component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        let mut branches_active = VecDeque::new();
 
        branches_active.push_back(0);
 

	
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            spec_branches_active: branches_active,
 
            spec_branches_active: VecDeque::new(),
 
            spec_branches_pending_receive: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
            last_checked_done: 0,
 
            global_inbox: ConnectorInbox::new(),
 
            global_outbox: ConnectorOutbox::new(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
enum BranchState {
 
pub(crate) enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
    Finished, // branch (necessarily non-sync) that reached end of code
 
}
 

	
 
#[derive(Clone)]
 
#[derive(Debug, Clone)]
 
struct BranchPortDesc {
 
    last_registered_index: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchContext {
 
    just_called_did_put: bool,
 
    pending_channel: Option<(Value, Value)>,
 
}
 

	
 
struct BranchDesc {
 
pub(crate) struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    pub(crate) code_state: ComponentState,
 
    pub(crate) branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
    branch_context: BranchContext,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
@@ -150,29 +149,29 @@ struct ProposedConnectorSolution {
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    port_mapping: HashMap<u32, Option<u32>>, // port IDs of the connector, mapped to their branch IDs (None for silent ports)
 
}
 

	
 
#[derive(Clone)]
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_constraints: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
pub struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
    ports: HashMap<u32, PortDesc>,
 
    pub(crate) ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    pub(crate) connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            protocol: pd,
 
            ports: HashMap::new(),
 
            port_counter: 0,
 
            connectors: HashMap::new(),
 
            connector_counter: 0,
 
@@ -201,54 +200,68 @@ impl Runtime {
 
        // TODO: Remove this error enum translation. Note that for now this
 
        //  function forces port-only arguments
 
        let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) {
 
            Ok(polarities) => polarities,
 
            Err(reason) => match reason {
 
                OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)),
 
                OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist),
 
                OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist),
 
                _ => unreachable!(),
 
            }
 
        };
 

	
 
        // Make sure supplied values (and types) are correct
 
        // Make sure supplied values (and types) are correct. At the same time
 
        // modify the port IDs such that they contain the ID of the connector
 
        // we're about the create.
 
        let component_id = self.generate_connector_id();
 
        let mut ports = Vec::with_capacity(values.values.len());
 
        
 
        for (value_idx, value) in values.values.iter().enumerate() {
 
            let polarity = &port_polarities[value_idx];
 

	
 
            match value {
 
                Value::Input(port_id) => {
 
                    if *polarity != Polarity::Getter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                    ports.push(PortId(Id{
 
                        connector_id: component_id,
 
                        u32_suffix: port_id.0.u32_suffix,
 
                    }));
 
                },
 
                Value::Output(port_id) => {
 
                    if *polarity != Polarity::Putter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                    ports.push(PortId(Id{
 
                        connector_id: component_id,
 
                        u32_suffix: port_id.0.u32_suffix
 
                    }));
 
                },
 
                _ => return Err(ACE::InvalidArgumentType(value_idx))
 
            }
 
        }
 

	
 
        // Instantiate the component
 
        let component_id = self.generate_connector_id();
 
        // Instantiate the component, and mark the ports as being owned by the
 
        // newly instantiated component
 
        let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);
 
        let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 

	
 
        for port in &ports {
 
            let desc = self.ports.get_mut(port).unwrap();
 
            desc.owning_connector_id = Some(component_id);
 
        }
 

	
 
        self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports));
 
        self.connectors_active.push_back(component_id);
 

	
 
        Ok(())
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            // Run a single connector until it indicates we can run another
 
            // connector
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
@@ -265,46 +278,46 @@ impl Runtime {
 
            }
 

	
 
            // Deal with any outgoing messages and potential solutions
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_new_solutions(next_id);
 
        }
 
    }
 

	
 
    /// Runs a connector for as long as sensible, then returns `true` if the
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    pub fn run_connector(&mut self, connector_id: u32) -> Scheduling {
 
    fn run_connector(&mut self, connector_id: u32) -> Scheduling {
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 

	
 
        if desc.in_sync {
 
            return self.run_connector_sync_mode(connector_id);
 
        } else {
 
            return self.run_connector_regular_mode(connector_id);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve connector and branch that is supposed to be run
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(desc.in_sync);
 
        debug_assert!(!desc.spec_branches_active.is_empty());
 

	
 
        let branch_index = desc.spec_branches_active.pop_front().unwrap();
 
        let branch = &mut desc.branches[branch_index as usize];
 
        debug_assert_eq!(branch_index, branch.index);
 

	
 
        // Run this particular branch to a next blocking point
 
        // TODO: PERSISTENT RUN CTX
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 

	
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent. So we don't
 
                // run it again
 
@@ -337,24 +350,26 @@ impl Runtime {
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch just performed a `get()` on a port that did
 
                // not yet receive a value.
 

	
 
                // First check if a port value is assigned to the
 
                // current branch. If so, check if it is consistent.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                let mut insert_in_pending_receive = false;
 

	
 
                println!("DEBUG: Connector {} performing get on {:#?}", connector_id, port_id);
 

	
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet, so force to firing
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: None,
 
                            num_times_fired: 1,
 
                        });
 
                        branch.branch_state = BranchState::BranchPoint;
 
                        insert_in_pending_receive = true;
 
                    },
 
                    Entry::Occupied(entry) => {
 
                        // Have an entry, check if it is consistent
 
@@ -363,24 +378,26 @@ impl Runtime {
 
                            // Inconsistent
 
                            branch.branch_state = BranchState::Failed;
 
                        } else {
 
                            // Perfectly fine, add to queue
 
                            debug_assert!(entry.last_registered_index.is_none());
 
                            assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                            branch.branch_state = BranchState::BranchPoint;
 
                            insert_in_pending_receive = true;
 
                        }
 
                    }
 
                }
 

	
 
                println!("DEBUG: insert = {}, port mapping is now {:#?}", insert_in_pending_receive, &branch.port_mapping);
 

	
 
                if insert_in_pending_receive {
 
                    // Perform the insert
 
                    match desc.spec_branches_pending_receive.entry(port_id) {
 
                        Entry::Vacant(entry) => {
 
                            entry.insert(vec![branch_index]);
 
                        }
 
                        Entry::Occupied(mut entry) => {
 
                            let entry = entry.get_mut();
 
                            debug_assert!(!entry.contains(&branch_index));
 
                            entry.push(branch_index);
 
                        }
 
                    }
 
@@ -400,45 +417,46 @@ impl Runtime {
 
                            desc.spec_branches_active.push_back(new_branch_idx);
 
                        }
 

	
 
                        if !messages.is_empty() {
 
                            return Scheduling::Immediate;
 
                        }
 
                    }
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Check the branch for any ports that were not used and
 
                // insert them in the port mapping as not having fired.
 
                for port_index in branch.owned_ports.iter().copied() {
 
                    let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                for port_id in branch.owned_ports.iter().copied() {
 
                    let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_id });
 
                    if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                        entry.insert(BranchPortDesc {
 
                            last_registered_index: None,
 
                            num_times_fired: 0
 
                        });
 
                    }
 
                }
 

	
 
                // Mark the branch as being done
 
                branch.branch_state = BranchState::ReachedEndSync;
 
                desc.spec_branches_done.push(branch_index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                // Branch just performed a `put()`. Check if we have
 
                // assigned the port value and if so, if it is
 
                // consistent.
 
                println!("DEBUG: Connector {} performing put on {:#?}", connector_id, port_id);
 
                let mut can_put = true;
 
                branch.branch_context.just_called_did_put = true;
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: Some(branch.index),
 
                            num_times_fired: 1,
 
                        });
 
                    },
 
                    Entry::Occupied(mut entry) => {
 
                        // Pre-existing entry
 
@@ -450,45 +468,47 @@ impl Runtime {
 
                            can_put = false;
 
                        } else if entry.last_registered_index.is_none() {
 
                            // A put() that follows a fires()
 
                            entry.last_registered_index = Some(branch.index);
 
                        } else {
 
                            // This should be fine in the future. But
 
                            // for now we throw an error as it doesn't
 
                            // mesh well with the 'fires()' concept.
 
                            todo!("throw an error of some sort, then fail all related")
 
                        }
 
                    }
 
                }
 
                println!("DEBUG: can_put = {}, port mapping is now {:#?}", can_put, &branch.port_mapping);
 

	
 
                if can_put {
 
                    // Actually put the message in the outbox
 
                    let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap();
 
                    debug_assert_eq!(port_desc.owning_connector_id.unwrap(), connector_id);
 
                    let peer_id = port_desc.peer_id;
 
                    let peer_desc = self.ports.get(&peer_id).unwrap();
 
                    debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                    let peer_id = PortId(Id{
 
                        connector_id: peer_desc.owning_connector_id.unwrap(),
 
                        u32_suffix: peer_id
 
                    });
 

	
 
                    // For now this is the one and only time we're going
 
                    // to send a message. So for now we can't send a
 
                    // branch ID.
 
                    desc.global_outbox.insert_message(BufferedMessage{
 
                        sending_port: port_id,
 
                        receiving_port: peer_id,
 
                        peer_prev_branch_id: None,
 
                        peer_cur_branch_id: 0,
 
                        peer_cur_branch_id: branch_index,
 
                        message: value_group,
 
                    });
 

	
 
                    // Finally, because we were able to put the message,
 
                    // we can run the branch again
 
                    desc.spec_branches_active.push_back(branch_index);
 
                    return Scheduling::Immediate;
 
                }
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
        }
 

	
 
@@ -497,41 +517,44 @@ impl Runtime {
 
        // speculative branches
 
        if desc.spec_branches_active.is_empty() {
 
            return Scheduling::NotNow;
 
        } else {
 
            return Scheduling::Later;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve the connector and the branch (which is always the first one,
 
        // since we assume we're not running in sync-mode).
 
        // TODO: CONTINUE HERE, PERSEISTENT BRANCH CONTEXT
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(!desc.in_sync);
 
        debug_assert!(desc.spec_branches_active.is_empty());
 
        debug_assert_eq!(desc.branches.len(), 1);
 

	
 
        let branch = &mut desc.branches[0];
 

	
 
        // Run this branch to its blocking point
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => return Scheduling::NotNow,
 
            RunResult::ComponentTerminated => {
 
                branch.branch_state = BranchState::Finished;
 
                return Scheduling::NotNow
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution
 
                Self::prepare_branch_for_sync(desc);
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Find all references to ports in the provided arguments, the
 
                // ownership of these ports will be transferred to the connector
 
                // we're about to create.
 
                let mut ports = Vec::with_capacity(arguments.values.len());
 
                find_ports_in_value_group(&arguments, &mut ports);
 

	
 
@@ -918,25 +941,24 @@ impl Runtime {
 
        debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch
 
        debug_assert!(connector.in_sync);
 
        debug_assert!(connector.spec_branches_done.contains(&branch_id));
 

	
 
        // Put the selected solution in front, the branch at index 0 is the
 
        // "non-sync" branch.
 
        connector.branches.swap(0, branch_id as usize);
 
        connector.branches.truncate(1);
 

	
 
        // And reset the connector's state for further execution
 
        connector.in_sync = false;
 
        connector.spec_branches_active.clear();
 
        connector.spec_branches_active.push_back(0);
 
        connector.spec_branches_pending_receive.clear();
 
        connector.spec_branches_done.clear();
 
        connector.last_checked_done = 0;
 
        connector.global_inbox.clear();
 
        connector.global_outbox.clear();
 

	
 
        // Do the same thing for the final selected branch
 
        let final_branch = &mut connector.branches[0];
 
        final_branch.index = 0;
 
        final_branch.parent_index = None;
 
        debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync);
 
        final_branch.branch_state = BranchState::RunningNonSync;
src/runtime2/tests/mod.rs
Show inline comments
 
use std::sync::Arc;
 

	
 
use super::runtime::*;
 
use crate::ProtocolDescription;
 
use crate::protocol::eval::*;
 

	
 
#[test]
 
fn testing_runtime2() {
 
    println!("YESH!");
 
fn test_single_message() {
 
    // Simple test were we have a `putter` component, which will simply send a
 
    // single message (a boolean), and a `getter` component, which will receive
 
    // that message.
 
    // We will write this behaviour in the various ways that the language
 
    // currently allows. We will cheat a bit by peeking into the runtime to make
 
    // sure that the getter actually received the message.
 
    // TODO: Expose ports to a "native application"
 

	
 
    fn check_store_bool(value: &Value, expected: bool) {
 
        if let Value::Bool(value) = value {
 
            assert_eq!(*value, expected);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
    fn run_putter_getter(code: &[u8]) {
 
        // Compile code
 
        let pd = ProtocolDescription::parse(code)
 
            .expect("code successfully compiles");
 
        let pd = Arc::new(pd);
 

	
 
        // Construct runtime and the appropriate ports and connectors
 
        let mut rt = Runtime::new(pd);
 
        let (put_port, get_port) = rt.add_channel();
 

	
 
        let mut put_args = ValueGroup::new_stack(vec![
 
            put_port,
 
        ]);
 
        rt.add_component("", "putter", put_args)
 
            .expect("'putter' component created");
 

	
 
        let mut get_args = ValueGroup::new_stack(vec![
 
            get_port,
 
        ]);
 
        rt.add_component("", "getter", get_args)
 
            .expect("'getter' component created");
 

	
 
        // Run until completion
 
        rt.run();
 

	
 
        // Check for success (the 'received' and 'did_receive" flags)
 
        let getter_component = rt.connectors.get(&1).unwrap();
 
        let branch = &getter_component.branches[0];
 
        assert_eq!(branch.branch_state, BranchState::Finished);
 

	
 
        // Note: with the stack structure of the store, the first entry is the
 
        // "previous stack pos" and the second one is the input port passed to
 
        // the procedure. Hence the third/fourth entries are the boolean
 
        // variables on the stack.
 
        check_store_bool(&branch.code_state.prompt.store.stack[2], true);
 
        check_store_bool(&branch.code_state.prompt.store.stack[3], true);
 
    }
 

	
 
    // Without `fires()`, just a single valid behaviour
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                put(put_here, true);
 
            }
 
        }
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false;
 
            bool did_receive = false;
 

	
 
            synchronous {
 
                received = get(get_here);
 
                if (received) {
 
                    print(\"value was 'true'\");
 
                } else {
 
                    print(\"value was 'false'\");
 
                }
 
                did_receive = true;
 
            }
 
        }");
 

	
 
    // With `fires()`, but eliminating on the putter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (!fires(put_here)) {
 
                    assert(false);
 
                } else {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                }
 
            }
 
        }");
 

	
 
    // With `fires()`, but eliminating on the getter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (fires(put_here)) {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                } else {
 
                    assert(false);
 
                }
 
            }
 
        }"
 
    );
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)