// Files @ ecc47971d535
// Branch filter:
//
// Location: CSY/reowolf/src/runtime2/branch.rs
//
// ecc47971d535 11.9 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
// MH
// WIP on handling sync solution messages
use std::collections::HashMap;
use std::ops::{Index, IndexMut};

use crate::protocol::ComponentState;
use crate::protocol::eval::{Value, ValueGroup};
use crate::runtime2::port::{Port, PortIdLocal};

/// Identifier of a branch within a component.
///
/// Every component owns exactly one non-speculative branch, and that branch
/// always carries index 0. Speculative code therefore never needs to refer to
/// index 0 as a "real" branch, which frees it up to act as the "invalid ID"
/// sentinel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BranchId {
    pub index: u32,
}

impl BranchId {
    /// Index of the non-speculative branch; doubles as the invalid sentinel.
    const INVALID_INDEX: u32 = 0;

    /// Returns the sentinel ID (index 0) for which `is_valid` is `false`.
    #[inline]
    pub(crate) fn new_invalid() -> Self {
        Self { index: Self::INVALID_INDEX }
    }

    /// Wraps a non-zero index into an ID. Passing 0 trips a debug assertion,
    /// as that value is reserved for the invalid ID.
    #[inline]
    fn new(index: u32) -> Self {
        debug_assert!(index != 0);
        Self { index }
    }

    /// Returns `true` unless this is the sentinel produced by `new_invalid`.
    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        self.index != Self::INVALID_INDEX
    }
}

/// State a branch's execution can be in. The variants are split into those
/// that apply outside of a sync block and those that apply inside of one.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum SpeculativeState {
    // Non-synchronous variants
    /// Regular execution of code.
    RunningNonSync,
    /// Encountered a runtime error.
    Error,
    /// Finished executing the connector's code.
    Finished,
    // Synchronous variants
    /// Running within a sync block.
    RunningInSync,
    /// Halted at a branching point (at a `get` call).
    HaltedAtBranchPoint,
    /// Reached the end of the sync block; the branch represents a local
    /// solution.
    ReachedSyncEnd,
    /// The branch can never represent a local solution, so it is halted.
    Inconsistent,
}

/// The execution state of a branch. This envelops the PDL code and the
/// execution state. And derived from that: if we're ready to keep running the
/// code, or if we're halted for some reason (e.g. waiting for a message).
pub(crate) struct Branch {
    /// ID of this branch. The non-sync branch carries the invalid ID (0).
    pub id: BranchId,
    /// ID of the branch this one was forked from; invalid for the non-sync
    /// branch.
    pub parent_id: BranchId,
    // Execution state
    /// State of the executed code. Cloned from the parent when forking.
    pub code_state: ComponentState,
    /// Whether the branch is running, halted, finished, etc.
    pub sync_state: SpeculativeState,
    /// Port on which the branch awaits a message. Only valid if the branch is
    /// in the "awaiting message" queue. TODO: Maybe put in enum
    pub awaiting_port: PortIdLocal,
    /// Next branch in whichever queue this branch is part of; the invalid ID
    /// marks the end of the queue. Used by `ExecTree`/`BranchQueue`.
    pub next_in_queue: BranchId,
    /// Messages delivered to this branch, keyed by receiving port.
    /// TODO: Remove, currently only valid in single-get/put mode
    pub inbox: HashMap<PortIdLocal, ValueGroup>,
    /// TODO: Maybe remove?
    pub prepared_channel: Option<(Value, Value)>,
}

impl Branch {
    /// Creates a new non-speculative branch that executes `component_state`.
    ///
    /// The branch gets the invalid ID for `id`, `parent_id` and
    /// `next_in_queue`, an invalid `awaiting_port`, an empty inbox and no
    /// prepared channel.
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
        Branch {
            id: BranchId::new_invalid(),
            parent_id: BranchId::new_invalid(),
            code_state: component_state,
            sync_state: SpeculativeState::RunningNonSync,
            awaiting_port: PortIdLocal::new_invalid(),
            next_in_queue: BranchId::new_invalid(),
            inbox: HashMap::new(),
            prepared_channel: None,
        }
    }

    /// Constructs a sync branch. The provided branch is assumed to be the
    /// parent of the new branch within the execution tree.
    ///
    /// `new_index` becomes the ID of the new branch and must be non-zero. The
    /// code state, inbox and awaited port are copied from the parent; the new
    /// branch starts out in the `RunningInSync` state and is not part of any
    /// queue.
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
        // Forking is only expected from the non-sync branch (which has no
        // parent) or from a branch that is halted at a branching point.
        debug_assert!(
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
                (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
        );
        debug_assert!(parent_branch.prepared_channel.is_none());

        Branch {
            id: BranchId::new(new_index),
            parent_id: parent_branch.id,
            code_state: parent_branch.code_state.clone(),
            sync_state: SpeculativeState::RunningInSync,
            awaiting_port: parent_branch.awaiting_port,
            next_in_queue: BranchId::new_invalid(),
            inbox: parent_branch.inbox.clone(),
            prepared_channel: None,
        }
    }

    /// Inserts a message into the branch for retrieval by a corresponding
    /// `get(port)` call.
    ///
    /// The branch must currently be awaiting a message on `target_port`
    /// (checked in debug builds). Afterwards the branch no longer awaits any
    /// port.
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
        debug_assert!(target_port.is_valid());
        debug_assert!(self.awaiting_port == target_port);
        self.awaiting_port = PortIdLocal::new_invalid();
        self.inbox.insert(target_port, contents);
    }
}

/// Small helper describing a queue of branches by the IDs of its first and
/// last element. Both are the invalid ID when the queue is empty.
#[derive(Copy, Clone)]
struct BranchQueue {
    first: BranchId,
    last: BranchId,
}

impl BranchQueue {
    /// Creates an empty queue.
    #[inline]
    fn new() -> Self {
        let nothing = BranchId::new_invalid();
        Self { first: nothing, last: nothing }
    }

    /// Returns `true` if the queue holds no branches. In debug builds this
    /// also checks that `first` and `last` agree on whether the queue is empty.
    #[inline]
    fn is_empty(&self) -> bool {
        debug_assert!(self.first.is_valid() == self.last.is_valid());
        !self.first.is_valid()
    }
}

/// Number of branch queues kept by `ExecTree`: one for each `QueueKind`
/// variant.
const NUM_QUEUES: usize = 3;

/// Identifies one of the branch queues kept by the execution tree.
///
/// `Debug` is derived so that values can be used in `debug_assert_eq!` /
/// `debug_assert_ne!`, which format both operands on failure. The enum has no
/// fields, so it is `Copy` as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum QueueKind {
    /// Branches that can be run (presumably: not halted or finished).
    Runnable,
    /// Branches that are waiting for a message to arrive on a port.
    AwaitingMessage,
    /// Branches that finished the sync round.
    FinishedSync,
}

impl QueueKind {
    /// Returns the position of this kind's queue in the array of queues.
    /// The returned values are contiguous and start at 0.
    fn as_index(&self) -> usize {
        match self {
            QueueKind::Runnable => 0,
            QueueKind::AwaitingMessage => 1,
            QueueKind::FinishedSync => 2,
        }
    }
}

/// Execution tree of branches. Tries to keep the extra information stored
/// herein to a minimum. So the execution tree is aware of the branches, their
/// execution state and the way they're dependent on each other, but the
/// execution tree should not be aware of e.g. sync algorithms.
///
/// Note that the tree keeps track of multiple lists of branches. Each list
/// contains branches that ended up in a particular execution state. The lists
/// are described by the various `BranchQueue` instances and the `next_in_queue`
/// field in each branch.
pub(crate) struct ExecTree {
    /// All branches. The `parent_id` field in each branch implies the shape of
    /// the tree. Branches are index stable throughout a sync round. The
    /// non-sync branch is stored at index 0.
    pub branches: Vec<Branch>,
    /// One queue per `QueueKind`, indexed by `QueueKind::as_index`.
    pub queues: [BranchQueue; NUM_QUEUES]
}

impl ExecTree {
    /// Constructs a new execution tree with a single non-sync branch.
    pub fn new(component: ComponentState) -> Self {
        return Self {
            branches: vec![Branch::new_non_sync(component)],
            queues: [BranchQueue::new(); 3]
        }
    }

    // --- Generic branch (queue) management

    /// Returns if tree is in speculative mode
    pub fn is_in_sync(&self) -> bool {
        return self.branches.len() != 1;
    }

    /// Returns true if the particular queue is empty
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
        return self.queues[kind.as_index()].is_empty();
    }

    /// Pops a branch (ID) from a queue.
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
        debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round
        let queue = &mut self.queues[kind.as_index()];
        if queue.is_empty() {
            return None;
        } else {
            let first_branch = &mut self.branches[queue.first.index as usize];
            queue.first = first_branch.next_in_queue;
            first_branch.next_in_queue = BranchId::new_invalid();
            if !queue.first.is_valid() {
                queue.last = BranchId::new_invalid();
            }

            return Some(first_branch.id);
        }
    }

    /// Pushes a branch (ID) into a queue.
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
        let queue = &mut self.queues[kind.as_index()];
        if queue.is_empty() {
            queue.first = id;
            queue.last = id;
        } else {
            let last_branch = &mut self.branches[queue.last.index as usize];
            last_branch.next_in_queue = id;
            queue.last = id;
        }
    }

    /// Returns the non-sync branch (TODO: better name?)
    pub fn base_branch_mut(&mut self) -> &mut Branch {
        debug_assert!(!self.is_in_sync());
        return &mut self.branches[0];
    }

    /// Returns an iterator over all the elements in the queue of the given
    /// kind. One can start the iteration at the branch *after* the provided
    /// branch. Just make sure it actually is in the provided queue.
    pub fn iter_queue(&self, kind: QueueKind, start_at: Option<BranchId>) -> BranchQueueIter {
        let queue = &self.queues[kind.as_index()];

        let index = match start_at {
            Some(branch_id) => {
                debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id));
                let branch = &self.branches[branch_id.index as usize];

                branch.next_in_queue.index as usize
            },
            None => {
                queue.first as usize
            }
        };

        return BranchQueueIter {
            branches: self.branches.as_slice(),
            index,
        }
    }

    /// Returns an iterator that starts with the provided branch, and then
    /// continues to visit all of the branch's parents.
    pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter {
        return BranchParentIter{
            branches: self.branches.as_slice(),
            index: branch_id.index as usize,
        }
    }

    // --- Preparing and finishing a speculative round

    /// Starts a synchronous round by cloning the non-sync branch and marking it
    /// as the root of the speculative tree. The id of this root sync branch is
    /// returned.
    pub fn start_sync(&mut self) -> BranchId {
        debug_assert!(!self.is_in_sync());
        let sync_branch = Branch::new_sync(1, &self.branches[0]);
        let sync_branch_id = sync_branch.id;
        self.branches.push(sync_branch);

        return sync_branch_id;
    }

    /// Creates a new speculative branch based on the provided one. The index to
    /// retrieve this new branch will be returned.
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
        debug_assert!(self.is_in_sync());
        let parent_branch = &self[parent_branch_id];
        let new_branch = Branch::new_sync(1, parent_branch);
        let new_branch_id = new_branch.id;

        return new_branch_id;
    }

    /// Collapses the speculative execution tree back into a deterministic one,
    /// using the provided branch as the final sync result.
    pub fn end_sync(&mut self, branch_id: BranchId) {
        debug_assert!(self.is_in_sync());
        debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id));

        // Swap indicated branch into the first position
        self.branches.swap(0, branch_id.index as usize);
        self.branches.truncate(1);

        // Reset all values to non-sync defaults
        let branch = &mut self.branches[0];
        branch.id = BranchId::new_invalid();
        branch.parent_id = BranchId::new_invalid();
        branch.sync_state = SpeculativeState::RunningNonSync;
        branch.next_in_queue = BranchId::new_invalid();

        // Clear out all the queues
        for queue_idx in 0..NUM_QUEUES {
            self.queues[queue_idx] = BranchQueue::new();
        }
    }
}

/// Shared access to a branch in the tree through its ID.
impl Index<BranchId> for ExecTree {
    type Output = Branch;

    /// Looks up the branch stored at the position given by the ID. The ID is
    /// expected to be valid (checked in debug builds).
    fn index(&self, index: BranchId) -> &Self::Output {
        debug_assert!(index.is_valid());
        let position = index.index as usize;
        &self.branches[position]
    }
}

/// Mutable access to a branch in the tree through its ID.
impl IndexMut<BranchId> for ExecTree {
    /// Looks up the branch stored at the position given by the ID. The ID is
    /// expected to be valid (checked in debug builds).
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
        debug_assert!(index.is_valid());
        let position = index.index as usize;
        &mut self.branches[position]
    }
}

/// Iterator over the branches of a single queue. Follows the `next_in_queue`
/// links until it encounters the invalid branch index (0).
pub struct BranchQueueIter<'a> {
    branches: &'a [Branch],
    index: usize,
}

impl<'a> Iterator for BranchQueueIter<'a> {
    type Item = &'a Branch;

    /// Yields the branch at the current position and moves on to the branch
    /// that follows it in the queue.
    fn next(&mut self) -> Option<Self::Item> {
        match self.index {
            // i.e. the invalid branch index: end of the queue
            0 => None,
            current => {
                let item = &self.branches[current];
                self.index = item.next_in_queue.index as usize;
                Some(item)
            }
        }
    }
}

/// Iterator that walks from a branch towards the root of the execution tree.
/// Follows the `parent_id` links until it encounters branch index 0, which is
/// itself not yielded.
pub struct BranchParentIter<'a> {
    branches: &'a [Branch],
    index: usize,
}

impl<'a> Iterator for BranchParentIter<'a> {
    type Item = &'a Branch;

    /// Yields the branch at the current position and moves on to its parent.
    fn next(&mut self) -> Option<Self::Item> {
        match self.index {
            0 => None,
            current => {
                let item = &self.branches[current];
                self.index = item.parent_id.index as usize;
                Some(item)
            }
        }
    }
}