diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 5cd7bd6061b890d86093fffdebfb7e5975a95a9f..4f7c23baab3a6d06e4d7f391d0574ab5b733da77 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -75,6 +75,7 @@ struct ConnectorRunContext<'a> { received: &'a HashMap, scheduler: SchedulerCtx<'a>, prepared_channel: Option<(Value, Value)>, + prepared_fork: Option, } impl<'a> RunContext for ConnectorRunContext<'a>{ @@ -101,6 +102,10 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ fn get_channel(&mut self) -> Option<(Value, Value)> { return self.prepared_channel.take(); } + + fn get_fork(&mut self) -> Option { + return self.prepared_fork.take(); + } } impl Connector for ConnectorPDL { @@ -210,6 +215,7 @@ impl ConnectorPDL { received: &branch.inbox, scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), + prepared_fork: branch.prepared_fork.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -293,6 +299,21 @@ impl ConnectorPDL { branch.sync_state = SpeculativeState::Inconsistent; } }, + RunResult::BranchFork => { + // Like the `NewChannel` result. This means we're setting up + // a branch and putting a marker inside the RunContext for the + // next time we run the PDL code + let left_id = branch_id; + let right_id = self.tree.fork_branch(left_id); + self.consensus.notify_of_new_branch(left_id, right_id); + self.tree.push_into_queue(QueueKind::Runnable, left_id); + self.tree.push_into_queue(QueueKind::Runnable, right_id); + + let left_branch = &mut self.tree[left_id]; + left_branch.prepared_fork = Some(true); + let right_branch = &mut self.tree[right_id]; + right_branch.prepared_fork = Some(false); + } RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); @@ -335,6 +356,7 @@ impl ConnectorPDL { received: &branch.inbox, scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), + prepared_fork: branch.prepared_fork.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);