From f3664eb428e974e5a9aa0d2540050a6bba7cd44d 2021-11-05 13:20:03
From: MH
Date: 2021-11-05 13:20:03
Subject: [PATCH] rework storage of branches

---

diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs
new file mode 100644
index 0000000000000000000000000000000000000000..f67f2e5570a5743c3680ae3af354a0bd59ce13d7
--- /dev/null
+++ b/src/runtime2/branch.rs
@@ -0,0 +1,284 @@
+use std::ops::{Index, IndexMut};
+
+use crate::protocol::ComponentState;
+
+/// Generic branch ID. A component will always have one branch: the
+/// non-speculative branch. This branch has ID 0. Speculative branches are
+/// therefore never assigned ID 0, which lets ID 0 double as the "invalid"
+/// (absent) ID wherever a speculative branch is expected.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct BranchId {
+    /// Raw index value; 0 is the invalid/non-speculative sentinel.
+    pub index: u32
+}
+
+impl BranchId {
+    /// Returns the sentinel ID (index 0) for which `is_valid` is `false`.
+    #[inline]
+    fn new_invalid() -> Self {
+        return Self{ index: 0 };
+    }
+
+    /// Wraps a non-zero index into an ID. Passing 0 trips a debug assertion,
+    /// since 0 is reserved as the invalid sentinel.
+    #[inline]
+    fn new(index: u32) -> Self {
+        debug_assert!(index != 0);
+        return Self{ index };
+    }
+
+    /// Returns `true` if this ID is not the invalid sentinel (index 0).
+    #[inline]
+    pub(crate) fn is_valid(&self) -> bool {
+        return self.index != 0;
+    }
+}
+
+/// Execution state of a single branch, both outside and inside a sync block.
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) enum SpeculativeState {
+    // Non-synchronous variants
+    /// Regular execution of code.
+    RunningNonSync,
+    /// Encountered a runtime error.
+    Error,
+    /// Finished executing connector's code.
+    Finished,
+    // Synchronous variants
+    /// Running within a sync block.
+    RunningInSync,
+    /// At a branching point (at a `get` call).
+    HaltedAtBranchPoint,
+    /// Reached end of sync block, branch represents a local solution.
+    ReachedSyncEnd,
+    /// Branch can never represent a local solution, so halted.
+    Inconsistent,
+}
+
+/// The execution state of a branch. This envelops the PDL code and the
+/// execution state. And derived from that: if we're ready to keep running the
+/// code, or if we're halted for some reason (e.g. waiting for a message).
+pub(crate) struct Branch {
+    /// ID of this branch. Invalid (index 0) for the non-speculative branch.
+    pub id: BranchId,
+    /// ID of the branch this one was created from. Copied from the parent's
+    /// `id`, so it is invalid when the parent is the non-speculative branch.
+    pub parent_id: BranchId,
+    // Execution state
+    /// State of the component's code for this branch.
+    pub code_state: ComponentState,
+    /// Whether the branch is running, halted, finished, etc.
+    pub sync_state: SpeculativeState,
+    /// Link to the next branch in the same queue, or invalid if none. Used by
+    /// `ExecTree`/`BranchQueue`.
+    pub next_in_queue: BranchId,
+}
+
+impl Branch {
+    /// Creates a new non-speculative branch. Its own ID, parent ID and queue
+    /// link are all set to the invalid ID.
+    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
+        Branch {
+            id: BranchId::new_invalid(),
+            parent_id: BranchId::new_invalid(),
+            code_state: component_state,
+            sync_state: SpeculativeState::RunningNonSync,
+            next_in_queue: BranchId::new_invalid(),
+        }
+    }
+
+    /// Constructs a sync branch. The provided branch is assumed to be the
+    /// parent of the new branch within the execution tree: its code state is
+    /// cloned and its ID is recorded as the new branch's `parent_id`.
+    ///
+    /// `new_index` becomes the new branch's ID and must be non-zero.
+    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
+        // A parent is either the non-speculative root (about to enter sync
+        // mode) or a speculative branch that halted at a branching point.
+        debug_assert!(
+            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
+            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
+        );
+
+        Branch {
+            id: BranchId::new(new_index),
+            parent_id: parent_branch.id,
+            code_state: parent_branch.code_state.clone(),
+            sync_state: SpeculativeState::RunningInSync,
+            next_in_queue: BranchId::new_invalid(),
+        }
+    }
+}
+
+/// Queue of branches. Just a little helper.
+#[derive(Copy, Clone)]
+struct BranchQueue {
+    /// First branch in the queue, or invalid if the queue is empty.
+    first: BranchId,
+    /// Last branch in the queue, or invalid if the queue is empty.
+    last: BranchId,
+}
+
+impl BranchQueue {
+    /// Creates an empty queue (both ends set to the invalid ID).
+    #[inline]
+    fn new() -> Self {
+        Self{
+            first: BranchId::new_invalid(),
+            last: BranchId::new_invalid()
+        }
+    }
+
+    /// Returns `true` if the queue holds no branches.
+    #[inline]
+    fn is_empty(&self) -> bool {
+        // Both ends must be valid, or both invalid.
+        debug_assert!(self.first.is_valid() == self.last.is_valid());
+        return !self.first.is_valid();
+    }
+}
+
+/// Number of queues kept by `ExecTree`; one per `QueueKind` variant.
+const NUM_QUEUES: usize = 3;
+
+/// Selects one of the branch queues kept by `ExecTree`.
+pub(crate) enum QueueKind {
+    Runnable,
+    AwaitingMessage,
+    FinishedSync,
+}
+
+impl QueueKind {
+    /// Maps the queue kind onto its slot in `ExecTree::queues`.
+    fn as_index(&self) -> usize {
+        return match self {
+            QueueKind::Runnable => 0,
+            QueueKind::AwaitingMessage => 1,
+            QueueKind::FinishedSync => 2,
+        }
+    }
+}
+
+/// Execution tree of branches. Tries to keep the extra information stored
+/// herein to a minimum. So the execution tree is aware of the branches, their
+/// execution state and the way they're dependent on each other, but the
+/// execution tree should not be aware of e.g. sync algorithms.
+///
+/// Note that the tree keeps track of multiple lists of branches. Each list
+/// contains branches that ended up in a particular execution state. The lists
+/// are described by the various `BranchQueue` instances and the `next_in_queue`
+/// field in each branch.
+pub(crate) struct ExecTree {
+    // All branches. the `parent_id` field in each branch implies the shape of
+    // the tree. Branches are index stable throughout a sync round.
+    pub branches: Vec<Branch>,
+    pub queues: [BranchQueue; NUM_QUEUES]
+}
+
+impl ExecTree {
+    /// Constructs a new execution tree with a single non-sync branch.
+    pub fn new(component: ComponentState) -> Self {
+        return Self {
+            branches: vec![Branch::new_non_sync(component)],
+            queues: [BranchQueue::new(); NUM_QUEUES]
+        }
+    }
+
+    // --- Generic branch (queue) management
+
+    /// Returns if tree is in speculative mode, i.e. if it holds more than the
+    /// single non-sync branch.
+    pub fn is_in_sync(&self) -> bool {
+        return self.branches.len() != 1;
+    }
+
+    /// Pops a branch (ID) from the front of a queue. Returns `None` if the
+    /// queue is empty.
+    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
+        let queue = &mut self.queues[kind.as_index()];
+        if queue.is_empty() {
+            return None;
+        }
+
+        // Unlink the head and advance the queue to its successor.
+        let first_branch = &mut self.branches[queue.first.index as usize];
+        queue.first = first_branch.next_in_queue;
+        first_branch.next_in_queue = BranchId::new_invalid();
+        if !queue.first.is_valid() {
+            // Popped the only element: the queue is now empty.
+            queue.last = BranchId::new_invalid();
+        }
+
+        return Some(first_branch.id);
+    }
+
+    /// Pushes a branch (ID) onto the back of a queue. The ID must be valid.
+    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
+        // The invalid ID marks "no branch" in the queue links.
+        debug_assert!(id.is_valid());
+
+        let queue = &mut self.queues[kind.as_index()];
+        if queue.is_empty() {
+            queue.first = id;
+            queue.last = id;
+        } else {
+            let last_branch = &mut self.branches[queue.last.index as usize];
+            last_branch.next_in_queue = id;
+            queue.last = id;
+        }
+    }
+
+    /// Returns an iterator over the branches in a queue, front to back. The
+    /// queue itself is not modified.
+    pub fn iter_queue(&self, kind: QueueKind) -> BranchIter {
+        let queue = &self.queues[kind.as_index()];
+        let index = queue.first.index as usize;
+        return BranchIter{
+            branches: self.branches.as_slice(),
+            index,
+        }
+    }
+
+    // --- Preparing and finishing a speculative round
+
+    /// Starts a synchronous round by cloning the non-sync branch and marking it
+    /// as the root of the speculative tree. The new branch gets ID 1 and is
+    /// put into the `Runnable` queue.
+    pub fn start_sync(&mut self) {
+        debug_assert!(!self.is_in_sync());
+        let sync_branch = Branch::new_sync(1, &self.branches[0]);
+        let sync_branch_id = sync_branch.id;
+        self.branches.push(sync_branch);
+        self.push_into_queue(QueueKind::Runnable, sync_branch_id);
+    }
+
+    /// Creates a new speculative branch based on the provided one. The index to
+    /// retrieve this new branch will be returned. If `initial_queue` is given,
+    /// the new branch is also pushed into that queue.
+    pub fn fork_branch(&mut self, parent_branch_id: BranchId, initial_queue: Option<QueueKind>) -> BranchId {
+        debug_assert!(self.is_in_sync());
+        debug_assert!(self.branches.len() <= u32::MAX as usize);
+
+        // The branch's ID is its position in `branches`, so the next free
+        // position is the new ID.
+        let new_index = self.branches.len() as u32;
+        let parent_branch = &self[parent_branch_id];
+        let new_branch = Branch::new_sync(new_index, parent_branch);
+        let new_branch_id = new_branch.id;
+        self.branches.push(new_branch);
+
+        if let Some(kind) = initial_queue {
+            self.push_into_queue(kind, new_branch_id);
+        }
+
+        return new_branch_id;
+    }
+
+    /// Collapses the speculative execution tree back into a deterministic one,
+    /// using the provided branch as the final sync result. All other branches
+    /// are dropped and all queues are emptied.
+    pub fn end_sync(&mut self, branch_id: BranchId) {
+        debug_assert!(self.is_in_sync());
+        debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id));
+
+        // Swap indicated branch into the first position
+        self.branches.swap(0, branch_id.index as usize);
+        self.branches.truncate(1);
+
+        // Reset all values to non-sync defaults
+        let branch = &mut self.branches[0];
+        branch.id = BranchId::new_invalid();
+        branch.parent_id = BranchId::new_invalid();
+        branch.sync_state = SpeculativeState::RunningNonSync;
+        branch.next_in_queue = BranchId::new_invalid();
+
+        // Clear out all the queues
+        self.queues = [BranchQueue::new(); NUM_QUEUES];
+    }
+}
+
+impl Index<BranchId> for ExecTree {
+    type Output = Branch;
+
+    /// Looks up a branch by ID. The ID must be valid.
+    fn index(&self, index: BranchId) -> &Self::Output {
+        debug_assert!(index.is_valid());
+        return &self.branches[index.index as usize];
+    }
+}
+
+impl IndexMut<BranchId> for ExecTree {
+    /// Looks up a branch by ID for mutation. The ID must be valid.
+    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
+        debug_assert!(index.is_valid());
+        return &mut self.branches[index.index as usize];
+    }
+}
+
+/// Iterator over the branches of one queue, following the `next_in_queue`
+/// links. Created by `ExecTree::iter_queue`.
+pub struct BranchIter<'a> {
+    branches: &'a [Branch],
+    /// Index of the branch to yield next; 0 means the iteration is done.
+    index: usize,
+}
+
+impl<'a> Iterator for BranchIter<'a> {
+    type Item = &'a Branch;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index == 0 {
+            // i.e. the invalid branch index
+            return None;
+        }
+
+        let branch = &self.branches[self.index];
+        self.index = branch.next_in_queue.index as usize;
+        return Some(branch);
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index 9b741ce49417d1f7933c8622bcac4f8c4c7ffe4a..1e64664226ec45f096d486297005f7df11d9f0eb 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -3,6 +3,7 @@
 mod runtime;
 mod messages;
 mod connector;
+mod branch;
 mod native;
 mod port;
 mod scheduler;
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 5bd0ecb8d3f47dc32127f1ffacd0c8c213d02949..7b2900640ff08e539807ed562ad6de738598aa01 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -5,7 +5,7 @@
 use crate::{PortId, ProtocolDescription};
 use crate::common::Id;
 use crate::protocol::eval::*;

-const NUM_THREADS: u32 = 4; // number of threads in runtime
+const NUM_THREADS: u32 = 1; // number of threads in runtime
 const NUM_INSTANCES: u32 = 10; // number of test instances constructed
 const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests)