use std::ops::{Index, IndexMut}; use crate::protocol::ComponentState; /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context /// we use this fact to let branch ID 0 denote the ID being invalid. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BranchId { pub index: u32 } impl BranchId { #[inline] fn new_invalid() -> Self { return Self{ index: 0 }; } #[inline] fn new(index: u32) -> Self { debug_assert!(index != 0); return Self{ index }; } #[inline] pub(crate) fn is_valid(&self) -> bool { return self.index != 0; } } #[derive(Debug, PartialEq, Eq)] pub(crate) enum SpeculativeState { // Non-synchronous variants RunningNonSync, // regular execution of code Error, // encountered a runtime error Finished, // finished executing connector's code // Synchronous variants RunningInSync, // running within a sync block HaltedAtBranchPoint, // at a branching point (at a `get` call) ReachedSyncEnd, // reached end of sync block, branch represents a local solution Inconsistent, // branch can never represent a local solution, so halted } /// The execution state of a branch. This envelops the PDL code and the /// execution state. And derived from that: if we're ready to keep running the /// code, or if we're halted for some reason (e.g. waiting for a message). pub(crate) struct Branch { pub id: BranchId, pub parent_id: BranchId, // Execution state pub code_state: ComponentState, pub sync_state: SpeculativeState, pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` } impl Branch { /// Creates a new non-speculative branch pub(crate) fn new_non_sync(component_state: ComponentState) -> Self { Branch { id: BranchId::new_invalid(), parent_id: BranchId::new_invalid(), code_state: component_state, sync_state: SpeculativeState::RunningNonSync, next_in_queue: BranchId::new_invalid(), } } /// Constructs a sync branch. The provided branch is assumed to be the /// parent of the new branch within the execution tree. fn new_sync(new_index: u32, parent_branch: &Branch) -> Self { debug_assert!( (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) ); debug_assert!(parent_branch.prepared_channel.is_none()); Branch { id: BranchId::new(new_index), parent_id: parent_branch.index, code_state: parent_branch.code_state.clone(), sync_state: SpeculativeState::RunningInSync, next_in_queue: BranchId::new_invalid(), } } } /// Queue of branches. Just a little helper. #[derive(Copy, Clone)] struct BranchQueue { first: BranchId, last: BranchId, } impl BranchQueue { #[inline] fn new() -> Self { Self{ first: BranchId::new_invalid(), last: BranchId::new_invalid() } } #[inline] fn is_empty(&self) -> bool { debug_assert!(self.first.is_valid() == self.last.is_valid()); return !self.first.is_valid(); } } const NUM_QUEUES: usize = 3; pub(crate) enum QueueKind { Runnable, AwaitingMessage, FinishedSync, } impl QueueKind { fn as_index(&self) -> usize { return match self { QueueKind::Runnable => 0, QueueKind::AwaitingMessage => 1, QueueKind::FinishedSync => 2, } } } /// Execution tree of branches. Tries to keep the extra information stored /// herein to a minimum. So the execution tree is aware of the branches, their /// execution state and the way they're dependent on each other, but the /// execution tree should not be aware of e.g. sync algorithms. /// /// Note that the tree keeps track of multiple lists of branches. Each list /// contains branches that ended up in a particular execution state. The lists /// are described by the various `BranchQueue` instances and the `next_in_queue` /// field in each branch. pub(crate) struct ExecTree { // All branches. the `parent_id` field in each branch implies the shape of // the tree. Branches are index stable throughout a sync round. pub branches: Vec, pub queues: [BranchQueue; NUM_QUEUES] } impl ExecTree { /// Constructs a new execution tree with a single non-sync branch. pub fn new(component: ComponentState) -> Self { return Self { branches: vec![Branch::new_non_sync(component)], queues: [BranchQueue::new(); 3] } } // --- Generic branch (queue) management /// Returns if tree is in speculative mode pub fn is_in_sync(&self) -> bool { return self.branches.len() != 1; } /// Pops a branch (ID) from a queue. pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { let queue = &mut self.queues[kind.as_index()]; if queue.is_empty() { return None; } else { let first_branch = &mut self.branches[queue.first.index as usize]; queue.first = first_branch.next_in_queue; first_branch.next_in_queue = BranchId::new_invalid(); if !queue.first.is_valid() { queue.last = BranchId::new_invalid(); } return Some(first_branch.id); } } /// Pushes a branch (ID) into a queue. pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { let queue = &mut self.queues[kind.as_index()]; if queue.is_empty() { queue.first = id; queue.last = id; } else { let last_branch = &mut self.branches[queue.last.index as usize]; last_branch.next_in_queue = id; queue.last = id; } } pub fn iter_queue(&self, kind: QueueKind) -> BranchIter { let queue = &self.queues[kind.as_index()]; let index = queue.first as usize; return BranchIter{ branches: self.branches.as_slice(), index, } } // --- Preparing and finishing a speculative round /// Starts a synchronous round by cloning the non-sync branch and marking it /// as the root of the speculative tree. pub fn start_sync(&mut self) { debug_assert!(!self.is_in_sync()); let sync_branch = Branch::new_sync(1, &self.branches[0]); let sync_branch_id = sync_branch.id; self.branches.push(sync_branch); self.push_into_queue(QueueKind::Runnable, sync_branch_id); } /// Creates a new speculative branch based on the provided one. The index to /// retrieve this new branch will be returned. pub fn fork_branch(&mut self, parent_branch_id: BranchId, initial_queue: Option) -> BranchId { debug_assert!(self.is_in_sync()); let parent_branch = &self[parent_branch_id]; let new_branch = Branch::new_sync(1, parent_branch); let new_branch_id = new_branch.id; if let Some(kind) = initial_queue { self.push_into_queue(kind, new_branch_id); } return new_branch_id; } /// Collapses the speculative execution tree back into a deterministic one, /// using the provided branch as the final sync result. pub fn end_sync(&mut self, branch_id: BranchId) { debug_assert!(self.is_in_sync()); debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id)); // Swap indicated branch into the first position self.branches.swap(0, branch_id.index as usize); self.branches.truncate(1); // Reset all values to non-sync defaults let branch = &mut self.branches[0]; branch.id = BranchId::new_invalid(); branch.parent_id = BranchId::new_invalid(); branch.sync_state = SpeculativeState::RunningNonSync; branch.next_in_queue = BranchId::new_invalid(); // Clear out all the queues for queue_idx in 0..NUM_QUEUES { self.queues[queue_idx] = BranchQueue::new(); } } } impl Index for ExecTree { type Output = Branch; fn index(&self, index: BranchId) -> &Self::Output { debug_assert!(index.is_valid()); return &self.branches[index.index as usize]; } } impl IndexMut for ExecTree { fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { debug_assert!(index.is_valid()); return &mut self.branches[index.index as usize]; } } pub struct BranchIter<'a> { branches: &'a [Branch], index: usize, } impl<'a> Iterator for BranchIter<'a> { type Item = &'a Branch; fn next(&mut self) -> Option { if self.index == 0 { // i.e. the invalid branch index return None; } let branch = &self.branches[self.index]; self.index = branch.next_in_queue.index as usize; return Some(branch); } }