diff --git a/src/runtime/branch.rs b/src/runtime/branch.rs new file mode 100644 index 0000000000000000000000000000000000000000..8d35090031d08c0ba205df2726d51cf4d9ad5be0 --- /dev/null +++ b/src/runtime/branch.rs @@ -0,0 +1,555 @@ +use std::collections::HashMap; +use std::ops::{Index, IndexMut}; + +use crate::protocol::eval::{Prompt, Value, ValueGroup}; + +use super::port::PortIdLocal; + +// To share some logic between the FakeTree and ExecTree implementation +trait BranchListItem { + fn get_id(&self) -> BranchId; + fn set_next_id(&mut self, id: BranchId); + fn get_next_id(&self) -> BranchId; +} + +/// Generic branch ID. A component will always have one branch: the +/// non-speculative branch. This branch has ID 0. Hence in a speculative context +/// we use this fact to let branch ID 0 denote the ID being invalid. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BranchId { + pub index: u32 +} + +impl BranchId { + #[inline] + pub(crate) fn new_invalid() -> Self { + return Self{ index: 0 }; + } + + #[inline] + fn new(index: u32) -> Self { + debug_assert!(index != 0); + return Self{ index }; + } + + #[inline] + pub(crate) fn is_valid(&self) -> bool { + return self.index != 0; + } +} + +#[derive(Debug, PartialEq, Eq)] +pub(crate) enum SpeculativeState { + // Non-synchronous variants + RunningNonSync, // regular execution of code + Error, // encountered a runtime error + Finished, // finished executing connector's code + // Synchronous variants + RunningInSync, // running within a sync block + HaltedAtBranchPoint, // at a branching point (at a `get` call) + ReachedSyncEnd, // reached end of sync block, branch represents a local solution + Inconsistent, // branch can never represent a local solution, so halted +} + +#[derive(Debug)] +pub(crate) enum PreparedStatement { + CreatedChannel((Value, Value)), + ForkedExecution(bool), + PerformedPut, + PerformedGet(ValueGroup), + None, +} + +impl PreparedStatement { + pub(crate) fn is_none(&self) -> bool { + if let PreparedStatement::None = self { + return true; + } else { + return false; + } + } + + pub(crate) fn take(&mut self) -> PreparedStatement { + if let PreparedStatement::None = self { + return PreparedStatement::None; + } else { + let mut replacement = PreparedStatement::None; + std::mem::swap(self, &mut replacement); + return replacement; + } + } +} + +/// The execution state of a branch. This envelops the PDL code and the +/// execution state. And derived from that: if we're ready to keep running the +/// code, or if we're halted for some reason (e.g. waiting for a message). +pub(crate) struct Branch { + pub id: BranchId, + pub parent_id: BranchId, + // Execution state + pub code_state: Prompt, + pub sync_state: SpeculativeState, + pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. + pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` + pub prepared: PreparedStatement, +} + +impl BranchListItem for Branch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} + +impl Branch { + /// Creates a new non-speculative branch + pub(crate) fn new_non_sync(component_state: Prompt) -> Self { + Branch { + id: BranchId::new_invalid(), + parent_id: BranchId::new_invalid(), + code_state: component_state, + sync_state: SpeculativeState::RunningNonSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + prepared: PreparedStatement::None, + } + } + + /// Constructs a sync branch. The provided branch is assumed to be the + /// parent of the new branch within the execution tree. + fn new_sync(new_index: u32, parent_branch: &Branch) -> Self { + // debug_assert!( + // (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) || + // (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) + // ); // forking from non-sync, or forking from a branching point + debug_assert!(parent_branch.prepared.is_none()); + + Branch { + id: BranchId::new(new_index), + parent_id: parent_branch.id, + code_state: parent_branch.code_state.clone(), + sync_state: SpeculativeState::RunningInSync, + awaiting_port: parent_branch.awaiting_port, + next_in_queue: BranchId::new_invalid(), + prepared: PreparedStatement::None, + } + } +} + +/// Queue of branches. Just a little helper. +#[derive(Copy, Clone)] +struct BranchQueue { + first: BranchId, + last: BranchId, +} + +impl BranchQueue { + #[inline] + fn new() -> Self { + Self{ + first: BranchId::new_invalid(), + last: BranchId::new_invalid() + } + } + + #[inline] + fn is_empty(&self) -> bool { + debug_assert!(self.first.is_valid() == self.last.is_valid()); + return !self.first.is_valid(); + } +} + +const NUM_QUEUES: usize = 3; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub(crate) enum QueueKind { + Runnable, + AwaitingMessage, + FinishedSync, +} + +impl QueueKind { + fn as_index(&self) -> usize { + return match self { + QueueKind::Runnable => 0, + QueueKind::AwaitingMessage => 1, + QueueKind::FinishedSync => 2, + } + } +} + +// ----------------------------------------------------------------------------- +// ExecTree +// ----------------------------------------------------------------------------- + +/// Execution tree of branches. Tries to keep the extra information stored +/// herein to a minimum. So the execution tree is aware of the branches, their +/// execution state and the way they're dependent on each other, but the +/// execution tree should not be aware of e.g. sync algorithms. +/// +/// Note that the tree keeps track of multiple lists of branches. Each list +/// contains branches that ended up in a particular execution state. The lists +/// are described by the various `BranchQueue` instances and the `next_in_queue` +/// field in each branch. +pub(crate) struct ExecTree { + // All branches. the `parent_id` field in each branch implies the shape of + // the tree. Branches are index stable throughout a sync round. + pub branches: Vec, + queues: [BranchQueue; NUM_QUEUES] +} + +impl ExecTree { + /// Constructs a new execution tree with a single non-sync branch. + pub fn new(component: Prompt) -> Self { + return Self { + branches: vec![Branch::new_non_sync(component)], + queues: [BranchQueue::new(); 3] + } + } + + // --- Generic branch (queue) management + + /// Returns if tree is in speculative mode + pub fn is_in_sync(&self) -> bool { + return self.branches.len() != 1; + } + + /// Returns true if the particular queue is empty + pub fn queue_is_empty(&self, kind: QueueKind) -> bool { + return self.queues[kind.as_index()].is_empty(); + } + + /// Pops a branch (ID) from a queue. + pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { + debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); + } + + /// Pushes a branch (ID) into a queue. + pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); + } + + /// Returns the non-sync branch (TODO: better name?) + pub fn base_branch_mut(&mut self) -> &mut Branch { + debug_assert!(!self.is_in_sync()); + return &mut self.branches[0]; + } + + /// Returns the branch ID of the first branch in a particular queue. + pub fn get_queue_first(&self, kind: QueueKind) -> Option { + let queue = &self.queues[kind.as_index()]; + if queue.first.is_valid() { + return Some(queue.first); + } else { + return None; + } + } + + /// Returns the next branch ID of a branch (assumed to be in a particular + /// queue. + pub fn get_queue_next(&self, branch_id: BranchId) -> Option { + let branch = &self.branches[branch_id.index as usize]; + if branch.next_in_queue.is_valid() { + return Some(branch.next_in_queue); + } else { + return None; + } + } + + /// Returns an iterator that starts with the provided branch, and then + /// continues to visit all of the branch's parents. + pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter { + return BranchParentIter{ + branches: self.branches.as_slice(), + index: branch_id.index as usize, + } + } + + // --- Preparing and finishing a speculative round + + /// Starts a synchronous round by cloning the non-sync branch and marking it + /// as the root of the speculative tree. The id of this root sync branch is + /// returned. + pub fn start_sync(&mut self) -> BranchId { + debug_assert!(!self.is_in_sync()); + let sync_branch = Branch::new_sync(1, &self.branches[0]); + let sync_branch_id = sync_branch.id; + self.branches.push(sync_branch); + + return sync_branch_id; + } + + /// Creates a new speculative branch based on the provided one. The index to + /// retrieve this new branch will be returned. + pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { + debug_assert!(self.is_in_sync()); + let parent_branch = &self[parent_branch_id]; + let new_branch = Branch::new_sync(self.branches.len() as u32, parent_branch); + let new_branch_id = new_branch.id; + self.branches.push(new_branch); + + return new_branch_id; + } + + /// Collapses the speculative execution tree back into a deterministic one, + /// using the provided branch as the final sync result. + pub fn end_sync(&mut self, branch_id: BranchId) { + debug_assert!(self.is_in_sync()); + + // Swap indicated branch into the first position + self.branches.swap(0, branch_id.index as usize); + self.branches.truncate(1); + + // Reset all values to non-sync defaults + let branch = &mut self.branches[0]; + branch.id = BranchId::new_invalid(); + branch.parent_id = BranchId::new_invalid(); + branch.sync_state = SpeculativeState::RunningNonSync; + debug_assert!(!branch.awaiting_port.is_valid()); + branch.next_in_queue = BranchId::new_invalid(); + debug_assert!(branch.prepared.is_none()); + + // Clear out all the queues + for queue_idx in 0..NUM_QUEUES { + self.queues[queue_idx] = BranchQueue::new(); + } + } +} + +impl Index for ExecTree { + type Output = Branch; + + fn index(&self, index: BranchId) -> &Self::Output { + debug_assert!(index.is_valid()); + return &self.branches[index.index as usize]; + } +} + +impl IndexMut for ExecTree { + fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { + debug_assert!(index.is_valid()); + return &mut self.branches[index.index as usize]; + } +} + +/// Iterator over the parents of an `ExecTree` branch. +pub(crate) struct BranchParentIter<'a> { + branches: &'a [Branch], + index: usize, +} + +impl<'a> Iterator for BranchParentIter<'a> { + type Item = &'a Branch; + + fn next(&mut self) -> Option { + if self.index == 0 { + return None; + } + + let branch = &self.branches[self.index]; + self.index = branch.parent_id.index as usize; + return Some(branch); + } +} + +// ----------------------------------------------------------------------------- +// FakeTree +// ----------------------------------------------------------------------------- + +/// Generic fake branch. This is supposed to be used in conjunction with the +/// fake tree. The purpose is to have a branching-like tree to use in +/// combination with a consensus algorithm in places where we don't have PDL +/// code. +pub(crate) struct FakeBranch { + pub id: BranchId, + pub parent_id: BranchId, + pub sync_state: SpeculativeState, + pub awaiting_port: PortIdLocal, + pub next_in_queue: BranchId, + pub inbox: HashMap, +} + +impl BranchListItem for FakeBranch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} + +impl FakeBranch { + fn new_root(_index: u32) -> FakeBranch { + debug_assert!(_index == 1); + return FakeBranch{ + id: BranchId::new(1), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningInSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + } + } + + fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch { + return FakeBranch { + id: BranchId::new(index), + parent_id: parent_branch.id, + sync_state: SpeculativeState::RunningInSync, + awaiting_port: parent_branch.awaiting_port, + next_in_queue: BranchId::new_invalid(), + inbox: parent_branch.inbox.clone(), + } + } + + pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { + debug_assert!(target_port.is_valid()); + debug_assert!(self.awaiting_port == target_port); + self.awaiting_port = PortIdLocal::new_invalid(); + self.inbox.insert(target_port, contents); + } +} + +/// A little helper for native components that don't have a set of branches that +/// are actually executing code, but just have to manage the idea of branches +/// due to them performing the equivalent of a branching `get` call. +pub(crate) struct FakeTree { + pub branches: Vec, + queues: [BranchQueue; NUM_QUEUES], +} + +impl FakeTree { + pub fn new() -> Self { + // TODO: Don't like this? Cause is that now we don't have a non-sync + // branch. But we assumed BranchId=0 means the branch is invalid. We + // can do the rusty Option stuff. But we still need a token + // value within the protocol to signify no-branch-id. Maybe the high + // bit? Branches are crazy expensive, no-one is going to have 2^32 + // branches anyway. 2^31 isn't too bad. + return Self { + branches: vec![FakeBranch{ + id: BranchId::new_invalid(), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningNonSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + }], + queues: [BranchQueue::new(); 3] + } + } + + fn is_in_sync(&self) -> bool { + return self.branches.len() > 1; + } + + pub fn queue_is_empty(&self, kind: QueueKind) -> bool { + return self.queues[kind.as_index()].is_empty(); + } + + pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { + debug_assert_ne!(kind, QueueKind::FinishedSync); + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); + } + + pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); + } + + pub fn get_queue_first(&self, kind: QueueKind) -> Option { + let queue = &self.queues[kind.as_index()]; + if queue.first.is_valid() { + return Some(queue.first) + } else { + return None; + } + } + + pub fn get_queue_next(&self, branch_id: BranchId) -> Option { + let branch = &self.branches[branch_id.index as usize]; + if branch.next_in_queue.is_valid() { + return Some(branch.next_in_queue); + } else { + return None; + } + } + + pub fn start_sync(&mut self) -> BranchId { + debug_assert!(!self.is_in_sync()); + + // Create the first branch + let sync_branch = FakeBranch::new_root(1); + let sync_branch_id = sync_branch.id; + self.branches.push(sync_branch); + + return sync_branch_id; + } + + pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { + debug_assert!(self.is_in_sync()); + let parent_branch = &self[parent_branch_id]; + let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch); + let new_branch_id = new_branch.id; + self.branches.push(new_branch); + + return new_branch_id; + } + + pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch { + debug_assert!(branch_id.is_valid()); + debug_assert!(self.is_in_sync()); + + // Take out the succeeding branch, then just clear all fake branches. + self.branches.swap(1, branch_id.index as usize); + self.branches.truncate(2); + let result = self.branches.pop().unwrap(); + + for queue_index in 0..NUM_QUEUES { + self.queues[queue_index] = BranchQueue::new(); + } + + return result; + } +} + +impl Index for FakeTree { + type Output = FakeBranch; + + fn index(&self, index: BranchId) -> &Self::Output { + return &self.branches[index.index as usize]; + } +} + +impl IndexMut for FakeTree { + fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { + return &mut self.branches[index.index as usize]; + } +} + +// ----------------------------------------------------------------------------- +// Shared logic +// ----------------------------------------------------------------------------- + +fn pop_from_queue(queue: &mut BranchQueue, branches: &mut [B]) -> Option { + if queue.is_empty() { + return None; + } else { + let first_branch = &mut branches[queue.first.index as usize]; + queue.first = first_branch.get_next_id(); + first_branch.set_next_id(BranchId::new_invalid()); + if !queue.first.is_valid() { + queue.last = BranchId::new_invalid(); + } + + return Some(first_branch.get_id()); + } +} + +fn push_into_queue(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) { + debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid()); + if queue.is_empty() { + queue.first = branch_id; + queue.last = branch_id; + } else { + let last_branch = &mut branches[queue.last.index as usize]; + last_branch.set_next_id(branch_id); + queue.last = branch_id; + } +} \ No newline at end of file