diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 1b296960bcfabfc69b0176b84cd9b13b012e6bd3..465d52f779922ffc6ca94a0c13980c9317ef4f95 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -6,6 +6,13 @@ use crate::protocol::eval::{Value, ValueGroup}; use super::port::PortIdLocal; +// To share some logic between the FakeTree and ExecTree implementation +trait BranchListItem { + #[inline] fn get_id(&self) -> BranchId; + #[inline] fn set_next_id(&mut self, id: BranchId); + #[inline] fn get_next_id(&self) -> BranchId; +} + /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context /// we use this fact to let branch ID 0 denote the ID being invalid. @@ -60,6 +67,12 @@ pub(crate) struct Branch { pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove? } +impl BranchListItem for Branch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} + impl Branch { /// Creates a new non-speculative branch pub(crate) fn new_non_sync(component_state: ComponentState) -> Self { @@ -148,6 +161,10 @@ impl QueueKind { } } +// ----------------------------------------------------------------------------- +// ExecTree +// ----------------------------------------------------------------------------- + /// Execution tree of branches. Tries to keep the extra information stored /// herein to a minimum. So the execution tree is aware of the branches, their /// execution state and the way they're dependent on each other, but the @@ -188,32 +205,12 @@ impl ExecTree { /// Pops a branch (ID) from a queue. pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round - let queue = &mut self.queues[kind.as_index()]; - if queue.is_empty() { - return None; - } else { - let first_branch = &mut self.branches[queue.first.index as usize]; - queue.first = first_branch.next_in_queue; - first_branch.next_in_queue = BranchId::new_invalid(); - if !queue.first.is_valid() { - queue.last = BranchId::new_invalid(); - } - - return Some(first_branch.id); - } + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); } /// Pushes a branch (ID) into a queue. pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { - let queue = &mut self.queues[kind.as_index()]; - if queue.is_empty() { - queue.first = id; - queue.last = id; - } else { - let last_branch = &mut self.branches[queue.last.index as usize]; - last_branch.next_in_queue = id; - queue.last = id; - } + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); } /// Returns the non-sync branch (TODO: better name?) @@ -225,25 +222,13 @@ impl ExecTree { /// Returns an iterator over all the elements in the queue of the given /// kind. One can start the iteration at the branch *after* the provided /// branch. Just make sure it actually is in the provided queue. - pub fn iter_queue(&self, kind: QueueKind, start_at: Option) -> BranchQueueIter { + pub fn iter_queue(&self, kind: QueueKind, start_after: Option) -> BranchQueueIter<'_, Branch> { + // Make sure branch is in correct queue while in debug mode + debug_assert!(start_after + .map(|branch_id| self.iter_queue(kind, None).any(|v| v.id == branch_id)) + .unwrap_or(true)); let queue = &self.queues[kind.as_index()]; - - let index = match start_at { - Some(branch_id) => { - debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id)); - let branch = &self.branches[branch_id.index as usize]; - - branch.next_in_queue.index as usize - }, - None => { - queue.first.index as usize - } - }; - - return BranchQueueIter { - branches: self.branches.as_slice(), - index, - } + return iter_queue(queue, &self.branches, start_after); } /// Returns an iterator that starts with the provided branch, and then @@ -324,13 +309,14 @@ impl IndexMut for ExecTree { } } -pub(crate) struct BranchQueueIter<'a> { - branches: &'a [Branch], +/// Iterator over branches in a `ExecTree` queue. +pub(crate) struct BranchQueueIter<'a, B: BranchListItem> { + branches: &'a [B], index: usize, } -impl<'a> Iterator for BranchQueueIter<'a> { - type Item = &'a Branch; +impl<'a, B: BranchListItem> Iterator for BranchQueueIter<'a, B> { + type Item = &'a B; fn next(&mut self) -> Option { if self.index == 0 { @@ -339,11 +325,12 @@ impl<'a> Iterator for BranchQueueIter<'a> { } let branch = &self.branches[self.index]; - self.index = branch.next_in_queue.index as usize; + self.index = branch.get_next_id().index as usize; return Some(branch); } } +/// Iterator over the parents of an `ExecTree` branch. pub(crate) struct BranchParentIter<'a> { branches: &'a [Branch], index: usize, @@ -361,4 +348,197 @@ impl<'a> Iterator for BranchParentIter<'a> { self.index = branch.parent_id.index as usize; return Some(branch); } +} + +// ----------------------------------------------------------------------------- +// FakeTree +// ----------------------------------------------------------------------------- + +/// Generic fake branch. This is supposed to be used in conjunction with the +/// fake tree. The purpose is to have a branching-like tree to use in +/// combination with a consensus algorithm in places where we don't have PDL +/// code. +pub(crate) struct FakeBranch { + pub id: BranchId, + pub parent_id: BranchId, + pub sync_state: SpeculativeState, + pub awaiting_port: PortIdLocal, + pub next_in_queue: BranchId, + pub inbox: HashMap, +} + +impl BranchListItem for FakeBranch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} + +impl FakeBranch { + fn new_root(index: u32) -> FakeBranch { + debug_assert!(index == 0); + return FakeBranch{ + id: BranchId::new_invalid(), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningInSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + } + } + + fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch { + return FakeBranch { + id: BranchId::new(index), + parent_id: parent_branch.id, + sync_state: SpeculativeState::RunningInSync, + awaiting_port: parent_branch.awaiting_port, + next_in_queue: BranchId::new_invalid(), + inbox: parent_branch.inbox.clone(), + } + } + + pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { + debug_assert!(target_port.is_valid()); + debug_assert!(self.awaiting_port == target_port); + self.awaiting_port = PortIdLocal::new_invalid(); + self.inbox.insert(target_port, contents); + } +} + +/// A little helper for native components that don't have a set of branches that +/// are actually executing code, but just have to manage the idea of branches +/// due to them performing the equivalent of a branching `get` call. +pub(crate) struct FakeTree { + pub branches: Vec, + queues: [BranchQueue; NUM_QUEUES], +} + +impl FakeTree { + pub fn new() -> Self { + return Self { + branches: Vec::new(), + queues: [BranchQueue::new(); 3] + } + } + + fn is_in_sync(&self) -> bool { + return !self.branches.is_empty(); + } + + pub fn queue_is_empty(&self, kind: QueueKind) -> bool { + return self.queues[kind.as_index()].is_empty(); + } + + pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { + debug_assert_ne!(kind, QueueKind::FinishedSync); + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); + } + + pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); + } + + pub fn iter_queue(&self, kind: QueueKind, start_after: Option) -> BranchQueueIter<'_, FakeBranch> { + debug_assert!(start_after + .map(|branch_id| self.iter_queue(kind, None).any(|v| v.id == branch_id)) + .unwrap_or(true) + ); + return iter_queue(&self.queues[kind.as_index()], &self.branches, start_after); + } + + pub fn start_sync(&mut self) -> BranchId { + debug_assert!(!self.is_in_sync()); + + // Create the first branch + let sync_branch = FakeBranch::new_root(0); + let sync_branch_id = sync_branch.id; + self.branches.push(sync_branch); + + return sync_branch_id; + } + + pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { + debug_assert!(self.is_in_sync()); + let parent_branch = &self[parent_branch_id]; + let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch); + let new_branch_id = new_branch.id; + self.branches.push(new_branch); + + return new_branch_id; + } + + pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch { + debug_assert!(self.is_in_sync()); + debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == BranchId)); + + // Take out the succeeding branch, then just clear all fake branches. + let mut iter = self.branches.drain(branch_id.index..); + let result = iter.next().unwrap(); + + for queue_index in 0..NUM_QUEUES { + self.queues[queue_index] = BranchQueue::new(); + } + + return result; + } +} + +impl Index for FakeTree { + type Output = FakeBranch; + + fn index(&self, index: BranchId) -> &Self::Output { + return &self.branches[index.index as usize]; + } +} + +impl IndexMut for FakeTree { + fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { + return &mut self.branches[index.index as usize]; + } +} + +// ----------------------------------------------------------------------------- +// Shared logic +// ----------------------------------------------------------------------------- + +fn pop_from_queue(queue: &mut BranchQueue, branches: &mut [B]) -> Option { + if queue.is_empty() { + return None; + } else { + let first_branch = &mut branches[queue.first.index as usize]; + queue.first = first_branch.get_next_id(); + first_branch.set_next_id(BranchId::new_invalid()); + if !queue.first.is_valid() { + queue.last = BranchId::new_invalid(); + } + + return Some(first_branch.get_id()); + } +} + +fn push_into_queue(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) { + debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid()); + if queue.is_empty() { + queue.first = branch_id; + queue.last = branch_id; + } else { + let last_branch = &mut branches[queue.last as usize]; + last_branch.set_next_id(branch_id); + queue.last = branch_id; + } +} + +fn iter_queue<'a, B: BranchListItem>(queue: &BranchQueue, branches: &'a [B], start_after: Option) -> BranchQueueIter<'a, B> { + let index = match start_after { + Some(branch_id) => { + // Assuming caller is correct and that the branch is in the queue + let first_branch = &branches[branch_id.index as usize]; + first_branch.get_next_id().index as usize + }, + None => { + queue.first.index as usize + } + }; + + return BranchQueueIter{ branches, index }; } \ No newline at end of file