Changeset - a2bfd792a202
[Not reviewed]
0 5 1
MH - 4 years ago 2021-11-13 12:36:41
contact@maxhenger.nl
Implement and test explicit forking in runtime
6 files changed with 154 insertions and 4 deletions:
0 comments (0 inline, 0 general)
src/protocol/ast_printer.rs
Show inline comments
 
@@ -33,12 +33,14 @@ const PREFIX_ENDIF_STMT_ID: &'static str = "SEIf";
 
const PREFIX_WHILE_STMT_ID: &'static str = "SWhi";
 
const PREFIX_ENDWHILE_STMT_ID: &'static str = "SEWh";
 
const PREFIX_BREAK_STMT_ID: &'static str = "SBre";
 
const PREFIX_CONTINUE_STMT_ID: &'static str = "SCon";
 
const PREFIX_SYNC_STMT_ID: &'static str = "SSyn";
 
const PREFIX_ENDSYNC_STMT_ID: &'static str = "SESy";
 
const PREFIX_FORK_STMT_ID: &'static str = "SFrk";
 
const PREFIX_END_FORK_STMT_ID: &'static str = "SEFk";
 
const PREFIX_RETURN_STMT_ID: &'static str = "SRet";
 
const PREFIX_ASSERT_STMT_ID: &'static str = "SAsr";
 
const PREFIX_GOTO_STMT_ID: &'static str = "SGot";
 
const PREFIX_NEW_STMT_ID: &'static str = "SNew";
 
const PREFIX_PUT_STMT_ID: &'static str = "SPut";
 
const PREFIX_EXPR_STMT_ID: &'static str = "SExp";
 
@@ -508,12 +510,30 @@ impl ASTWriter {
 
            Statement::EndSynchronous(stmt) => {
 
                self.kv(indent).with_id(PREFIX_ENDSYNC_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("EndSynchronous");
 
                self.kv(indent2).with_s_key("StartSync").with_disp_val(&stmt.start_sync.0.index);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            },
 
            Statement::Fork(stmt) => {
 
                self.kv(indent).with_id(PREFIX_FORK_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Fork");
 
                self.kv(indent2).with_s_key("EndFork").with_disp_val(&stmt.end_fork.0.index);
 
                self.kv(indent2).with_s_key("LeftBody");
 
                self.write_stmt(heap, stmt.left_body.upcast(), indent3);
 

	
 
                if let Some(right_body_id) = stmt.right_body {
 
                    self.kv(indent2).with_s_key("RightBody");
 
                    self.write_stmt(heap, right_body_id.upcast(), indent3);
 
                }
 
            },
 
            Statement::EndFork(stmt) => {
 
                self.kv(indent).with_id(PREFIX_END_FORK_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("EndFork");
 
                self.kv(indent2).with_s_key("StartFork").with_disp_val(&stmt.start_fork.0.index);
 
                self.kv(indent2).with_s_key("Next").with_disp_val(&stmt.next.index);
 
            }
 
            Statement::Return(stmt) => {
 
                self.kv(indent).with_id(PREFIX_RETURN_STMT_ID, stmt.this.0.index)
 
                    .with_s_key("Return");
 
                self.kv(indent2).with_s_key("Expressions");
 
                for expr_id in &stmt.expressions {
 
                    self.write_expr(heap, *expr_id, indent3);
src/runtime2/branch.rs
Show inline comments
 
@@ -62,12 +62,13 @@ pub(crate) struct Branch {
 
    pub code_state: ComponentState,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>, // TODO: Remove, currently only valid in single-get/put mode
 
    pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove?
 
    pub prepared_fork: Option<bool>, // TODO: See above
 
}
 

	
 
impl BranchListItem for Branch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
@@ -82,33 +83,36 @@ impl Branch {
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
            prepared_channel: None,
 
            prepared_fork: None,
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        ); // forking from non-sync, or forking from a branching point
 
        // debug_assert!(
 
        //     (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
        //     (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        // ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 
        debug_assert!(parent_branch.prepared_fork.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
            prepared_channel: None,
 
            prepared_fork: None,
 
        }
 
    }
 

	
 
    /// Inserts a message into the branch for retrieval by a corresponding
 
    /// `get(port)` call.
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
src/runtime2/connector.rs
Show inline comments
 
@@ -72,12 +72,13 @@ pub(crate) struct ConnectorPDL {
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    received: &'a HashMap<PortIdLocal, ValueGroup>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
    prepared_fork: Option<bool>,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
@@ -98,12 +99,16 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    }
 

	
 
    fn get_fork(&mut self) -> Option<bool> {
 
        return self.prepared_fork.take();
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
@@ -207,12 +212,13 @@ impl ConnectorPDL {
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared_fork: branch.prepared_fork.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
@@ -290,12 +296,27 @@ impl ConnectorPDL {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared_fork = Some(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared_fork = Some(false);
 
            }
 
            RunResult::BranchPut(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                if consistency == Consistency::Valid {
 
                    // `put()` is valid.
 
@@ -332,12 +353,13 @@ impl ConnectorPDL {
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            prepared_fork: branch.prepared_fork.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
src/runtime2/tests/api_component.rs
Show inline comments
 
@@ -118,7 +118,41 @@ fn test_putting_to_component() {
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::UInt32(42 + loop_idx)])),
 
        ]).expect("start sync round");
 

	
 
        // Note: if we finish a round, then it must have succeeded :)
 
        api.wait().expect("finish sync round");
 
    }
 
}
 

	
 
#[test]
 
fn test_doing_nothing() {
 
    const CODE: &'static str = "
 
    primitive getter(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {}
 
            sync { auto res = get(input); assert(res); }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(channel.getter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for _ in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![]).expect("start silent sync round");
 
        api.wait().expect("finish silent sync round");
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(channel.putter_id, ValueGroup::new_stack(vec![Value::Bool(true)]))
 
        ]).expect("start firing sync round");
 
        let res = api.wait().expect("finish firing sync round");
 
        assert!(res.is_empty());
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation_basic;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
src/runtime2/tests/speculation_basic.rs
Show inline comments
 
new file 100644
 
// Testing speculation - Basic forms
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_maybe_do_nothing() {
 
    // Three variants in which the behaviour in which nothing is performed is
 
    // somehow not allowed. Note that we "check" by seeing if the test finishes.
 
    // Only the branches in which ports fire increment the loop index
 
    const CODE: &'static str = "
 
    primitive only_puts(out<bool> output, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync { put(output, true); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive might_put(out<bool> output, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                fork { put(output, true); index += 1; }
 
                or   {}
 
            }
 
        }
 
    }
 

	
 
    primitive only_gets(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync { auto res = get(input); assert(res); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive might_get(in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync fork { auto res = get(input); assert(res); index += 1; } or {}
 
        }
 
    }
 
    ";
 

	
 
    // Construct all variants which should work and wait until the runtime exits
 
    run_test_in_runtime(CODE, |api| {
 
        // only putting -> maybe getting
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "only_puts", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 
        api.create_connector("", "might_get", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 

	
 
        // maybe putting -> only getting
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "might_put", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 
        api.create_connector("", "only_gets", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 
    })
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)