use std::collections::HashMap; use std::ops::{Index, IndexMut}; use crate::protocol::ComponentState; use crate::protocol::eval::{Value, ValueGroup}; use super::port::PortIdLocal; // To share some logic between the FakeTree and ExecTree implementation trait BranchListItem { fn get_id(&self) -> BranchId; fn set_next_id(&mut self, id: BranchId); fn get_next_id(&self) -> BranchId; } /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context /// we use this fact to let branch ID 0 denote the ID being invalid. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BranchId { pub index: u32 } impl BranchId { #[inline] pub(crate) fn new_invalid() -> Self { return Self{ index: 0 }; } #[inline] fn new(index: u32) -> Self { debug_assert!(index != 0); return Self{ index }; } #[inline] pub(crate) fn is_valid(&self) -> bool { return self.index != 0; } } #[derive(Debug, PartialEq, Eq)] pub(crate) enum SpeculativeState { // Non-synchronous variants RunningNonSync, // regular execution of code Error, // encountered a runtime error Finished, // finished executing connector's code // Synchronous variants RunningInSync, // running within a sync block HaltedAtBranchPoint, // at a branching point (at a `get` call) ReachedSyncEnd, // reached end of sync block, branch represents a local solution Inconsistent, // branch can never represent a local solution, so halted } /// The execution state of a branch. This envelops the PDL code and the /// execution state. And derived from that: if we're ready to keep running the /// code, or if we're halted for some reason (e.g. waiting for a message). pub(crate) struct Branch { pub id: BranchId, pub parent_id: BranchId, // Execution state pub code_state: ComponentState, pub sync_state: SpeculativeState, pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` pub inbox: HashMap, // TODO: Remove, currently only valid in single-get/put mode pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove? } impl BranchListItem for Branch { #[inline] fn get_id(&self) -> BranchId { return self.id; } #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } } impl Branch { /// Creates a new non-speculative branch pub(crate) fn new_non_sync(component_state: ComponentState) -> Self { Branch { id: BranchId::new_invalid(), parent_id: BranchId::new_invalid(), code_state: component_state, sync_state: SpeculativeState::RunningNonSync, awaiting_port: PortIdLocal::new_invalid(), next_in_queue: BranchId::new_invalid(), inbox: HashMap::new(), prepared_channel: None, } } /// Constructs a sync branch. The provided branch is assumed to be the /// parent of the new branch within the execution tree. fn new_sync(new_index: u32, parent_branch: &Branch) -> Self { debug_assert!( (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) || (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) ); // forking from non-sync, or forking from a branching point debug_assert!(parent_branch.prepared_channel.is_none()); Branch { id: BranchId::new(new_index), parent_id: parent_branch.id, code_state: parent_branch.code_state.clone(), sync_state: SpeculativeState::RunningInSync, awaiting_port: parent_branch.awaiting_port, next_in_queue: BranchId::new_invalid(), inbox: parent_branch.inbox.clone(), prepared_channel: None, } } /// Inserts a message into the branch for retrieval by a corresponding /// `get(port)` call. pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { debug_assert!(target_port.is_valid()); debug_assert!(self.awaiting_port == target_port); self.awaiting_port = PortIdLocal::new_invalid(); self.inbox.insert(target_port, contents); } } /// Queue of branches. Just a little helper. #[derive(Copy, Clone)] struct BranchQueue { first: BranchId, last: BranchId, } impl BranchQueue { #[inline] fn new() -> Self { Self{ first: BranchId::new_invalid(), last: BranchId::new_invalid() } } #[inline] fn is_empty(&self) -> bool { debug_assert!(self.first.is_valid() == self.last.is_valid()); return !self.first.is_valid(); } } const NUM_QUEUES: usize = 3; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum QueueKind { Runnable, AwaitingMessage, FinishedSync, } impl QueueKind { fn as_index(&self) -> usize { return match self { QueueKind::Runnable => 0, QueueKind::AwaitingMessage => 1, QueueKind::FinishedSync => 2, } } } // ----------------------------------------------------------------------------- // ExecTree // ----------------------------------------------------------------------------- /// Execution tree of branches. Tries to keep the extra information stored /// herein to a minimum. So the execution tree is aware of the branches, their /// execution state and the way they're dependent on each other, but the /// execution tree should not be aware of e.g. sync algorithms. /// /// Note that the tree keeps track of multiple lists of branches. Each list /// contains branches that ended up in a particular execution state. The lists /// are described by the various `BranchQueue` instances and the `next_in_queue` /// field in each branch. pub(crate) struct ExecTree { // All branches. the `parent_id` field in each branch implies the shape of // the tree. Branches are index stable throughout a sync round. pub branches: Vec, queues: [BranchQueue; NUM_QUEUES] } impl ExecTree { /// Constructs a new execution tree with a single non-sync branch. pub fn new(component: ComponentState) -> Self { return Self { branches: vec![Branch::new_non_sync(component)], queues: [BranchQueue::new(); 3] } } // --- Generic branch (queue) management /// Returns if tree is in speculative mode pub fn is_in_sync(&self) -> bool { return self.branches.len() != 1; } /// Returns true if the particular queue is empty pub fn queue_is_empty(&self, kind: QueueKind) -> bool { return self.queues[kind.as_index()].is_empty(); } /// Pops a branch (ID) from a queue. pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); } /// Pushes a branch (ID) into a queue. pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); } /// Returns the non-sync branch (TODO: better name?) pub fn base_branch_mut(&mut self) -> &mut Branch { debug_assert!(!self.is_in_sync()); return &mut self.branches[0]; } /// Returns the branch ID of the first branch in a particular queue. pub fn get_queue_first(&self, kind: QueueKind) -> Option { let queue = &self.queues[kind.as_index()]; if queue.first.is_valid() { return Some(queue.first); } else { return None; } } /// Returns the next branch ID of a branch (assumed to be in a particular /// queue. pub fn get_queue_next(&self, branch_id: BranchId) -> Option { let branch = &self.branches[branch_id.index as usize]; if branch.next_in_queue.is_valid() { return Some(branch.next_in_queue); } else { return None; } } /// Returns an iterator that starts with the provided branch, and then /// continues to visit all of the branch's parents. pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter { return BranchParentIter{ branches: self.branches.as_slice(), index: branch_id.index as usize, } } // --- Preparing and finishing a speculative round /// Starts a synchronous round by cloning the non-sync branch and marking it /// as the root of the speculative tree. The id of this root sync branch is /// returned. pub fn start_sync(&mut self) -> BranchId { debug_assert!(!self.is_in_sync()); let sync_branch = Branch::new_sync(1, &self.branches[0]); let sync_branch_id = sync_branch.id; self.branches.push(sync_branch); return sync_branch_id; } /// Creates a new speculative branch based on the provided one. The index to /// retrieve this new branch will be returned. pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { debug_assert!(self.is_in_sync()); let parent_branch = &self[parent_branch_id]; let new_branch = Branch::new_sync(self.branches.len() as u32, parent_branch); let new_branch_id = new_branch.id; self.branches.push(new_branch); return new_branch_id; } /// Collapses the speculative execution tree back into a deterministic one, /// using the provided branch as the final sync result. pub fn end_sync(&mut self, branch_id: BranchId) { debug_assert!(self.is_in_sync()); // Swap indicated branch into the first position self.branches.swap(0, branch_id.index as usize); self.branches.truncate(1); // Reset all values to non-sync defaults let branch = &mut self.branches[0]; branch.id = BranchId::new_invalid(); branch.parent_id = BranchId::new_invalid(); branch.sync_state = SpeculativeState::RunningNonSync; debug_assert!(!branch.awaiting_port.is_valid()); branch.next_in_queue = BranchId::new_invalid(); branch.inbox.clear(); debug_assert!(branch.prepared_channel.is_none()); // Clear out all the queues for queue_idx in 0..NUM_QUEUES { self.queues[queue_idx] = BranchQueue::new(); } } } impl Index for ExecTree { type Output = Branch; fn index(&self, index: BranchId) -> &Self::Output { debug_assert!(index.is_valid()); return &self.branches[index.index as usize]; } } impl IndexMut for ExecTree { fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { debug_assert!(index.is_valid()); return &mut self.branches[index.index as usize]; } } /// Iterator over the parents of an `ExecTree` branch. pub(crate) struct BranchParentIter<'a> { branches: &'a [Branch], index: usize, } impl<'a> Iterator for BranchParentIter<'a> { type Item = &'a Branch; fn next(&mut self) -> Option { if self.index == 0 { return None; } let branch = &self.branches[self.index]; self.index = branch.parent_id.index as usize; return Some(branch); } } // ----------------------------------------------------------------------------- // FakeTree // ----------------------------------------------------------------------------- /// Generic fake branch. This is supposed to be used in conjunction with the /// fake tree. The purpose is to have a branching-like tree to use in /// combination with a consensus algorithm in places where we don't have PDL /// code. pub(crate) struct FakeBranch { pub id: BranchId, pub parent_id: BranchId, pub sync_state: SpeculativeState, pub awaiting_port: PortIdLocal, pub next_in_queue: BranchId, pub inbox: HashMap, } impl BranchListItem for FakeBranch { #[inline] fn get_id(&self) -> BranchId { return self.id; } #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } } impl FakeBranch { fn new_root(_index: u32) -> FakeBranch { debug_assert!(_index == 1); return FakeBranch{ id: BranchId::new(1), parent_id: BranchId::new_invalid(), sync_state: SpeculativeState::RunningInSync, awaiting_port: PortIdLocal::new_invalid(), next_in_queue: BranchId::new_invalid(), inbox: HashMap::new(), } } fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch { return FakeBranch { id: BranchId::new(index), parent_id: parent_branch.id, sync_state: SpeculativeState::RunningInSync, awaiting_port: parent_branch.awaiting_port, next_in_queue: BranchId::new_invalid(), inbox: parent_branch.inbox.clone(), } } pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { debug_assert!(target_port.is_valid()); debug_assert!(self.awaiting_port == target_port); self.awaiting_port = PortIdLocal::new_invalid(); self.inbox.insert(target_port, contents); } } /// A little helper for native components that don't have a set of branches that /// are actually executing code, but just have to manage the idea of branches /// due to them performing the equivalent of a branching `get` call. pub(crate) struct FakeTree { pub branches: Vec, queues: [BranchQueue; NUM_QUEUES], } impl FakeTree { pub fn new() -> Self { // TODO: Don't like this? Cause is that now we don't have a non-sync // branch. But we assumed BranchId=0 means the branch is invalid. We // can do the rusty Option stuff. But we still need a token // value within the protocol to signify no-branch-id. Maybe the high // bit? Branches are crazy expensive, no-one is going to have 2^32 // branches anyway. 2^31 isn't too bad. return Self { branches: vec![FakeBranch{ id: BranchId::new_invalid(), parent_id: BranchId::new_invalid(), sync_state: SpeculativeState::RunningNonSync, awaiting_port: PortIdLocal::new_invalid(), next_in_queue: BranchId::new_invalid(), inbox: HashMap::new(), }], queues: [BranchQueue::new(); 3] } } fn is_in_sync(&self) -> bool { return !self.branches.is_empty(); } pub fn queue_is_empty(&self, kind: QueueKind) -> bool { return self.queues[kind.as_index()].is_empty(); } pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { debug_assert_ne!(kind, QueueKind::FinishedSync); return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); } pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); } pub fn get_queue_first(&self, kind: QueueKind) -> Option { let queue = &self.queues[kind.as_index()]; if queue.first.is_valid() { return Some(queue.first) } else { return None; } } pub fn get_queue_next(&self, branch_id: BranchId) -> Option { let branch = &self.branches[branch_id.index as usize]; if branch.next_in_queue.is_valid() { return Some(branch.next_in_queue); } else { return None; } } pub fn start_sync(&mut self) -> BranchId { debug_assert!(!self.is_in_sync()); // Create the first branch let sync_branch = FakeBranch::new_root(0); let sync_branch_id = sync_branch.id; self.branches.push(sync_branch); return sync_branch_id; } pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { debug_assert!(self.is_in_sync()); let parent_branch = &self[parent_branch_id]; let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch); let new_branch_id = new_branch.id; self.branches.push(new_branch); return new_branch_id; } pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch { debug_assert!(branch_id.is_valid()); debug_assert!(self.is_in_sync()); // Take out the succeeding branch, then just clear all fake branches. self.branches.swap(1, branch_id.index as usize); self.branches.truncate(2); let result = self.branches.pop().unwrap(); for queue_index in 0..NUM_QUEUES { self.queues[queue_index] = BranchQueue::new(); } return result; } } impl Index for FakeTree { type Output = FakeBranch; fn index(&self, index: BranchId) -> &Self::Output { return &self.branches[index.index as usize]; } } impl IndexMut for FakeTree { fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { return &mut self.branches[index.index as usize]; } } // ----------------------------------------------------------------------------- // Shared logic // ----------------------------------------------------------------------------- fn pop_from_queue(queue: &mut BranchQueue, branches: &mut [B]) -> Option { if queue.is_empty() { return None; } else { let first_branch = &mut branches[queue.first.index as usize]; queue.first = first_branch.get_next_id(); first_branch.set_next_id(BranchId::new_invalid()); if !queue.first.is_valid() { queue.last = BranchId::new_invalid(); } return Some(first_branch.get_id()); } } fn push_into_queue(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) { debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid()); if queue.is_empty() { queue.first = branch_id; queue.last = branch_id; } else { let last_branch = &mut branches[queue.last.index as usize]; last_branch.set_next_id(branch_id); queue.last = branch_id; } }