Changeset - f3664eb428e9
[Not reviewed]
0 2 1
MH - 4 years ago 2021-11-05 13:20:03
contact@maxhenger.nl
rework storage of branches
3 files changed with 286 insertions and 1 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
new file 100644
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32
 
}
 

	
 
impl BranchId {
 
    #[inline]
 
    fn new_invalid() -> Self {
 
        return Self{ index: 0 };
 
    }
 

	
 
    #[inline]
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        return Self{ index };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
pub(crate) struct Branch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: ComponentState,
 
    pub sync_state: SpeculativeState,
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: ComponentState) -> Self {
 
        Branch {
 
            id: BranchId::new_invalid(),
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_in_queue: BranchId::new_invalid(),
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
                (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_in_queue: BranchId::new_invalid(),
 
        }
 
    }
 
}
 

	
 
/// Queue of branches. Just a little helper.
 
#[derive(Copy, Clone)]
 
struct BranchQueue {
 
    first: BranchId,
 
    last: BranchId,
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{
 
            first: BranchId::new_invalid(),
 
            last: BranchId::new_invalid()
 
        }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!(self.first.is_valid() == self.last.is_valid());
 
        return !self.first.is_valid();
 
    }
 
}
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
    FinishedSync,
 
}
 

	
 
impl QueueKind {
 
    fn as_index(&self) -> usize {
 
        return match self {
 
            QueueKind::Runnable => 0,
 
            QueueKind::AwaitingMessage => 1,
 
            QueueKind::FinishedSync => 2,
 
        }
 
    }
 
}
 

	
 
/// Execution tree of branches. Tries to keep the extra information stored
 
/// herein to a minimum. So the execution tree is aware of the branches, their
 
/// execution state and the way they're dependent on each other, but the
 
/// execution tree should not be aware of e.g. sync algorithms.
 
///
 
/// Note that the tree keeps track of multiple lists of branches. Each list
 
/// contains branches that ended up in a particular execution state. The lists
 
/// are described by the various `BranchQueue` instances and the `next_in_queue`
 
/// field in each branch.
 
pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    pub queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
    /// Constructs a new execution tree with a single non-sync branch.
 
    pub fn new(component: ComponentState) -> Self {
 
        return Self {
 
            branches: vec![Branch::new_non_sync(component)],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    // --- Generic branch (queue) management
 

	
 
    /// Returns if tree is in speculative mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return self.branches.len() != 1;
 
    }
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            return None;
 
        } else {
 
            let first_branch = &mut self.branches[queue.first.index as usize];
 
            queue.first = first_branch.next_in_queue;
 
            first_branch.next_in_queue = BranchId::new_invalid();
 
            if !queue.first.is_valid() {
 
                queue.last = BranchId::new_invalid();
 
            }
 

	
 
            return Some(first_branch.id);
 
        }
 
    }
 

	
 
    /// Pushes a branch (ID) into a queue.
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            queue.first = id;
 
            queue.last = id;
 
        } else {
 
            let last_branch = &mut self.branches[queue.last.index as usize];
 
            last_branch.next_in_queue = id;
 
            queue.last = id;
 
        }
 
    }
 

	
 
    pub fn iter_queue(&self, kind: QueueKind) -> BranchIter {
 
        let queue = &self.queues[kind.as_index()];
 
        let index = queue.first as usize;
 
        return BranchIter{
 
            branches: self.branches.as_slice(),
 
            index,
 
        }
 
    }
 

	
 
    // --- Preparing and finishing a speculative round
 

	
 
    /// Starts a synchronous round by cloning the non-sync branch and marking it
 
    /// as the root of the speculative tree.
 
    pub fn start_sync(&mut self) {
 
        debug_assert!(!self.is_in_sync());
 
        let sync_branch = Branch::new_sync(1, &self.branches[0]);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 
        self.push_into_queue(QueueKind::Runnable, sync_branch_id);
 
    }
 

	
 
    /// Creates a new speculative branch based on the provided one. The index to
 
    /// retrieve this new branch will be returned.
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId, initial_queue: Option<QueueKind>) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = Branch::new_sync(1, parent_branch);
 
        let new_branch_id = new_branch.id;
 

	
 
        if let Some(kind) = initial_queue {
 
            self.push_into_queue(kind, new_branch_id);
 
        }
 

	
 
        return new_branch_id;
 
    }
 

	
 
    /// Collapses the speculative execution tree back into a deterministic one,
 
    /// using the provided branch as the final sync result.
 
    pub fn end_sync(&mut self, branch_id: BranchId) {
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id));
 

	
 
        // Swap indicated branch into the first position
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1);
 

	
 
        // Reset all values to non-sync defaults
 
        let branch = &mut self.branches[0];
 
        branch.id = BranchId::new_invalid();
 
        branch.parent_id = BranchId::new_invalid();
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        branch.next_in_queue = BranchId::new_invalid();
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
 
            self.queues[queue_idx] = BranchQueue::new();
 
        }
 
    }
 
}
 

	
 
impl Index<BranchId> for ExecTree {
 
    type Output = Branch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for ExecTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
pub struct BranchIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchIter<'a> {
 
    type Item = &'a Branch;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
            // i.e. the invalid branch index
 
            return None;
 
        }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.next_in_queue.index as usize;
 
        return Some(branch);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod branch;
 
mod native;
 
mod port;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 

	
 
// Imports
 

	
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Condvar, Mutex, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::port::{Port, PortState};
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub(crate) enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub ctx_fancy: ComponentCtxFancy,
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
/// Externally facing runtime.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            protocol_description,
 
            port_counter: AtomicU32::new(0),
 
            connectors: RwLock::new(ConnectorStore::with_capacity(32)),
 
            connector_queue: Mutex::new(VecDeque::with_capacity(32)),
 
            schedulers: Mutex::new(Vec::new()),
 
            scheduler_notifier: Condvar::new(),
 
            active_connectors: AtomicU32::new(0),
 
            active_interfaces: AtomicU32::new(1), // this `Runtime` instance
 
            should_exit: AtomicBool::new(false),
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads as usize);
 
            for thread_index in 0..num_threads {
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::Builder::new()
 
                    .name(format!("thread-{}", thread_index))
 
                    .spawn(move || {
 
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
 
                        scheduler.run();
 
                    })
 
                    .unwrap();
 

	
 
                schedulers.push(thread);
 
            }
 

	
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        self.inner.increment_active_interfaces();
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector_key = self.inner.create_interface_component(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_interfaces();
 
        let mut lock = self.inner.schedulers.lock().unwrap();
 
        for handle in lock.drain(..) {
 
            handle.join().unwrap();
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// RuntimeInner
 
// -----------------------------------------------------------------------------
 

	
 
pub(crate) struct RuntimeInner {
 
    // Protocol
 
    pub(crate) protocol_description: ProtocolDescription,
 
    // Regular counter for port IDs
 
    port_counter: AtomicU32,
 
    // Storage of connectors and the work queue
 
    connectors: RwLock<ConnectorStore>,
 
    connector_queue: Mutex<VecDeque<ConnectorKey>>,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>,
 
    // Conditions to determine whether the runtime can exit
 
    scheduler_notifier: Condvar,  // coupled to mutex on `connector_queue`.
 
    // TODO: Figure out if we can simply merge the counters?
 
    active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active)
 
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
 
    should_exit: AtomicBool,
 
}
 

	
 
impl RuntimeInner {
 
    // --- Managing the components queued for execution
 

	
 
    /// Wait until there is a connector to run. If there is one, then `Some`
 
    /// will be returned. If there is no more work, then `None` will be
 
    /// returned.
 
    pub(crate) fn wait_for_work(&self) -> Option<ConnectorKey> {
src/runtime2/tests/mod.rs
Show inline comments
 
use std::sync::Arc;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
const NUM_THREADS: u32 = 4;     // number of threads in runtime
 
const NUM_THREADS: u32 = 1;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 10;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 10;      // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                print(\"putting!\");
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                print(\"getting!\");
 
                auto result = get(receiver);
 
                assert(result);
 

	
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)