Changeset - f3664eb428e9
[Not reviewed]
0 2 1
MH - 4 years ago 2021-11-05 13:20:03
contact@maxhenger.nl
rework storage of branches
3 files changed with 286 insertions and 1 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
new file 100644
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32
 
}
 

	
 
impl BranchId {
 
    #[inline]
 
    fn new_invalid() -> Self {
 
        return Self{ index: 0 };
 
    }
 

	
 
    #[inline]
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        return Self{ index };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
pub(crate) struct Branch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: ComponentState,
 
    pub sync_state: SpeculativeState,
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
 
        Branch {
 
            id: BranchId::new_invalid(),
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_in_queue: BranchId::new_invalid(),
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
                (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_in_queue: BranchId::new_invalid(),
 
        }
 
    }
 
}
 

	
 
/// Queue of branches. Just a little helper.
 
#[derive(Copy, Clone)]
 
struct BranchQueue {
 
    first: BranchId,
 
    last: BranchId,
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{
 
            first: BranchId::new_invalid(),
 
            last: BranchId::new_invalid()
 
        }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!(self.first.is_valid() == self.last.is_valid());
 
        return !self.first.is_valid();
 
    }
 
}
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
    FinishedSync,
 
}
 

	
 
impl QueueKind {
 
    fn as_index(&self) -> usize {
 
        return match self {
 
            QueueKind::Runnable => 0,
 
            QueueKind::AwaitingMessage => 1,
 
            QueueKind::FinishedSync => 2,
 
        }
 
    }
 
}
 

	
 
/// Execution tree of branches. Tries to keep the extra information stored
 
/// herein to a minimum. So the execution tree is aware of the branches, their
 
/// execution state and the way they're dependent on each other, but the
 
/// execution tree should not be aware of e.g. sync algorithms.
 
///
 
/// Note that the tree keeps track of multiple lists of branches. Each list
 
/// contains branches that ended up in a particular execution state. The lists
 
/// are described by the various `BranchQueue` instances and the `next_in_queue`
 
/// field in each branch.
 
pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    pub queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
    /// Constructs a new execution tree with a single non-sync branch.
 
    pub fn new(component: ComponentState) -> Self {
 
        return Self {
 
            branches: vec![Branch::new_non_sync(component)],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    // --- Generic branch (queue) management
 

	
 
    /// Returns if tree is in speculative mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return self.branches.len() != 1;
 
    }
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            return None;
 
        } else {
 
            let first_branch = &mut self.branches[queue.first.index as usize];
 
            queue.first = first_branch.next_in_queue;
 
            first_branch.next_in_queue = BranchId::new_invalid();
 
            if !queue.first.is_valid() {
 
                queue.last = BranchId::new_invalid();
 
            }
 

	
 
            return Some(first_branch.id);
 
        }
 
    }
 

	
 
    /// Pushes a branch (ID) into a queue.
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            queue.first = id;
 
            queue.last = id;
 
        } else {
 
            let last_branch = &mut self.branches[queue.last.index as usize];
 
            last_branch.next_in_queue = id;
 
            queue.last = id;
 
        }
 
    }
 

	
 
    pub fn iter_queue(&self, kind: QueueKind) -> BranchIter {
 
        let queue = &self.queues[kind.as_index()];
 
        let index = queue.first as usize;
 
        return BranchIter{
 
            branches: self.branches.as_slice(),
 
            index,
 
        }
 
    }
 

	
 
    // --- Preparing and finishing a speculative round
 

	
 
    /// Starts a synchronous round by cloning the non-sync branch and marking it
 
    /// as the root of the speculative tree.
 
    pub fn start_sync(&mut self) {
 
        debug_assert!(!self.is_in_sync());
 
        let sync_branch = Branch::new_sync(1, &self.branches[0]);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 
        self.push_into_queue(QueueKind::Runnable, sync_branch_id);
 
    }
 

	
 
    /// Creates a new speculative branch based on the provided one. The index to
 
    /// retrieve this new branch will be returned.
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId, initial_queue: Option<QueueKind>) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = Branch::new_sync(1, parent_branch);
 
        let new_branch_id = new_branch.id;
 

	
 
        if let Some(kind) = initial_queue {
 
            self.push_into_queue(kind, new_branch_id);
 
        }
 

	
 
        return new_branch_id;
 
    }
 

	
 
    /// Collapses the speculative execution tree back into a deterministic one,
 
    /// using the provided branch as the final sync result.
 
    pub fn end_sync(&mut self, branch_id: BranchId) {
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id));
 

	
 
        // Swap indicated branch into the first position
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1);
 

	
 
        // Reset all values to non-sync defaults
 
        let branch = &mut self.branches[0];
 
        branch.id = BranchId::new_invalid();
 
        branch.parent_id = BranchId::new_invalid();
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        branch.next_in_queue = BranchId::new_invalid();
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
 
            self.queues[queue_idx] = BranchQueue::new();
 
        }
 
    }
 
}
 

	
 
impl Index<BranchId> for ExecTree {
 
    type Output = Branch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for ExecTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
pub struct BranchIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchIter<'a> {
 
    type Item = &'a Branch;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
            // i.e. the invalid branch index
 
            return None;
 
        }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.next_in_queue.index as usize;
 
        return Some(branch);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
@@ -3,6 +3,7 @@
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod branch;
 
mod native;
 
mod port;
 
mod scheduler;
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -5,7 +5,7 @@ use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
const NUM_THREADS: u32 = 4;     // number of threads in runtime
 
const NUM_THREADS: u32 = 1;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 10;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 10;      // number of loops within a single test (not used by all tests)
 

	
0 comments (0 inline, 0 general)