Changeset - d2cb5e59f7e9
[Not reviewed]
0 8 0
MH - 4 years ago 2021-12-03 15:45:31
contact@maxhenger.nl
Cleaning up some warnings
8 files changed with 21 insertions and 57 deletions:
0 comments (0 inline, 0 general)
src/runtime/branch.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 

	
 
use super::port::PortIdLocal;
 

	
 
// To share some logic between the FakeTree and ExecTree implementation
 
trait BranchListItem {
 
    fn get_id(&self) -> BranchId;
 
    fn set_next_id(&mut self, id: BranchId);
 
    fn get_next_id(&self) -> BranchId;
 
}
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
    pub index: u32
 
}
 

	
 
impl BranchId {
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ index: 0 };
 
    }
 

	
 
    #[inline]
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        return Self{ index };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum PreparedStatement {
 
    CreatedChannel((Value, Value)),
 
    ForkedExecution(bool),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl PreparedStatement {
 
    pub(crate) fn is_none(&self) -> bool {
 
        if let PreparedStatement::None = self {
 
            return true;
 
        } else {
 
            return false;
 
        }
 
    }
 

	
 
    pub(crate) fn take(&mut self) -> PreparedStatement {
 
        if let PreparedStatement::None = self {
 
            return PreparedStatement::None;
 
        } else {
 
            let mut replacement = PreparedStatement::None;
 
            std::mem::swap(self, &mut replacement);
 
            return replacement;
 
        }
 
    }
 
}
 

	
 
/// The execution state of a branch. This envelops the PDL code and the
 
/// execution state. And derived from that: if we're ready to keep running the
 
/// code, or if we're halted for some reason (e.g. waiting for a message).
 
pub(crate) struct Branch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    // Execution state
 
    pub code_state: Prompt,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue.
 
    pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
 
    pub prepared: PreparedStatement,
 
}
 

	
 
impl BranchListItem for Branch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
impl Branch {
 
    /// Creates a new non-speculative branch
 
    pub(crate) fn new_non_sync(component_state: Prompt) -> Self {
 
        Branch {
 
            id: BranchId::new_invalid(),
 
            parent_id: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        // debug_assert!(
 
        //     (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
        //     (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        // ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            prepared: PreparedStatement::None,
 
        }
 
    }
 
}
 

	
 
/// Queue of branches. Just a little helper.
 
#[derive(Copy, Clone)]
 
struct BranchQueue {
 
    first: BranchId,
 
    last: BranchId,
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{
 
            first: BranchId::new_invalid(),
 
            last: BranchId::new_invalid()
 
        }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!(self.first.is_valid() == self.last.is_valid());
 
        return !self.first.is_valid();
 
    }
 
}
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
    FinishedSync,
 
}
 

	
 
impl QueueKind {
 
    fn as_index(&self) -> usize {
 
        return match self {
 
            QueueKind::Runnable => 0,
 
            QueueKind::AwaitingMessage => 1,
 
            QueueKind::FinishedSync => 2,
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ExecTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Execution tree of branches. Tries to keep the extra information stored
 
/// herein to a minimum. So the execution tree is aware of the branches, their
 
/// execution state and the way they're dependent on each other, but the
 
/// execution tree should not be aware of e.g. sync algorithms.
 
///
 
/// Note that the tree keeps track of multiple lists of branches. Each list
 
/// contains branches that ended up in a particular execution state. The lists
 
/// are described by the various `BranchQueue` instances and the `next_in_queue`
 
/// field in each branch.
 
pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
    /// Constructs a new execution tree with a single non-sync branch.
 
    pub fn new(component: Prompt) -> Self {
 
        return Self {
 
            branches: vec![Branch::new_non_sync(component)],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    // --- Generic branch (queue) management
 

	
 
    /// Returns if tree is in speculative mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return self.branches.len() != 1;
 
    }
 

	
 
    /// Returns true if the particular queue is empty
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    /// Pushes a branch (ID) into a queue.
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    /// Returns the non-sync branch (TODO: better name?)
 
    pub fn base_branch_mut(&mut self) -> &mut Branch {
 
        debug_assert!(!self.is_in_sync());
 
        return &mut self.branches[0];
 
    }
 

	
 
    /// Returns the branch ID of the first branch in a particular queue.
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    /// Returns the next branch ID of a branch (assumed to be in a particular
 
    /// queue.
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    /// Returns an iterator that starts with the provided branch, and then
 
    /// continues to visit all of the branch's parents.
 
    pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter {
 
        return BranchParentIter{
 
            branches: self.branches.as_slice(),
 
            index: branch_id.index as usize,
 
        }
 
    }
 

	
 
    // --- Preparing and finishing a speculative round
 

	
 
    /// Starts a synchronous round by cloning the non-sync branch and marking it
 
    /// as the root of the speculative tree. The id of this root sync branch is
 
    /// returned.
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 
        let sync_branch = Branch::new_sync(1, &self.branches[0]);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    /// Creates a new speculative branch based on the provided one. The index to
 
    /// retrieve this new branch will be returned.
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = Branch::new_sync(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
 

	
 
    /// Collapses the speculative execution tree back into a deterministic one,
 
    /// using the provided branch as the final sync result.
 
    pub fn end_sync(&mut self, branch_id: BranchId) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Swap indicated branch into the first position
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1);
 

	
 
        // Reset all values to non-sync defaults
 
        let branch = &mut self.branches[0];
 
        branch.id = BranchId::new_invalid();
 
        branch.parent_id = BranchId::new_invalid();
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        debug_assert!(!branch.awaiting_port.is_valid());
 
        branch.next_in_queue = BranchId::new_invalid();
 
        debug_assert!(branch.prepared.is_none());
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
 
            self.queues[queue_idx] = BranchQueue::new();
 
        }
 
    }
 
}
 

	
 
impl Index<BranchId> for ExecTree {
 
    type Output = Branch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for ExecTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
/// Iterator over the parents of an `ExecTree` branch.
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchParentIter<'a> {
 
    type Item = &'a Branch;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
            return None;
 
        }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.parent_id.index as usize;
 
        return Some(branch);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// FakeTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Generic fake branch. This is supposed to be used in conjunction with the
 
/// fake tree. The purpose is to have a branching-like tree to use in
 
/// combination with a consensus algorithm in places where we don't have PDL
 
/// code.
 
pub(crate) struct FakeBranch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal,
 
    pub next_in_queue: BranchId,
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>,
 
}
 

	
 
impl BranchListItem for FakeBranch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
impl FakeBranch {
 
    fn new_root(_index: u32) -> FakeBranch {
 
        debug_assert!(_index == 1);
 
        return FakeBranch{
 
            id: BranchId::new(1),
 
            parent_id: BranchId::new_invalid(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
        }
 
    }
 

	
 
    fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch {
 
        return FakeBranch {
 
            id: BranchId::new(index),
 
            parent_id: parent_branch.id,
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// A little helper for native components that don't have a set of branches that
 
/// are actually executing code, but just have to manage the idea of branches
 
/// due to them performing the equivalent of a branching `get` call.
 
pub(crate) struct FakeTree {
 
    pub branches: Vec<FakeBranch>,
 
    queues: [BranchQueue; NUM_QUEUES],
 
}
 

	
 
impl FakeTree {
 
    pub fn new() -> Self {
 
        // TODO: Don't like this? Cause is that now we don't have a non-sync
 
        //  branch. But we assumed BranchId=0 means the branch is invalid. We
 
        //  can do the rusty Option<BranchId> stuff. But we still need a token
 
        //  value within the protocol to signify no-branch-id. Maybe the high
 
        //  bit? Branches are crazy expensive, no-one is going to have 2^32
 
        //  branches anyway. 2^31 isn't too bad.
 
        return Self {
 
            branches: vec![FakeBranch{
 
                id: BranchId::new_invalid(),
 
                parent_id: BranchId::new_invalid(),
 
                sync_state: SpeculativeState::RunningNonSync,
 
                awaiting_port: PortIdLocal::new_invalid(),
 
                next_in_queue: BranchId::new_invalid(),
 
                inbox: HashMap::new(),
 
            }],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    fn is_in_sync(&self) -> bool {
 
        return self.branches.len() > 1;
 
    }
 

	
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync);
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first)
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 

	
 
        // Create the first branch
 
        let sync_branch = FakeBranch::new_root(1);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch {
 
        debug_assert!(branch_id.is_valid());
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Take out the succeeding branch, then just clear all fake branches.
 
        self.branches.swap(1, branch_id.index as usize);
 
        self.branches.truncate(2);
 
        let result = self.branches.pop().unwrap();
 

	
 
        for queue_index in 0..NUM_QUEUES {
 
            self.queues[queue_index] = BranchQueue::new();
 
        }
 

	
 
        return result;
 
    }
 
}
 

	
 
impl Index<BranchId> for FakeTree {
 
    type Output = FakeBranch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for FakeTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Shared logic
 
// -----------------------------------------------------------------------------
 

	
 
fn pop_from_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B]) -> Option<BranchId> {
 
    if queue.is_empty() {
 
        return None;
 
    } else {
 
        let first_branch = &mut branches[queue.first.index as usize];
 
        queue.first = first_branch.get_next_id();
 
        first_branch.set_next_id(BranchId::new_invalid());
 
        if !queue.first.is_valid() {
 
            queue.last = BranchId::new_invalid();
 
        }
 

	
 
        return Some(first_branch.get_id());
 
    }
 
}
 

	
 
fn push_into_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) {
 
    debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid());
 
    if queue.is_empty() {
 
        queue.first = branch_id;
 
        queue.last = branch_id;
 
    } else {
 
        let last_branch = &mut branches[queue.last.index as usize];
 
        last_branch.set_next_id(branch_id);
 
        queue.last = branch_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime/connector.rs
Show inline comments
 
// connector.rs
 
//
 
// Represents a component. A component (and the scheduler that is running it)
 
// has many properties that are not easy to subdivide into aspects that are
 
// conceptually handled by particular data structures. That is to say: the code
 
// that we run governs: running PDL code, keeping track of ports, instantiating
 
// new components and transports (i.e. interacting with the runtime), running
 
// a consensus algorithm, etc. But on the other hand, our data is rather
 
// simple: we have a speculative execution tree, a set of ports that we own,
 
// and a bit of code that we should run.
 
//
 
// So currently the code is organized as following:
 
// - The scheduler that is running the component is the authoritative source on
 
//     ports during *non-sync* mode. The consensus algorithm is the
 
//     authoritative source during *sync* mode. They retrieve each other's
 
//     state during the transitions. Hence port data exists duplicated between
 
//     these two datastructures.
 
// - The execution tree is where executed branches reside. But the execution
 
//     tree is only aware of the tree shape itself (and keeps track of some
 
//     queues of branches that are in a particular state), and tends to store
 
//     the PDL program state. The consensus algorithm is also somewhat aware
 
//     of the execution tree, but only in terms of what is needed to complete
 
//     a sync round (for now, that means the port mapping in each branch).
 
//     Hence once more we have properties conceptually associated with branches
 
//     in two places.
 
// - TODO: Write about handling messages, consensus wrapping data
 
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::ProtocolDescription;
 
use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, PortId, ValueGroup};
 
use crate::protocol::RunContext;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement};
 
use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
enum Mode {
 
    NonSync,    // running non-sync code
 
    Sync,       // running sync code (in potentially multiple branches)
 
    SyncError,  // encountered an unrecoverable error in sync mode
 
    Error,      // encountered an error in non-sync mode (or finished handling the sync mode error).
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,          // Run again, immediately
 
    Later,              // Schedule for running, at some later point in time
 
    NotNow,             // Do not reschedule for running
 
    Exit,               // Connector has exited
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    mode: Mode,
 
    eval_error: Option<EvalError>,
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::PerformedGet(value) => Some(value),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
 
        };
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!("Remove fires() now");
 
        let port_id = PortIdLocal::new(port.id);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    fn fires(&mut self, _port: PortId) -> Option<Value> {
 
        todo!("Remove fires() now")
 
        // let port_id = PortIdLocal::new(port.id);
 
        // let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        // return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken),
 
        };
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
 
            return scheduling;
 
        }
 

	
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 

	
 
                // Handle any new finished branches
 
                let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
                while let Some(branch_id) = iter_id {
 
                    iter_id = self.tree.get_queue_next(branch_id);
 
                    self.last_finished_handled = Some(branch_id);
 

	
 
                    if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                        // Actually found a solution
 
                        return self.enter_non_sync_mode(round_conclusion, comp_ctx);
 
                    }
 

	
 
                    self.last_finished_handled = Some(branch_id);
 
                }
 

	
 
                return scheduling;
 
            },
 
            Mode::NonSync => {
 
                let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::SyncError => {
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::Error => {
 
                // This shouldn't really be called. Because when we reach exit
 
                // mode the scheduler should not run the component anymore
 
                unreachable!("called component run() during error-mode");
 
            },
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: Prompt) -> Self {
 
        Self{
 
            mode: Mode::NonSync,
 
            eval_error: None,
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        while let Some(ticket) = ctx.get_next_message_ticket() {
 
            let message = ctx.read_message_using_ticket(ticket);
 
            let immediate_result = if let Message::Data(_) = message {
 
                self.handle_new_data_message(ticket, ctx);
 
                None
 
            } else {
 
                match ctx.take_message_using_ticket(ticket) {
 
                    Message::Data(_) => unreachable!(),
 
                    Message::SyncComp(message) => {
 
                        self.handle_new_sync_comp_message(message, ctx)
 
                    },
 
                    Message::SyncPort(message) => {
 
                        self.handle_new_sync_port_message(message, ctx);
 
                        None
 
                    },
 
                    Message::SyncControl(message) => {
 
                        self.handle_new_sync_control_message(message, ctx)
 
                    },
 
                    Message::Control(_) => unreachable!("control message in component"),
 
                }
 
            };
 

	
 
            if let Some(result) = immediate_result {
 
                return Some(result);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(ticket, ctx) {
 
            // Message should not be handled now
 
            return;
 
        }
 

	
 
        let message = ctx.read_message_using_ticket(ticket).as_data();
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        println!("DEBUG: Actually really handling {:?}", message);
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_control_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 

	
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            self.eval_error = Some(eval_error);
 
            self.mode = Mode::SyncError;
 
            if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
 
                // We can exit immediately
 
                return self.enter_non_sync_mode(conclusion, comp_ctx);
 
            } else {
 
                // Current branch failed. But we may have other things that are
 
                // running.
 
                return ConnectorScheduling::Immediate;
 
            }
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
        match run_result {
 
            EvalContinuation::BranchInconsistent => {
 
                // Branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            EvalContinuation::BlockFires(port_id) => {
 
                // Branch called `fires()` on a port that has not been used yet.
 
                let port_id = PortIdLocal::new(port_id.id);
 

	
 
                // Create two forks, one that assumes the port will fire, and
 
                // one that assumes the port remains silent
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 

	
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::BlockGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.id);
 

	
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
 
                // it reaches the `get` call. But we might have already received
 
                // a message that targets this branch, so check now.
 
                let mut any_message_received = false;
 
                for message in comp_ctx.get_read_data_messages(port_id) {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            EvalContinuation::SyncBlockEnd => {
 
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
                if consistency == Consistency::Valid {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            EvalContinuation::NewFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            EvalContinuation::Put(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.id);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                let message = DataMessage{ sync_header, data_header, content };
 
                match comp_ctx.submit_message(Message::Data(message)) {
 
                    Ok(_) => {
 
                        // Message is underway
 
                        branch.prepared = PreparedStatement::PerformedPut;
 
                        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                        return ConnectorScheduling::Immediate;
 
                    },
 
                    Err(_) => {
 
                        // We don't own the port
 
                        let pd = &sched_ctx.runtime.protocol_description;
 
                        let eval_error = branch.code_state.new_error_at_expr(
 
                            &pd.modules, &pd.heap,
 
                            String::from("attempted to 'put' on port that is no longer owned")
 
                        );
 
                        self.eval_error = Some(eval_error);
 
                        self.mode = Mode::SyncError;
 

	
 
                        println!("DEBUGERINO: Notify of fatal branch");
 
                        if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
 
                            println!("DEBUGERINO: Actually got {:?}", conclusion);
 
                            return self.enter_non_sync_mode(conclusion, comp_ctx);
 
                        }
 
                    }
 
                }
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 

	
 
        // If here then the run result did not require a particular action. We
 
        // return whether we have more active branches to run or not.
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            comp_ctx.push_error(eval_error);
 
            return ConnectorScheduling::Exit
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EvalContinuation::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::Exit;
 
            },
 
            EvalContinuation::SyncBlockStart => {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                debug_assert!(self.last_finished_handled.is_none());
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 
                self.mode = Mode::Sync;
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
 
                // we are in non-sync mode the scheduler will handle and check
 
                // port ownership transfer.
 
                debug_assert!(comp_ctx.workspace_ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
 

	
 
                let new_prompt = Prompt::new(
 
                    &sched_ctx.runtime.protocol_description.types,
 
                    &sched_ctx.runtime.protocol_description.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                let new_component = ConnectorPDL::new(new_prompt);
 
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
 
                comp_ctx.workspace_ports.clear();
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            EvalContinuation::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Helper that moves the component's state back into non-sync mode, using
 
    /// the provided solution branch ID as the branch that should be comitted to
 
    /// memory. If this function returns false, then the component is supposed
 
    /// to exit.
 
    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        // Depending on local state decide what to do
 
        let final_branch_id = match conclusion {
 
            RoundConclusion::Success(branch_id) => Some(branch_id),
 
            RoundConclusion::Failure => None,
 
        };
 

	
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
            debug_assert!(fake_vec.is_empty());
 

	
 
            ctx.notify_sync_end(&[]);
 
            self.last_finished_handled = None;
 
            self.eval_error = None; // in case we came from the SyncError mode
 
            self.mode = Mode::NonSync;
 

	
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            if let Some(eval_error) = self.eval_error.take() {
 
                ctx.push_error(eval_error);
 
            }
 

	
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
 
    /// condition appears.
 
    #[inline]
 
    fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result<EvalContinuation, EvalError> {
 
        loop {
 
            let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            if let Ok(EvalContinuation::Stepping) = result {
 
                continue;
 
            }
 

	
 
            return result;
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal, PortState};
 
use super::inbox::{
 
    Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker,
 
    DataMessage,
 
    SyncCompMessage, SyncCompContent,
 
    SyncPortMessage, SyncPortContent,
 
    SyncControlMessage, SyncControlContent
 
};
 
use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket};
 

	
 
struct BranchAnnotation {
 
    channel_mapping: Vec<ChannelAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    sync_round_number: u32,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId, u32)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum RoundConclusion {
 
    Failure,
 
    Success(BranchId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
 
//  bit.
 
// TODO: Needs a refactor. Firstly we have cases where we don't have a branch ID
 
//  but we do want to enumerate all current ports. So put that somewhere in a
 
//  central place. Secondly. Error handling and regular message handling is
 
//  becoming a mess.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    handled_wave: bool, // encountered notification wave in this round
 
    conclusion: Option<RoundConclusion>,
 
    ack_remaining: u32,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum MessageOrigin {
 
    Past,
 
    Present,
 
    Future
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            handled_wave: false,
 
            conclusion: None,
 
            ack_remaining: 0,
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    #[deprecated]
 
    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap();
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            channel_mapping: ctx.get_ports().iter()
 
                .map(|v| ChannelAnnotation {
 
                    channel_id: v.channel_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
            cur_marker: BranchMarker::new_invalid(),
 
        });
 
        self.branch_markers.push(BranchId::new_invalid());
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
    /// called for each forked branch in the execution tree.
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            channel_mapping: parent_branch_annotations.channel_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has
 
    /// encountered an unrecoverable error.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Check for trivial case, where branch has not yet communicated within
 
        // the consensus algorithm
 
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
 
        if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            println!("DEBUG: Failure everything silent");
 
            return Some(RoundConclusion::Failure);
 
        }
 

	
 
        // We're not in the trivial case: since we've communicated we need to
 
        // let everyone know that this round is probably not going to end well.
 
        return self.initiate_sync_failure(ctx);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle. Note that
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for mapping in &branch.channel_mapping {
 
            match mapping.expected_firing {
 
                Some(expected) => {
 
                    if expected != mapping.registered_id.is_some() {
 
                        // Inconsistent speculative state and actual state
 
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
 
                        return Consistency::Inconsistent;
 
                    }
 
                },
 
                None => {},
 
            }
 
        }
 

	
 
        return Consistency::Valid;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has assumed
 
    /// a speculative value for its port mapping.
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
 
        let channel_id = port_desc.channel_id;
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == channel_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
                        mapping.expected_firing = Some(does_fire);
 
                        return Consistency::Valid;
 
                    },
 
                    Some(current) => {
 
                        // Already mapped
 
                        if current == does_fire {
 
                            return Consistency::Valid;
 
                        } else {
 
                            return Consistency::Inconsistent;
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                let message = SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                };
 
                match ctx.submit_message(Message::SyncPort(message)) {
 
                    Ok(_) => {
 
                        self.encountered_ports.push(self_port_id);
 
                    },
 
                    Err(_) => {
 
                        // Seems like we were done with this branch, but one of
 
                        // the silent ports (in scope) is actually closed
 
                        return self.notify_of_fatal_branch(branch_id, ctx);
 
                    }
 
                }
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            sync_round_number: self.sync_round,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
    pub fn end_sync(&mut self, branch_id: BranchId, _final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let _branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        println!("DEBUG: ***** Incrementing sync round stuff");
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 

	
 
        println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers)
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.channel_mapping.iter()
 
                .find(|v| v.channel_id == port_info.channel_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
            // self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        let data_header = DataHeader{
 
            expected_mapping: branch.channel_mapping.iter()
 
                .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id)
 
                .copied()
 
                .collect(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == port_info.channel_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool {
 
        let message = ctx.read_message_using_ticket(ticket).as_data();
 
        let target_port = message.data_header.target_port;
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return false,
 
            MessageOrigin::Present => {
 
                self.encountered_ports.push(target_port);
 
                return true;
 
            },
 
            MessageOrigin::Future => {
 
                let message = ctx.take_message_using_ticket(ticket);
 
                ctx.put_back_message(message);
 
                return false;
 
            }
 
        }
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return None,
 
            MessageOrigin::Present => {},
 
            MessageOrigin::Future => {
 
                ctx.put_back_message(Message::SyncComp(message));
 
                return None
 
            }
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 

	
 
        match &message.content {
 
            SyncCompContent::LocalFailure |
 
            SyncCompContent::LocalSolution(_) |
 
            SyncCompContent::PartialSolution(_) |
 
            SyncCompContent::AckFailure |
 
            SyncCompContent::Presence(_) => {
 
                // Needs to be handled by the leader
 
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
 
            },
 
            SyncCompContent::GlobalSolution(solution) => {
 
                // Found a global solution
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let (_, branch_id, _) = solution.component_branches.iter()
 
                    .find(|(component_id, _, _)| *component_id == ctx.id)
 
                    .unwrap();
 
                return Some(RoundConclusion::Success(*branch_id));
 
            },
 
            SyncCompContent::GlobalFailure => {
 
                // Global failure of round, send Ack to leader
 
                println!("DEBUGERINO: Got GlobalFailure, sending Ack in response");
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
 
                debug_assert!(_result.is_none());
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
                // We were just interested in the sync header we handled above
 
                return None;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return None,
 
            MessageOrigin::Present => {},
 
            MessageOrigin::Future => {
 
                ctx.put_back_message(Message::SyncPort(message));
 
                return None;
 
            }
 
        }
 

	
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::SilentPortNotification => {
 
                // The point here is to let us become part of the sync round and
 
                // take note of the leader in case all of our ports are silent.
 
                self.encountered_ports.push(message.target_port);
 
                return None
 
            }
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
                // sure we propagate the wave
 
                if self.handled_wave {
 
                    return None;
 
                }
 

	
 
                self.handled_wave = true;
 

	
 
                // Propagate wave to all peers except the one that has sent us
 
                // the wave.
 
                for mapping in &self.branch_annotations[0].channel_mapping {
 
                    let channel_id = mapping.channel_id;
 
                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
 
                    if port_desc.self_id == message.target_port {
 
                        // Wave came from this port, no need to send one back
 
                        continue;
 
                    }
 

	
 
                    let message = SyncPortMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        source_port: port_desc.self_id,
 
                        target_port: port_desc.peer_id,
 
                        content: SyncPortContent::NotificationWave,
 
                    };
 
                    // As with the other SyncPort where we throw away the
 
                    // result: we're dealing with an error here anyway
 
                    let _unused = ctx.submit_message(Message::SyncPort(message));
 
                }
 

	
 
                // And let the leader know about our port state
 
                let annotations = &self.branch_annotations[0];
 
                let mut channels = Vec::with_capacity(annotations.channel_mapping.len());
 
                for mapping in &annotations.channel_mapping {
 
                    let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
 
                    channels.push(LocalChannelPresence{
 
                        channel_id: mapping.channel_id,
 
                        is_closed: port_info.state == PortState::Closed,
 
                    });
 
                }
 

	
 
                let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
 
                    component_id: ctx.id,
 
                    channels,
 
                }), ctx);
 
                return maybe_conclusion;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if message.in_response_to_sync_round < self.sync_round {
 
            // Old message
 
            return None
 
        }
 

	
 
        // Because the message is always sent in response to a message
 
        // originating here, the sync round number can never be larger than the
 
        // currently stored one.
 
        debug_assert_eq!(message.in_response_to_sync_round, self.sync_round);
 
        match message.content {
 
            SyncControlContent::ChannelIsClosed(_) => {
 
                return self.initiate_sync_failure(ctx);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == target_port.channel_id {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(&message.content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                    // self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
 
            if message.sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.channel_mapping {
 
                if expected.channel_id == current.channel_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        let origin = self.handle_peer(&sync_header);
 
        println!(" ********************** GOT {:?}", origin);
 
        if origin != MessageOrigin::Present {
 
            // We do not have to handle it now
 
            return origin;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncCompMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: peer.id,
 
                    content: SyncCompContent::Notification,
 
                };
 
                ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_data_to_new_leader(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncCompContent::Notification
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        } // else: exactly equal, so do nothing
 

	
 
        return MessageOrigin::Present;
 
    }
 

	
 
    /// Handles a (potentially new) peer. Returns `false` if the provided sync
 
    /// number is different then the expected one.
 
    fn handle_peer(&mut self, sync_header: &SyncHeader) -> MessageOrigin {
 
        let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id);
 
        match position {
 
            Some(index) => {
 
                let entry = &mut self.peers[index];
 
                if entry.encountered_this_round {
 
                    // Already encountered this round
 
                    if sync_header.sync_round < entry.expected_sync_round {
 
                        return MessageOrigin::Past;
 
                    } else if sync_header.sync_round == entry.expected_sync_round {
 
                        return MessageOrigin::Present;
 
                    } else {
 
                        return MessageOrigin::Future;
 
                    }
 
                } else {
 
                    // TODO: Proper handling of potential overflow
 
                    entry.encountered_this_round = true;
 

	
 
                    if sync_header.sync_round >= entry.expected_sync_round {
 
                        entry.expected_sync_round = sync_header.sync_round;
 
                        return MessageOrigin::Present;
 
                    } else {
 
                        return MessageOrigin::Past;
 
                    }
 
                }
 
            },
 
            None => {
 
                self.peers.push(Peer{
 
                    id: sync_header.sending_component_id,
 
                    encountered_this_round: true,
 
                    expected_sync_round: sync_header.sync_round,
 
                });
 
                return MessageOrigin::Present;
 
            }
 
        }
 
    }
 

	
 
    /// Sends a message towards the leader, if already the leader then the
 
    /// message will be handled immediately.
 
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            match content {
 
                SyncCompContent::LocalFailure => {
 
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::LocalSolution(local_solution) => {
 
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
 
                        return self.handle_global_solution_as_leader(global_solution, ctx);
 
                    }
 
                },
 
                SyncCompContent::PartialSolution(partial_solution) => {
 
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
 
                        match conclusion {
 
                            LeaderConclusion::Solution(global_solution) => {
 
                                return self.handle_global_solution_as_leader(global_solution, ctx);
 
                            },
 
                            LeaderConclusion::Failure => {
 
                                return self.handle_global_failure_as_leader(ctx);
 
                            }
 
                        }
 
                    }
 
                },
 
                SyncCompContent::Presence(component_presence) => {
 
                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_presence.component_id, &component_presence.channels) {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::AckFailure => {
 
                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
 
                    debug_assert!(self.ack_remaining > 0);
 
                    self.ack_remaining -= 1;
 
                    if self.ack_remaining == 0 {
 
                        return Some(RoundConclusion::Failure);
 
                    }
 
                }
 
                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
 
                SyncCompContent::GlobalFailure => {
 
                    unreachable!("unexpected message content for leader");
 
                },
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content,
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // Handle the global solution
 
        let mut my_final_branch_id = BranchId::new_invalid();
 
        for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() {
 
            if connector_id == ctx.id {
 
                // This is our solution branch
 
                my_final_branch_id = branch_id;
 
                continue;
 
            }
 

	
 
            // Send solution message
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 

	
 
            // Update peers as leader. Subsequent call to `end_sync` will update
 
            // the round numbers
 
            match self.peers.iter_mut().find(|v| v.id == connector_id) {
 
                Some(peer) => {
 
                    peer.expected_sync_round = sync_round;
 
                },
 
                None => {
 
                    self.peers.push(Peer{
 
                        id: connector_id,
 
                        expected_sync_round: sync_round,
 
                        encountered_this_round: true,
 
                    });
 
                }
 
            }
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
 
        return Some(RoundConclusion::Success(my_final_branch_id));
 
    }
 

	
 
    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
 
        if self.conclusion.is_some() {
 
            // Already sent out a failure
 
            return None;
 
        }
 

	
 
        // TODO: Performance
 
        let mut encountered = VecSet::new();
 
        for presence in &self.solution_combiner.presence {
 
            if presence.owner_a != ctx.id {
 
                // Did not add it ourselves
 
                if encountered.push(presence.owner_a) {
 
                    // Not yet sent a message
 
                    let message = SyncCompMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: presence.owner_a,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
 
                    ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
                }
 
            }
 

	
 
            if let Some(owner_b) = presence.owner_b {
 
                if owner_b != ctx.id {
 
                    if encountered.push(owner_b) {
 
                        let message = SyncCompMessage{
 
                            sync_header: self.create_sync_header(ctx),
 
                            target_component_id: owner_b,
 
                            content: SyncCompContent::GlobalFailure,
 
                        };
 
                        ctx.submit_message(Message::SyncComp(message)).unwrap();
 
                    }
 
                }
 
            }
 
        }
 

	
 
        println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.index).collect::<Vec<_>>());
 
        self.conclusion = Some(RoundConclusion::Failure);
 
        if encountered.is_empty() {
 
            // We don't have to wait on Acks
 
            return Some(RoundConclusion::Failure);
 
        } else {
 
            self.ack_remaining = encountered.len() as u32;
 
            return None;
 
        }
 
    }
 

	
 
    fn initiate_sync_failure(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Notify leader of our channels and the fact that we just failed
 
        let channel_mapping = &self.branch_annotations[0].channel_mapping;
 
        let mut channel_presence = Vec::with_capacity(channel_mapping.len());
 
        for mapping in channel_mapping {
 
            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
 
            channel_presence.push(LocalChannelPresence{
 
                channel_id: mapping.channel_id,
 
                is_closed: port.state == PortState::Closed,
 
            });
 
        }
 
        let maybe_already = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
 
            component_id: ctx.id,
 
            channels: channel_presence,
 
        }), ctx);
 

	
 
        if self.handled_wave {
 
            // Someone (or us) has already initiated a sync failure.
 
            return maybe_already;
 
        }
 

	
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
 
        debug_assert!(if maybe_already.is_some() { maybe_conclusion.is_some() } else { true });
 
        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);
 

	
 
        // Initiate a discovery wave so peers can do the same
 
        self.handled_wave = true;
 
        for mapping in &self.branch_annotations[0].channel_mapping {
 
            let channel_id = mapping.channel_id;
 
            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
 
            let message = SyncPortMessage{
 
                sync_header: self.create_sync_header(ctx),
 
                source_port: port_info.self_id,
 
                target_port: port_info.peer_id,
 
                content: SyncPortContent::NotificationWave,
 
            };
 

	
 
            // Note: submitting the message might fail. But we're attempting to
 
            // handle the error anyway.
 
            // TODO: Think about this a second time: how do we make sure the
 
            //  entire network will fail if we reach this condition
 
            let _unused = ctx.submit_message(Message::SyncPort(message));
 
        }
 

	
 
        return maybe_conclusion;
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
            sync_round: self.sync_round,
 
        }
 
    }
 

	
 
    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        if let Some(partial_solution) = self.solution_combiner.drain() {
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncCompContent::PartialSolution(partial_solution),
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug, Clone)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    sync_round: u32,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentPresence {
 
    component_id: ConnectorId,
 
    channels: Vec<LocalChannelPresence>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct LocalChannelPresence {
 
    channel_id: ChannelId,
 
    is_closed: bool,
 
}
 

	
 
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 
enum PresenceState {
 
    OnePresent, // one component reported the channel being open
 
    BothPresent, // two components reported the channel being open
 
    Closed, // one component reported the channel being closed
 
}
 

	
 
/// Record to hold channel state during the error-resolving mode of the leader.
 
/// This is used to determine when the sync region has grown to its largest
 
/// size. The structure is eventually consistent in the sense that a component
 
/// might initially presume a channel is open, only to figure out later it is
 
/// actually closed.
 
#[derive(Debug, Clone)]
 
struct ChannelPresence {
 
    owner_a: ConnectorId,
 
    owner_b: Option<ConnectorId>,
 
    id: ChannelId,
 
    state: PresenceState,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
#[derive(Debug)]
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>, // used for finding solution
 
    presence: Vec<ChannelPresence>, // used to detect all channels present in case of failure
 
    failure_reported: bool,
 
}
 

	
 
struct CheckEntry {
 
    component_index: usize,         // component index in combiner's vector
 
    solution_index: usize,          // solution entry in the above component entry
 
    parent_entry_index: usize,      // parent that caused the creation of this checking entry
 
    match_index_in_parent: usize,   // index in the matches array of the parent
 
    solution_index_in_parent: usize,// index in the solution array of the match entry in the parent
 
}
 

	
 
enum LeaderConclusion {
 
    Solution(GlobalSolution),
 
    Failure,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
            presence: Vec::new(),
 
            failure_reported: false,
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
 
        let component_id = solution.component;
 
        let sync_round = solution.sync_round_number;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            channel_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index, new_component) = match component_exists {
 
            Some((component_index, storage)) => {
 
                // Entry for component exists, so add to solutions
 
                let solution_index = storage.solutions.len();
 
                storage.solutions.push(solution);
 

	
 
                (component_index, solution_index, false)
 
            }
 
            None => {
 
                // Entry for component does not exist yet
 
                let component_index = self.local.len();
 
                self.local.push(ComponentLocalSolutions{
 
                    component: component_id,
 
                    sync_round,
 
                    peers: Vec::new(),
 
                    solutions: vec![solution],
 
                    all_peers_present: false,
 
                });
 
                (component_index, 0, true)
 
            }
 
        };
 

	
 
        // If this is a solution of a component that is new to us, then we check
 
        // in the stored solutions which other components are peers of the new
 
        // one.
 
        if new_component {
 
            let cur_ports = &self.local[component_index].solutions[0].channel_mapping;
 
            let mut component_peers = Vec::new();
 

	
 
            // Find the matching components
 
            for (other_index, other_component) in self.local.iter().enumerate() {
 
                if other_index == component_index {
 
                    // Don't match against ourselves
 
                    continue;
 
                }
 

	
 
                let mut matching_channels = Vec::new();
 
                for (cur_channel_id, _) in cur_ports {
 
                    for (other_channel_id, _) in &other_component.solutions[0].channel_mapping {
 
                        if cur_channel_id == other_channel_id {
 
                            // We have a shared port
 
                            matching_channels.push(*cur_channel_id);
 
                        }
 
                    }
 
                }
 

	
 
                if !matching_channels.is_empty() {
 
                    // We share some ports
 
                    component_peers.push(ComponentPeer{
 
                        target_id: other_component.component,
 
                        target_index: other_index,
 
                        involved_channels: matching_channels,
 
                    });
 
                }
 
            }
 

	
 
            let mut num_ports_in_peers = 0;
 
            for peer in &component_peers {
 
                num_ports_in_peers += peer.involved_channels.len();
 
            }
 

	
 
            if num_ports_in_peers == cur_ports.len() {
 
                // Newly added component has all required peers present
 
                self.local[component_index].all_peers_present = true;
 
            }
 

	
 
            // Add the found component pairing entries to the solution entries
 
            // for the two involved components
 
            for component_match in component_peers {
 
                // Check the other component for having all peers present
 
                let mut num_ports_in_peers = component_match.involved_channels.len();
 
                let other_component = &mut self.local[component_match.target_index];
 
                for existing_peer in &other_component.peers {
 
                    num_ports_in_peers += existing_peer.involved_channels.len();
 
                }
 

	
 
                if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() {
 
                    other_component.all_peers_present = true;
 
                }
 

	
 
                other_component.peers.push(ComponentPeer{
 
                    target_id: component_id,
 
                    target_index: component_index,
 
                    involved_channels: component_match.involved_channels.clone(),
 
                });
 

	
 
                let new_component = &mut self.local[component_index];
 
                new_component.peers.push(component_match);
 
            }
 
        }
 

	
 
        // We're now sure that we know which other components the currently
 
        // considered component is linked up to. Now we need to check those
 
        // entries (if any) to see if any pair of local solutions match
 
        let mut new_component_matches = Vec::new();
 
        let cur_component = &self.local[component_index];
 
        let cur_solution = &cur_component.solutions[solution_index];
 

	
 
        for peer in &cur_component.peers {
 
            let mut new_solution_matches = Vec::new();
 

	
 
            let other_component = &self.local[peer.target_index];
 
            for (other_solution_index, other_solution) in other_component.solutions.iter().enumerate() {
 
                // Check the port mappings between the pair of solutions.
 
                let mut all_matched = true;
 

	
 
                'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping {
 
                    for (other_port, other_branch) in &other_solution.channel_mapping {
 
                        if cur_port == other_port {
 
                            if cur_branch == other_branch {
 
                                // Same port mapping, go to next port
 
                                break;
 
                            } else {
 
                                // Different port mapping, not a match
 
                                all_matched = false;
 
                                break 'mapping_check_loop;
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                if !all_matched {
 
                    continue;
 
                }
 

	
 
                // Port mapping between the component pair is the same, so they
 
                // have agreeable local solutions
 
                new_solution_matches.push(other_solution_index);
 
            }
 

	
 
            new_component_matches.push(ComponentMatches{
 
                target_id: peer.target_id,
 
                target_index: peer.target_index,
 
                match_indices: new_solution_matches,
 
            });
 
        }
 

	
 
        // And now that we have the new solution-to-solution matches, we need to
 
        // add those in the appropriate storage.
 
        for new_component_match in new_component_matches {
 
            let other_component = &mut self.local[new_component_match.target_index];
 

	
 
            for other_solution_index in new_component_match.match_indices.iter().copied() {
 
                let other_solution = &mut other_component.solutions[other_solution_index];
 

	
 
                // Add a completely new entry for the component, or add it to
 
                // the existing component entry's matches
 
                match other_solution.matches.iter_mut()
 
                    .find(|v| v.target_id == component_id)
 
                {
 
                    Some(other_match) => {
 
                        other_match.match_indices.push(solution_index);
 
                    },
 
                    None => {
 
                        other_solution.matches.push(ComponentMatches{
 
                            target_id: component_id,
 
                            target_index: component_index,
 
                            match_indices: vec![solution_index],
 
                        })
 
                    }
 
                }
 
            }
 

	
 
            let cur_component = &mut self.local[component_index];
 
            let cur_solution = &mut cur_component.solutions[solution_index];
 

	
 
            match cur_solution.matches.iter_mut()
 
                .find(|v| v.target_id == new_component_match.target_id)
 
            {
 
                Some(other_match) => {
 
                    // Already have an entry
 
                    debug_assert_eq!(other_match.target_index, new_component_match.target_index);
 
                    other_match.match_indices.extend(&new_component_match.match_indices);
 
                },
 
                None => {
 
                    // Create a new entry
 
                    cur_solution.matches.push(new_component_match);
 
                }
 
            }
 
        }
 

	
 
        return self.check_for_global_solution(component_index, solution_index);
 
    }
 

	
 
    fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool {
 
        for entry in channels {
 
            let mut found = false;
 

	
 
            for existing in &mut self.presence {
 
                if existing.id == entry.channel_id {
 
                    // Same entry. We only update if we have the second
 
                    // component coming in it owns one end of the channel, or if
 
                    // a component is telling us that the channel is (now)
 
                    // closed.
 
                    if entry.is_closed {
 
                        existing.state = PresenceState::Closed;
 
                    } else if component_id != existing.owner_a && existing.state != PresenceState::Closed {
 
                        existing.state = PresenceState::BothPresent;
 
                    }
 

	
 
                    if existing.owner_a != component_id {
 
                        existing.owner_b = Some(component_id);
 
                    }
 

	
 
                    found = true;
 
                    break;
 
                }
 
            }
 

	
 
            if !found {
 
                self.presence.push(ChannelPresence{
 
                    owner_a: component_id,
 
                    owner_b: None,
 
                    id: entry.channel_id,
 
                    state: if entry.is_closed { PresenceState::Closed } else { PresenceState::OnePresent },
 
                });
 
            }
 
        }
 

	
 
        println!("DEBUGGERINO Presence is now:\n{:#?}", self.presence);
 

	
 
        return self.check_for_global_failure();
 
    }
 

	
 
    fn mark_failure_and_check_for_global_failure(&mut self) -> bool {
 
        self.failure_reported = true;
 
        return self.check_for_global_failure();
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    // TODO: At some point, check if divide and conquer is faster?
 
    fn check_for_global_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
 
        // Small trivial test necessary (but not sufficient) for a global
 
        // solution
 
        for component in &self.local {
 
            if !component.all_peers_present {
 
                return None;
 
            }
 
        }
 

	
 
        // Construct initial entry on stack
 
        let mut stack = Vec::with_capacity(self.local.len());
 
        stack.push(CheckEntry{
 
            component_index: initial_component_index,
 
            solution_index: initial_solution_index,
 
            parent_entry_index: 0,
 
            match_index_in_parent: 0,
 
            solution_index_in_parent: 0,
 
        });
 

	
 
        'check_last_stack: loop {
 
            let cur_index = stack.len() - 1;
 
            let cur_entry = &stack[cur_index];
 

	
 
            // Check if the current component is matching with all other entries
 
            let mut all_match = true;
 
            'check_against_existing: for prev_index in 0..cur_index {
 
                let prev_entry = &stack[prev_index];
 
                let prev_component = &self.local[prev_entry.component_index];
 
                let prev_solution = &prev_component.solutions[prev_entry.solution_index];
 

	
 
                for prev_matching_component in &prev_solution.matches {
 
                    if prev_matching_component.target_index == cur_entry.component_index {
 
                        // Previous entry has shared ports with the current
 
                        // entry, so see if we have a composable pair of
 
                        // solutions.
 
                        if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) {
 
                            all_match = false;
 
                            break 'check_against_existing;
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            if all_match {
 
                // All components matched until now.
 
                if stack.len() == self.local.len() {
 
                    // We have found a global solution
 
                    break 'check_last_stack;
 
                }
 

	
 
                // Not all components found yet, look for a new one that has not
 
                // yet been added yet.
 
                for (parent_index, parent_entry) in stack.iter().enumerate() {
 
                    let parent_component = &self.local[parent_entry.component_index];
 
                    let parent_solution = &parent_component.solutions[parent_entry.solution_index];
 

	
 
                    for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() {
 
                        if peer_component.match_indices.is_empty() {
 
                            continue;
 
                        }
 

	
 
                        let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index);
 
                        if !already_added {
 
                            // New component to try
 
                            stack.push(CheckEntry{
 
                                component_index: peer_component.target_index,
 
                                solution_index: peer_component.match_indices[0],
 
                                parent_entry_index: parent_index,
 
                                match_index_in_parent: peer_index,
 
                                solution_index_in_parent: 0,
 
                            });
 
                            continue 'check_last_stack;
 
                        }
 
                    }
 
                }
 

	
 
                // Cannot find a peer to add. This is possible if, for example,
 
                // we have a component A which has the only connection to
 
                // component B. And B has sent a local solution saying it is
 
                // finished, but the last data message has not yet arrived at A.
 

	
 
                // In any case, we just exit the if statement and handle not
 
                // being able to find a new connector as being forced to try a
 
                // new permutation of possible local solutions.
 
            }
 

	
 
            // Either the currently considered local solution is inconsistent
 
            // with other local solutions, or we cannot find a new component to
 
            // add. This is where we perform backtracking as long as needed to
 
            // try a new solution.
 
            while stack.len() > 1 {
 
                // Check if our parent has another solution we can try
 
                let cur_index = stack.len() - 1;
 
                let cur_entry = &stack[cur_index];
 

	
 
                let parent_entry = &stack[cur_entry.parent_entry_index];
 
                let parent_component = &self.local[parent_entry.component_index];
 
                let parent_solution = &parent_component.solutions[parent_entry.solution_index];
 

	
 
                let match_component = &parent_solution.matches[cur_entry.match_index_in_parent];
 
                debug_assert!(match_component.target_index == cur_entry.component_index);
 
                let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1;
 

	
 
                if new_solution_index_in_parent < match_component.match_indices.len() {
 
                    // We can still try a new one
 
                    let new_solution_index = match_component.match_indices[new_solution_index_in_parent];
 
                    let cur_entry = &mut stack[cur_index];
 
                    cur_entry.solution_index_in_parent = new_solution_index_in_parent;
 
                    cur_entry.solution_index = new_solution_index;
 
                    continue 'check_last_stack;
 
                } else {
 
                    // We're out of options here. So pop an entry, then in
 
                    // the next iteration of this backtracking loop we try
 
                    // to increment that solution
 
                    stack.pop();
 
                }
 
            }
 

	
 
            // Stack length is 1, hence we're back at our initial solution.
 
            // Since that doesn't yield a global solution, we simply:
 
            return None;
 
        }
 

	
 
        // Constructing the representation of the global solution
 
        debug_assert_eq!(stack.len(), self.local.len());
 
        let mut final_branches = Vec::with_capacity(stack.len());
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 
            final_branches.push((component.component, solution.final_branch_id, component.sync_round));
 
        }
 

	
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_channels = 0;
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            total_num_channels += component.solutions[0].channel_mapping.len();
 
        }
 

	
 
        total_num_channels /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_channels);
 
        let mut total_num_checked = 0;
 

	
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 

	
 
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(*encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
 
                    },
 
                    None => {
 
                        final_mapping.push((channel_id, branch_id));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        debug_assert_eq!(total_num_checked, total_num_channels);
 

	
 
        return Some(GlobalSolution{
 
            component_branches: final_branches,
 
            channel_mapping: final_mapping,
 
        });
 
    }
 

	
 
    /// Checks if all preconditions for global sync failure have been met
 
    fn check_for_global_failure(&self) -> bool {
 
        if !self.failure_reported {
 
            return false;
 
        }
 

	
 
        // Failure is reported, if all components are present then we may emit
 
        // the global failure broadcast
 
        // Check if all are present and we're preparing to fail this round
 
        let mut all_present = true;
 
        for presence in &self.presence {
 
            if presence.state == PresenceState::OnePresent {
 
                all_present = false;
 
                break;
 
            }
 
        }
 

	
 
        return all_present; // && failure_reported, which is checked above
 
    }
 

	
 
    /// Turns the entire (partially resolved) global solution into a structure
 
    /// that can be forwarded to a new parent. The new parent may then merge
 
    /// already obtained information.
 
    fn drain(&mut self) -> Option<SolutionCombiner> {
 
        if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported {
 
            return None;
 
        }
 

	
 
        let result = SolutionCombiner{
 
            local: self.local.clone(),
 
            presence: self.presence.clone(),
 
            failure_reported: self.failure_reported,
 
        };
 

	
 
        self.local.clear();
 
        self.presence.clear();
 
        self.failure_reported = false;
 
        return Some(result);
 
    }
 

	
 
    // TODO: Entire routine is quite wasteful. Combine instead of doing all work
 
    //  again.
 
    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
 
        self.failure_reported = self.failure_reported || combiner.failure_reported;
 

	
 
        // Handle local solutions
 
        if self.local.is_empty() {
 
            // Trivial case
 
            self.local = combiner.local;
 
        } else {
 
            for local in combiner.local {
 
                for matched in local.solutions {
 
                    let local_solution = LocalSolution{
 
                        component: local.component,
 
                        sync_round_number: local.sync_round,
 
                        final_branch_id: matched.final_branch_id,
 
                        port_mapping: matched.channel_mapping,
 
                    };
 
                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
 
                    if let Some(global_solution) = maybe_solution {
 
                        return Some(LeaderConclusion::Solution(global_solution));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Handle channel presence
 
        println!("DEBUGERINO: Presence before joining is {:#?}", &self.presence);
 
        if self.presence.is_empty() {
 
            // Trivial case
 
            self.presence = combiner.presence;
 
            println!("DEBUGERINO: Trivial merging")
 
        } else {
 
            for presence in combiner.presence {
 
                match self.presence.iter_mut().find(|v| v.id == presence.id) {
 
                    Some(entry) => {
 
                        // Combine entries. Take first that has Closed, then
 
                        // check first that has both, then check if they are
 
                        // combinable
 
                        if entry.state == PresenceState::Closed {
 
                            // Do nothing
 
                        } else if presence.state == PresenceState::Closed {
 
                            entry.owner_a = presence.owner_a;
 
                            entry.owner_b = presence.owner_b;
 
                            entry.state = PresenceState::Closed;
 
                        } else if entry.state == PresenceState::BothPresent {
 
                            // Again: do nothing
 
                        } else if presence.state == PresenceState::BothPresent {
 
                            entry.owner_a = presence.owner_a;
 
                            entry.owner_b = presence.owner_b;
 
                            entry.state = PresenceState::BothPresent;
 
                        } else {
 
                            // Both have one presence, combine into both present
 
                            debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent);
 
                            entry.owner_b = Some(presence.owner_a);
 
                            entry.state = PresenceState::BothPresent;
 
                        }
 
                    },
 
                    None => {
 
                        self.presence.push(presence);
 
                    }
 
                }
 
            }
 
            println!("DEBUGERINO: Presence after joining is {:#?}", &self.presence);
 

	
 
            // After adding everything we might have immediately found a solution
 
            if self.check_for_global_failure() {
 
                println!("DEBUG: Returning immediate failure?");
 
                return Some(LeaderConclusion::Failure);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.local.clear();
 
        self.presence.clear();
 
        self.failure_reported = false;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic Helpers
 
// -----------------------------------------------------------------------------
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod branch;
 
mod native;
 
mod port;
 
mod scheduler;
 
mod consensus;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 
mod connector;
 

	
 
// Imports
 

	
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Condvar, Mutex, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use inbox::Message;
 
use port::{ChannelId, Port, PortState};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
#[derive(Debug)]
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
    pub generation: u32,
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId{
 
            index: self.index,
 
            generation: self.generation,
 
        };
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{
 
            index: id.index,
 
            generation: id.generation,
 
        };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone)]
 
pub struct ConnectorId{
 
    pub index: u32,
 
    pub generation: u32,
 
}
 

	
 
impl PartialEq for ConnectorId {
 
    fn eq(&self, other: &Self) -> bool {
 
        return self.index.eq(&other.index);
 
    }
 
}
 

	
 
impl Eq for ConnectorId{}
 

	
 
impl PartialOrd for ConnectorId{
 
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 
        return self.index.partial_cmp(&other.index)
 
    }
 
}
 

	
 
impl Ord for ConnectorId{
 
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 
        return self.partial_cmp(other).unwrap();
 
    }
 
}
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId {
 
            index: u32::MAX,
 
            generation: 0,
 
        };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub(crate) enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub ctx: ComponentCtx,
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
/// Externally facing runtime.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            protocol_description,
 
            port_counter: AtomicU32::new(0),
 
            connectors: RwLock::new(ConnectorStore::with_capacity(32)),
 
            connector_queue: Mutex::new(VecDeque::with_capacity(32)),
 
            schedulers: Mutex::new(Vec::new()),
 
            scheduler_notifier: Condvar::new(),
 
            active_connectors: AtomicU32::new(0),
 
            active_interfaces: AtomicU32::new(1), // this `Runtime` instance
 
            should_exit: AtomicBool::new(false),
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads as usize);
 
            for thread_index in 0..num_threads {
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::Builder::new()
 
                    .name(format!("thread-{}", thread_index))
 
                    .spawn(move || {
 
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
 
                        scheduler.run();
 
                    })
 
                    .unwrap();
 

	
 
                schedulers.push(thread);
 
            }
 

	
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        self.inner.increment_active_interfaces();
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector_key = self.inner.create_interface_component(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_interfaces();
 
        let mut lock = self.inner.schedulers.lock().unwrap();
 
        for handle in lock.drain(..) {
 
            handle.join().unwrap();
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// RuntimeInner
 
// -----------------------------------------------------------------------------
 

	
 
pub(crate) struct RuntimeInner {
 
    // Protocol
 
    pub(crate) protocol_description: ProtocolDescription,
 
    // Regular counter for port IDs
 
    port_counter: AtomicU32,
 
    // Storage of connectors and the work queue
 
    connectors: RwLock<ConnectorStore>,
 
    connector_queue: Mutex<VecDeque<ConnectorKey>>,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>,
 
    // Conditions to determine whether the runtime can exit
 
    scheduler_notifier: Condvar,  // coupled to mutex on `connector_queue`.
 
    // TODO: Figure out if we can simply merge the counters?
 
    active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active)
 
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
 
    should_exit: AtomicBool,
 
}
 

	
 
impl RuntimeInner {
 
    // --- Managing the components queued for execution
 

	
 
    /// Wait until there is a connector to run. If there is one, then `Some`
 
    /// will be returned. If there is no more work, then `None` will be
 
    /// returned.
 
    pub(crate) fn wait_for_work(&self) -> Option<ConnectorKey> {
 
        let mut lock = self.connector_queue.lock().unwrap();
 
        while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) {
 
            lock = self.scheduler_notifier.wait(lock).unwrap();
 
        }
 

	
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn push_work(&self, key: ConnectorKey) {
 
        let mut lock = self.connector_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.scheduler_notifier.notify_one();
 
    }
 

	
 
    // --- Creating/using ports
 

	
 
    /// Creates a new port pair. Note that these are stored globally like the
 
    /// connectors are. Ports stored by components belong to those components.
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) {
 
        use port::{PortIdLocal, PortKind};
 

	
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let channel_id = ChannelId::new(getter_id);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            channel_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            channel_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
    /// Sends a message directly (without going through the port) to a
 
    /// component. This is slightly less efficient then sending over a port, but
 
    /// might be preferable for some algorithms. If the component was sleeping
 
    /// then it is scheduled for execution.
 
    pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool {
 
        let target = {
 
            let mut lock = self.connectors.read().unwrap();
 
            let lock = self.connectors.read().unwrap();
 
            lock.get(target_id.index)
 
        };
 

	
 
        // Do a CAS on the number of users. Most common case the component is
 
        // alive and we're the only one sending the message. Note that if we
 
        // finish this block, we're sure that no-one has set the `num_users`
 
        // value to 0. This is essential! When at 0, the component is added to
 
        // the freelist and the generation counter will be incremented.
 
        let mut cur_num_users = 1;
 
        while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) {
 
            if old_num_users == 0 {
 
                // Cannot send message. Whatever the component state is
 
                // (destroyed, at a different generation number, busy being
 
                // destroyed, etc.) we cannot send the message and will not
 
                // modify the component
 
                return false;
 
            }
 

	
 
            cur_num_users = old_num_users;
 
        }
 

	
 
        // We incremented the counter. But we might still be at the wrong
 
        // generation number. The generation number is a monotonically
 
        // increasing value. Since it only increases when someone gets the
 
        // `num_users` counter to 0, we can simply load the generation number.
 
        let generation = target.generation.load(Ordering::Acquire);
 
        if generation != target_id.generation {
 
            // We're at the wrong generation, so we cannot send the message.
 
            // However, since we incremented the `num_users` counter, the moment
 
            // we decrement it we might be the one that are supposed to handle
 
            // the destruction of the component. Note that all users of the
 
            // component do an increment-followed-by-decrement, we can simply
 
            // do a `fetch_sub`.
 
            let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
 
            if old_num_users == 1 {
 
                // We're the one that got the counter to 0, so we're the ones
 
                // that are supposed to handle component exit
 
                self.finish_component_destruction(target_id);
 
            }
 

	
 
            return false;
 
        }
 

	
 
        // The generation is correct, and since we incremented the `num_users`
 
        // counter we're now sure that we can send the message and it will be
 
        // handled by the receiver
 
        target.connector.public.inbox.insert_message(message);
 

	
 
        // Finally, do the same as above: decrement number of users, if at gets
 
        // to 0 we're the ones who should handle the exit condition.
 
        let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
 
        if old_num_users == 1 {
 
            // We're allowed to destroy the component.
 
            self.finish_component_destruction(target_id);
 
        } else {
 
            // Message is sent. If the component is sleeping, then we're sure
 
            // it is not scheduled and it has not initiated the destruction of
 
            // the component (because of the way
 
            // `initiate_component_destruction` does not set sleeping to true).
 
            // So we can safely schedule it.
 
            let should_wake_up = target.connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up {
 
                let key = unsafe{ ConnectorKey::from_id(target_id) };
 
                self.push_work(key);
 
            }
 
        }
 

	
 
        return true
 
    }
 

	
 
    /// Sends a message to a particular component, assumed to occur over a port.
 
    /// If the component happened to be sleeping then it will be scheduled for
 
    /// execution. Because of the port management system we may assumed that
 
    /// we're always accessing the component at the right generation number.
 
    pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) {
 
        let target = {
 
            let lock = self.connectors.read().unwrap();
 
            let entry = lock.get(target_id.index);
 
            debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation);
 
            &mut entry.connector.public
 
        };
 

	
 
        target.inbox.insert_message(message);
 

	
 
        let should_wake_up = target.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(target_id) };
 
            self.push_work(key);
 
        }
 
    }
 

	
 
    // --- Creating/retrieving/destroying components
 

	
 
    /// Creates an initially sleeping application connector.
 
    fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
        // Initialize as sleeping, as it will be scheduled by the programmer.
 
        let mut lock = self.connectors.write().unwrap();
 
        let key = lock.create(ConnectorVariant::Native(Box::new(component)), true);
 

	
 
        self.increment_active_components();
 
        return key;
 
    }
 

	
 
    /// Creates a new PDL component. This function just creates the component.
 
    /// If you create it initially awake, then you must add it to the work
 
    /// queue. Other aspects of correctness (i.e. setting initial ports) are
 
    /// relinquished to the caller!
 
    pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey {
 
        // Create as not sleeping, as we'll schedule it immediately
 
        let key = {
 
            let mut lock = self.connectors.write().unwrap();
 
            lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping)
 
        };
 

	
 
        self.increment_active_components();
 
        return key;
 
    }
 

	
 
    /// Retrieve private access to the component through its key.
 
    #[inline]
 
    pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let entry = {
 
            let lock = self.connectors.read().unwrap();
 
            lock.get(connector_key.index)
 
        };
 

	
 
        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key);
 
        return &mut entry.connector;
 
    }
 

	
 
    // --- Managing component destruction
 

	
 
    /// Start component destruction, may only be done by the scheduler that is
 
    /// executing the component. This might not actually destroy the component,
 
    /// since other components might be sending it messages.
 
    fn initiate_component_destruction(&self, connector_key: ConnectorKey) {
 
        // Most of the time no-one will be sending messages, so try
 
        // immediate destruction
 
        let mut lock = self.connectors.write().unwrap();
 
        let entry = lock.get(connector_key.index);
 
        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation);
 
        debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component
 
        let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst);
 
        if old_num_users == 1 {
 
            // We just brought the number of users down to 0. Destroy the
 
            // component
 
            entry.connector.public.inbox.clear();
 
            entry.generation.fetch_add(1, Ordering::SeqCst);
 
            lock.destroy(connector_key);
 
            self.decrement_active_components();
 
        }
 
    }
 

	
 
    fn finish_component_destruction(&self, connector_id: ConnectorId) {
 
        let mut lock = self.connectors.write().unwrap();
 
        let entry = lock.get(connector_id.index);
 
        debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0);
 
        let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst);
 
        debug_assert_eq!(_old_generation, connector_id.generation);
 

	
 
        // TODO: In the future we should not only clear out the inbox, but send
 
        //  messages back to the senders indicating the messages did not arrive.
 
        entry.connector.public.inbox.clear();
 

	
 
        // Invariant of only one thread being able to handle the internals of
 
        // component is preserved by the fact that only one thread can decrement
 
        // `num_users` to 0.
 
        lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) });
 
        self.decrement_active_components();
 
    }
 

	
 
    // --- Managing exit condition
 

	
 
    #[inline]
 
    pub(crate) fn increment_active_interfaces(&self) {
 
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
 
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
 
    }
 

	
 
    pub(crate) fn decrement_active_interfaces(&self) {
 
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 { // such that active interfaces is now 0
 
            let num_connectors = self.active_connectors.load(Ordering::Acquire);
 
            if num_connectors == 0 {
 
                self.signal_for_shutdown();
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn increment_active_components(&self) {
 
        let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst);
 
    }
 

	
 
    fn decrement_active_components(&self) {
 
        let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 { // such that we have no more active connectors (for now!)
 
            let num_interfaces = self.active_interfaces.load(Ordering::Acquire);
 
            if num_interfaces == 0 {
 
                self.signal_for_shutdown();
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn signal_for_shutdown(&self) {
 
        debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0);
 
        debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0);
 

	
 
        let _lock = self.connector_queue.lock().unwrap();
 
        let should_signal = self.should_exit
 
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_signal {
 
            self.scheduler_notifier.notify_all();
 
        }
 
    }
 
}
 

	
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorStore
 
// -----------------------------------------------------------------------------
 

	
 
struct StoreEntry {
 
    connector: ScheduledConnector,
 
    generation: std::sync::atomic::AtomicU32,
 
    num_users: std::sync::atomic::AtomicU32,
 
}
 

	
 
struct ConnectorStore {
 
    // Freelist storage of connectors. Storage should be pointer-stable as
 
    // someone might be mutating the vector while we're executing one of the
 
    // connectors.
 
    entries: RawVec<*mut StoreEntry>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self {
 
            entries: RawVec::with_capacity(capacity),
 
            free: Vec::with_capacity(capacity),
 
        }
 
    }
 

	
 
    /// Directly retrieves an entry. There be dragons here. The `connector`
 
    /// might have its destructor already executed. Accessing it might then lead
 
    /// to memory corruption.
 
    fn get(&self, index: u32) -> &'static mut StoreEntry {
 
        unsafe {
 
            let entry = self.entries.get_mut(index as usize);
 
            return &mut **entry;
 
        }
 
    }
 

	
 
    /// Creates a new connector. Caller should ensure ports are set up correctly
 
    /// and the connector is queued for execution if needed.
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            ctx: ComponentCtx::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
        };
 

	
 
        let index;
 
        let key;
 

	
 
        if self.free.is_empty() {
 
            // No free entries, allocate new entry
 
            index = self.entries.len();
 
            key = ConnectorKey{
 
                index: index as u32, generation: 0
 
            };
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(StoreEntry{
 
                connector,
 
                generation: AtomicU32::new(0),
 
                num_users: AtomicU32::new(1),
 
            }));
 
            self.entries.push(connector);
 
        } else {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 

	
 
            unsafe {
 
                let target = &mut **self.entries.get_mut(index);
 
                std::ptr::write(&mut target.connector as *mut _, connector);
 
                let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst);
 
                debug_assert_eq!(_old_num_users, 0);
 

	
 
                let generation = target.generation.load(Ordering::Acquire);
 
                key = ConnectorKey{ index: index as u32, generation };
 
                target.connector.ctx.id = key.downcast();
 
            }
 
        }
 

	
 
        println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        return key;
 
    }
 

	
 
    /// Destroys a connector. Caller should make sure it is not scheduled for
 
    /// execution. Otherwise one experiences "bad stuff" (tm).
 
    fn destroy(&mut self, key: ConnectorKey) {
 
        unsafe {
 
            let target = self.entries.get_mut(key.index as usize);
 
            (**target).generation.fetch_add(1, Ordering::SeqCst);
 
            std::ptr::drop_in_place(*target);
 
            // Note: but not deallocating!
 
        }
 

	
 
        println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        self.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        // Everything in the freelist already had its destructor called, so only
 
        // has to be deallocated
 
        for free_idx in self.free.iter().copied() {
 
            unsafe {
 
                let memory = self.entries.get_mut(free_idx);
 
                let layout = std::alloc::Layout::for_value(&**memory);
 
                std::alloc::dealloc(*memory as *mut u8, layout);
 

	
 
                // mark as null for the remainder
 
                *memory = std::ptr::null_mut();
 
            }
 
        }
 

	
 
        // With the deallocated stuff marked as null, clear the remainder that
 
        // is not null
 
        for idx in 0..self.entries.len() {
 
            unsafe {
 
                let memory = *self.entries.get_mut(idx);
 
                if !memory.is_null() {
 
                    let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh
 
                }
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime::consensus::RoundConclusion;
 

	
 
use super::{ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
use super::scheduler::{SchedulerCtx, ComponentCtx, MessageTicket};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{
 
    Message, DataMessage,
 
    SyncCompMessage, SyncPortMessage,
 
    ControlContent, ControlMessage
 
};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
 
}
 

	
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    success: bool,
 
    inbox: Vec<ValueGroup>,
 
}
 

	
 
type SyncDone = Arc<(Mutex<Option<FinishedSync>>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
 
    SyncRound(Vec<ApplicationSyncAction>),
 
    Shutdown,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorApplication
 
// -----------------------------------------------------------------------------
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
// TODO: Strong candidate for logic reduction in handling put/get. A lot of code
 
//  is an approximate copy-pasta from the regular component logic. I'm going to
 
//  wait until I'm implementing more native components to see which logic is
 
//  truly common.
 
pub struct ConnectorApplication {
 
    // Communicating about new jobs and setting up sync rounds
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    is_in_sync: bool,
 
    // Handling current sync round
 
    sync_desc: Vec<ApplicationSyncAction>,
 
    tree: FakeTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
    branch_extra: Vec<usize>, // instruction counter per branch
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if self.is_in_sync {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 
                if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Can finish sync round immediately
 
                    self.collapse_sync_to_conclusion(conclusion, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
        }
 
    }
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(None), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication {
 
            sync_done: sync_done.clone(),
 
            job_queue: job_queue.clone(),
 
            is_in_sync: false,
 
            sync_desc: Vec::new(),
 
            tree: FakeTree::new(),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
            branch_extra: vec![0],
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 

	
 
    fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) {
 
        while let Some(ticket) = comp_ctx.get_next_message_ticket() {
 
            let message = comp_ctx.read_message_using_ticket(ticket);
 
            if let Message::Data(_) = message {
 
                self.handle_new_data_message(ticket, comp_ctx)
 
            } else {
 
                match comp_ctx.take_message_using_ticket(ticket) {
 
                    Message::Data(message) => unreachable!(),
 
                    Message::Data(_message) => unreachable!(),
 
                    Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx),
 
                    Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx),
 
                    Message::SyncControl(message) => todo!("implement"),
 
                    Message::SyncControl(_message) => todo!("implement"),
 
                    Message::Control(_) => unreachable!("control message in native API component"),
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(ticket, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            let message = ctx.read_message_using_ticket(ticket).as_data();
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
            self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) {
 
        if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            self.collapse_sync_to_conclusion(conclusion, ctx);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.handle_new_messages(comp_ctx);
 

	
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 
        let mut instruction_idx = self.branch_extra[branch_id.index as usize];
 
        let instruction_idx = self.branch_extra[branch_id.index as usize];
 

	
 
        if instruction_idx >= self.sync_desc.len() {
 
            // Performed last instruction, so this branch is officially at the
 
            // end of the synchronous interaction.
 
            let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
            if consistency == Consistency::Valid {
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
            } else {
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            }
 
        } else {
 
            // We still have instructions to perform
 
            let cur_instruction = &self.sync_desc[instruction_idx];
 
            self.branch_extra[branch_id.index as usize] += 1;
 

	
 
            match &cur_instruction {
 
                ApplicationSyncAction::Put(port_id, content) => {
 
                    let port_id = *port_id;
 

	
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    let message = Message::Data(DataMessage {
 
                        sync_header,
 
                        data_header,
 
                        content: content.clone(),
 
                    });
 
                    comp_ctx.submit_message(message);
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationSyncAction::Get(port_id) => {
 
                    let port_id = *port_id;
 

	
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    let mut any_message_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 
                            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                            self.branch_extra.push(instruction_idx + 1);
 

	
 
                            branch.insert_message(port_id, message.content.clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
                        }
 
                    }
 

	
 
                    if any_message_received {
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        // In non-sync mode the application component doesn't really do anything
 
        // except performing jobs submitted from the API. This is the only
 
        // case where we expect to be woken up.
 
        // Note that we have to communicate to the scheduler when we've received
 
        // ports or created components (hence: given away ports) *before* we
 
        // enter a sync round.
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    comp_ctx.push_port(endpoint_a);
 
                    comp_ctx.push_port(endpoint_b);
 

	
 
                    return ConnectorScheduling::Immediate;
 
                }
 
                ApplicationJob::NewConnector(connector, initial_ports) => {
 
                    comp_ctx.push_component(connector, initial_ports);
 

	
 
                    return ConnectorScheduling::Later;
 
                },
 
                ApplicationJob::SyncRound(mut description) => {
 
                ApplicationJob::SyncRound(description) => {
 
                    // Entering sync mode
 
                    comp_ctx.notify_sync_start();
 
                    self.sync_desc = description;
 
                    self.is_in_sync = true;
 
                    debug_assert!(self.last_finished_handled.is_none());
 
                    debug_assert!(self.branch_extra.len() == 1);
 

	
 
                    let first_branch_id = self.tree.start_sync();
 
                    self.tree.push_into_queue(QueueKind::Runnable, first_branch_id);
 
                    debug_assert!(first_branch_id.index == 1);
 
                    self.consensus.start_sync(comp_ctx);
 
                    self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id);
 
                    self.branch_extra.push(0); // set first branch to first instruction
 

	
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 

	
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        // Queue was empty
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
    fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) {
 
        // Notifying tree, consensus algorithm and context of ending sync
 
        let mut fake_vec = Vec::new();
 

	
 
        let (branch_id, success) = match conclusion {
 
            RoundConclusion::Success(branch_id) => {
 
                debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API
 
                (branch_id, true)
 
            },
 
            RoundConclusion::Failure => (BranchId::new_invalid(), false),
 
        };
 

	
 
        let mut solution_branch = self.tree.end_sync(branch_id);
 
        self.consensus.end_sync(branch_id, &mut fake_vec);
 
        debug_assert!(fake_vec.is_empty());
 

	
 
        comp_ctx.notify_sync_end(&[]);
 

	
 
        // Turning hashmapped inbox into vector of values
 
        let mut inbox = Vec::with_capacity(solution_branch.inbox.len());
 
        for action in &self.sync_desc {
 
            match action {
 
                ApplicationSyncAction::Put(_, _) => {},
 
                ApplicationSyncAction::Get(port_id) => {
 
                    debug_assert!(solution_branch.inbox.contains_key(port_id));
 
                    inbox.push(solution_branch.inbox.remove(port_id).unwrap());
 
                },
 
            }
 
        }
 

	
 
        // Notifying interface of ending sync
 
        self.is_in_sync = false;
 
        self.sync_desc.clear();
 
        self.branch_extra.truncate(1);
 
        self.last_finished_handled = None;
 

	
 
        let (results, notification) = &*self.sync_done;
 
        let mut results = results.lock().unwrap();
 
        *results = Some(FinishedSync{ success, inbox });
 
        notification.notify_one();
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ApplicationInterface
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub enum ChannelCreationError {
 
    InSync,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationStartSyncError {
 
    AlreadyInSync,
 
    NoSyncActions,
 
    IncorrectPortKind,
 
    UnownedPort,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationEndSyncError {
 
    NotInSync,
 
    Failure,
 
}
 

	
 
pub enum ApplicationSyncAction {
 
    Put(PortIdLocal, ValueGroup),
 
    Get(PortIdLocal),
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    is_in_sync: bool,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<(PortKind, PortIdLocal)>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            is_in_sync: false,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel. Can only fail if the application interface is
 
    /// currently in sync mode.
 
    pub fn create_channel(&mut self) -> Result<Channel, ChannelCreationError> {
 
        if self.is_in_sync {
 
            return Err(ChannelCreationError::InSync);
 
        }
 

	
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push((PortKind::Putter, putter_id));
 
        self.owned_ports.push((PortKind::Getter, getter_id));
 

	
 
        return Ok(Channel{ putter_id, getter_id });
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        if self.is_in_sync {
 
            return Err(ComponentCreationError::InSync);
 
        }
 

	
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for initial_port in &initial_ports {
 
            if !self.owned_ports.iter().any(|(_, v)| v == initial_port) {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let prompt = self.runtime.protocol_description.new_component(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(prompt);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Queues up a description of a synchronous round to run. Will not actually
 
    /// run the synchronous behaviour in blocking fashion. The results *must* be
 
    /// retrieved using `try_wait` or `wait` for the interface to be considered
 
    /// in non-sync mode.
 
    pub fn perform_sync_round(&mut self, actions: Vec<ApplicationSyncAction>) -> Result<(), ApplicationStartSyncError> {
 
        if self.is_in_sync {
 
            return Err(ApplicationStartSyncError::AlreadyInSync);
 
        }
 

	
 
        // Check the action ports for consistency
 
        for action in &actions {
 
            let (port_id, expected_kind) = match action {
 
                ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter),
 
                ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter),
 
            };
 

	
 
            match self.find_port_by_id(port_id) {
 
                Some(port_kind) => {
 
                    if port_kind != expected_kind {
 
                        return Err(ApplicationStartSyncError::IncorrectPortKind)
 
                    }
 
                },
 
                None => {
 
                    return Err(ApplicationStartSyncError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        // Everything is consistent, go into sync mode and send the actions off
 
        // to the component that will actually perform the sync round
 
        self.is_in_sync = true;
 
        {
 
            let (is_done, _) = &*self.sync_done;
 
            let mut lock = is_done.lock().unwrap();
 
            *lock = None;
 
        }
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::SyncRound(actions));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        return Ok(())
 
    }
 

	
 
    /// Wait until the next sync-round is finished, returning the received
 
    /// messages in order of `get` calls.
 
    pub fn wait(&mut self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
        if !self.is_in_sync {
 
            return Err(ApplicationEndSyncError::NotInSync);
 
        }
 

	
 
        let (is_done, condition) = &*self.sync_done;
 
        let mut lock = is_done.lock().unwrap();
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        let result = lock.take().unwrap();
 
        if result.success {
 
            return Ok(result.inbox);
 
        } else {
 
            return Err(ApplicationEndSyncError::Failure);
 
        }
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 

	
 
    fn wake_up_connector_with_ping(&self) {
 
        let message = ControlMessage {
 
            id: 0,
 
            sending_component_id: self.connector_id,
 
            content: ControlContent::Ping,
 
        };
 
        self.runtime.send_message_maybe_destroyed(self.connector_id, Message::Control(message));
 
    }
 

	
 
    fn find_port_by_id(&self, port_id: PortIdLocal) -> Option<PortKind> {
 
        return self.owned_ports.iter()
 
            .find(|(_, owned_id)| *owned_id == port_id)
 
            .map(|(port_kind, _)| *port_kind);
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::Shutdown);
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        self.runtime.decrement_active_interfaces();
 
    }
 
}
 
\ No newline at end of file
src/runtime/scheduler.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 

	
 
use crate::protocol::eval::EvalError;
 
use crate::runtime::port::ChannelId;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::branch::{BranchId};
 
use super::connector::{ConnectorPDL, ConnectorScheduling};
 
use super::inbox::{
 
    Message, DataMessage,
 
    ControlMessage, ControlContent,
 
    SyncControlMessage, SyncControlContent,
 
};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            self.debug("Waiting for work");
 
            let connector_key = self.runtime.wait_for_work();
 
            if connector_key.is_none() {
 
                // We should exit
 
                self.debug(" ... No more work, quitting");
 
                break 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while let ConnectorScheduling::Immediate = cur_schedule {
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.initiate_component_destruction(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message));
 
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    // Any messages still in the public inbox should be handled
 
                    scheduled.ctx.inbox.clear_read_messages();
 
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
 
                        self.handle_message_while_shutting_down(message, scheduled);
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.initiate_component_destruction(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = message.target_port() {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 

	
 
                    // We insert directly into the private inbox. Since we have
 
                    // a reroute entry the component can not yet be running.
 
                    if let Message::Control(_) = &message {
 
                        self.runtime.send_message_assumed_alive(other_component_id, message);
 
                    } else {
 
                        let key = unsafe { ConnectorKey::from_id(other_component_id) };
 
                        let component = self.runtime.get_component_private(&key);
 
                        component.ctx.inbox.insert_new(message);
 
                    }
 

	
 
                    continue;
 
                }
 

	
 
                match scheduled.ctx.get_port_by_id(target_port) {
 
                    Some(port_info) => {
 
                        if port_info.state == PortState::Closed {
 
                            // We're no longer supposed to receive messages
 
                            // (rerouted message arrived much later!)
 
                            continue
 
                        }
 
                    },
 
                    None => {
 
                        // Apparently we no longer have a handle to the port
 
                        continue;
 
                    }
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 
            if let Message::Control(message) = &message {
 
                match message.content {
 
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                        // Need to change port target
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.peer_connector = new_target_connector_id;
 

	
 
                        // Note: for simplicity we program the scheduler to always finish
 
                        // running a connector with an empty outbox. If this ever changes
 
                        // then accepting the "port peer changed" message implies we need
 
                        // to change the recipient of the message in the outbox.
 
                        debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                        // And respond with an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::CloseChannel(port_id) => {
 
                        // Mark the port as being closed
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.state = PortState::Closed;
 

	
 
                        // Send an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some(component_key) = scheduled.router.handle_ack(message.id) {
 
                            self.runtime.push_work(component_key);
 
                        };
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
            Message::SyncControl(_) => None,
 
            Message::Control(_) => None,
 
        };
 

	
 
        if let Some((target_port, sync_round)) = target_port_and_round_number {
 
            // This message is aimed at a port, but we're shutting down, so
 
            // notify the peer that its was not received properly.
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            if port.state == PortState::Open {
 
                let message = SyncControlMessage {
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
 
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
                };
 
                self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message));
 
                self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message));
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            let (target_component_id, over_port) = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 
                    debug_assert_eq!(port_desc.state, PortState::Open); // checked when adding to context
 

	
 
                    (port_desc.peer_connector, true)
 
                },
 
                Message::SyncComp(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    (content.target_component_id, false)
 
                },
 
                Message::SyncPort(content) => {
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
 
                    debug_assert_eq!(port_desc.state, PortState::Open); // checked when adding to context
 

	
 
                    (port_desc.peer_connector, true)
 
                },
 
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
 
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
 
            };
 

	
 
            self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message));
 
            if over_port {
 
                self.runtime.send_message_assumed_alive(target_component_id, message);
 
            } else {
 
                self.runtime.send_message_maybe_destroyed(target_component_id, message);
 
            }
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. Need to relinquish control of
 
                    // the ports.
 
                    let new_component_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_component_key);
 

	
 
                    // First pass: transfer ports and the associated messages,
 
                    // also count the number of ports that have peers
 
                    let mut num_peers = 0;
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        if port.state == PortState::Open {
 
                            num_peers += 1;
 
                        }
 
                    }
 

	
 
                    if num_peers == 0 {
 
                        // No peers to notify, so just schedule the component
 
                        self.runtime.push_work(new_component_key);
 
                    } else {
 
                        // Some peers to notify
 
                        let new_component_id = new_component_key.downcast();
 
                        let control_id = scheduled.router.prepare_new_component(new_component_key);
 
                        for port in new_connector.ctx.ports.iter() {
 
                            if port.state == PortState::Closed {
 
                                continue;
 
                            }
 

	
 
                            let control_message = scheduled.router.prepare_changed_port_peer(
 
                                control_id, scheduled.ctx.id,
 
                                port.peer_connector, port.peer_id,
 
                                new_component_id, port.self_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message));
 
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message));
 
                        }
 
                    }
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So prepare inbox for the next sync
 
                // round
 
                scheduled.ctx.inbox.clear_read_messages();
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.index);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.index, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox: Inbox,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 

	
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox: Inbox::new(),
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
            workspace_ports: Vec::new(),
 
            workspace_branches: Vec::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
    /// be called outside of a sync block (for ports received during a sync
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    /// Notify the runtime of an error. Note that this will not perform any
 
    /// special action beyond printing the error. The component is responsible
 
    /// for waiting until it is appropriate to shut down (i.e. being outside
 
    /// of a sync region) and returning the `Exit` scheduling code.
 
    pub(crate) fn push_error(&mut self, error: EvalError) {
 
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.index, error);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.channel_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
 
        debug_assert!(self.is_in_sync);
 
        if let Some(port_id) = contents.source_port() {
 
            let port_info = self.get_port_by_id(port_id);
 
            let is_valid = match port_info {
 
                Some(port_info) => {
 
                    port_info.state == PortState::Open
 
                },
 
                None => false,
 
            };
 
            if !is_valid {
 
                // We don't own the port
 
                println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index);
 
                return Err(());
 
            }
 
        }
 

	
 
        self.outbox.push_back(contents);
 
        return Ok(());
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return self.inbox.get_read_data_messages(match_port_id);
 
    }
 

	
 
    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if !self.is_in_sync { return None; }
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option<MessageTicket> {
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        return self.inbox.read_message_using_ticket(ticket);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        return self.inbox.take_message_using_ticket(ticket)
 
    }
 

	
 
    /// Puts back a message back into the inbox. The reason being that the
 
    /// message is actually part of the next sync round. This will
 
    pub(crate) fn put_back_message(&mut self, message: Message) {
 
        self.inbox.put_back_message(message);
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [Message],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
}
 

	
 
impl<'a> Iterator for MessagesIter<'a> {
 
    type Item = &'a DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if let Message::Data(message) = &message {
 
                if message.data_header.target_port == self.match_port_id {
 
                    // Found a match
 
                    self.next_index += 1;
 
                    return Some(message);
 
                }
 
            } else {
 
                // Unreachable because:
 
                //  1. We only iterate over messages that were previously retrieved by `read_next_message`.
 
                //  2. Inbox does not contain control/ping messages.
 
                //  3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox.
 
                unreachable!();
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        // No more messages
 
        return None;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Private Inbox
 
// -----------------------------------------------------------------------------
 

	
 
/// A structure that contains inbox messages. Some messages are left inside and
 
/// continuously re-read. Others are taken out, but may potentially be put back
 
/// for later reading. Later reading in this case implies that they are put back
 
/// for reading in the next sync round.
 
/// TODO: Again, lazy concurrency, see git history for other implementation
 
struct Inbox {
 
    messages: Vec<Message>,
 
    delayed: Vec<Message>,
 
    next_read_idx: u32,
 
    generation: u32,
 
}
 

	
 
#[derive(Clone, Copy)]
 
pub(crate) struct MessageTicket {
 
    index: u32,
 
    generation: u32,
 
}
 

	
 
impl Inbox {
 
    fn new() -> Self {
 
        return Inbox {
 
            messages: Vec::new(),
 
            delayed: Vec::new(),
 
            next_read_idx: 0,
 
            generation: 0,
 
        }
 
    }
 

	
 
    fn insert_new(&mut self, message: Message) {
 
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
 
        self.messages.push(message);
 
    }
 

	
 
    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if self.next_read_idx as usize >= self.messages.len() { return None };
 
        let idx = self.next_read_idx;
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{ index: idx, generation: self.generation });
 
    }
 

	
 
    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        return &self.messages[ticket.index as usize];
 
    }
 

	
 
    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        debug_assert!(ticket.index < self.next_read_idx);
 
        self.next_read_idx -= 1;
 
        return self.messages.remove(ticket.index as usize);
 
    }
 

	
 
    fn put_back_message(&mut self, message: Message) {
 
        // We have space in front of the array because we've taken out a message
 
        // before.
 
        self.delayed.push(message);
 
    }
 

	
 
    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter{
 
            messages: self.messages.as_slice(),
 
            next_index: 0,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
    }
 

	
 
    fn clear_read_messages(&mut self) {
 
        self.messages.drain(0..self.next_read_idx as usize);
 
        for (idx, v) in self.delayed.drain(..).enumerate() {
 
            self.messages.insert(idx, v);
 
        }
 
        self.next_read_idx = 0;
 
    }
 

	
 
    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
 
        debug_assert!(self.delayed.is_empty());
 
        let mut idx = 0;
 
        while idx < self.messages.len() {
 
            let msg = &self.messages[idx];
 
            if let Some(target) = msg.target_port() {
 
                if target == port {
 
                    new_inbox.messages.push(self.messages.remove(idx));
 
                    continue;
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    NewComponent(ControlNewComponent),
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
impl ControlVariant {
 
    fn as_new_component_mut(&mut self) -> &mut ControlNewComponent {
 
        match self {
 
            ControlVariant::NewComponent(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
/// Entry for a new component waiting for execution after all of its peers have
 
/// confirmed the `ControlChangedPort` messages.
 
struct ControlNewComponent {
 
    num_acks_pending: u32,          // if it hits 0, we schedule the component
 
    component_key: ConnectorKey,    // this is the component we schedule
 
}
 

	
 
struct ControlChangedPort {
 
    reroute_if_sent_to_this_port: PortIdLocal, // if sent to this port, then reroute
 
    source_connector: ConnectorId,             // connector we expect messages from
 
    target_connector: ConnectorId,             // connector we need to reroute to
 
    new_component_entry_id: u32,               // if Ack'd, we reduce the counter on this `ControlNewComponent` entry
 
}
 

	
 
struct ControlClosedChannel {
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
 

	
 
pub(crate) struct ControlMessageHandler {
 
    id_counter: u32,
 
    active: Vec<ControlEntry>,
 
}
 

	
 
impl ControlMessageHandler {
 
    pub fn new() -> Self {
 
        ControlMessageHandler {
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares a message indicating that a channel has closed, we keep a local
 
    /// entry to match against the (hopefully) returned `Ack` message.
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId
 
    ) -> ControlMessage {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
 
                source_port: self_port_id,
 
                target_port: peer_port_id,
 
            }),
 
        });
 

	
 
        return ControlMessage {
 
            id,
 
            sending_component_id: self_connector_id,
 
            content: ControlContent::CloseChannel(peer_port_id),
 
        };
 
    }
 

	
 
    /// Prepares a control entry for a new component. This returns the id of
 
    /// the entry for calls to `prepare_changed_port_peer`. Don't call this
 
    /// function if the component has no peers that need to be messaged.
 
    pub fn prepare_new_component(&mut self, component_key: ConnectorKey) -> u32 {
 
        let id = self.take_id();
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::NewComponent(ControlNewComponent{
 
                num_acks_pending: 0,
 
                component_key,
 
            }),
 
        });
 

	
 
        return id;
 
    }
 

	
 
    pub fn prepare_changed_port_peer(
 
        &mut self, new_component_entry_id: u32, creating_component_id: ConnectorId,
 
        changed_component_id: ConnectorId, changed_port_id: PortIdLocal,
 
        new_target_component_id: ConnectorId, new_target_port_id: PortIdLocal
 
    ) -> ControlMessage {
 
        // Add the peer-changed entry
 
        let change_port_entry_id = self.take_id();
 
        self.active.push(ControlEntry{
 
            id: change_port_entry_id,
 
            variant: ControlVariant::ChangedPort(ControlChangedPort{
 
                reroute_if_sent_to_this_port: new_target_port_id,
 
                source_connector: changed_component_id,
 
                target_connector: new_target_component_id,
 
                new_component_entry_id,
 
            })
 
        });
 

	
 
        // Increment counter on "new component" entry
 
        let position = self.position(new_component_entry_id).unwrap();
 
        let new_component_entry = &mut self.active[position];
 
        let new_component_entry = new_component_entry.variant.as_new_component_mut();
 
        new_component_entry.num_acks_pending += 1;
 

	
 
        return ControlMessage{
 
            id: change_port_entry_id,
 
            sending_component_id: creating_component_id,
 
            content: ControlContent::PortPeerChanged(changed_port_id, new_target_component_id),
 
        };
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
 
                if entry.reroute_if_sent_to_this_port == target_port {
 
                    // Need to reroute this message
 
                    return Some(entry.target_connector);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message.
 
    /// Handling an Ack might spawn a new message that needs to be sent.
 
    pub fn handle_ack(&mut self, id: u32) -> Option<ConnectorKey> {
 
        let index = self.position(id);
 

	
 
        match index {
 
            Some(index) => {
 
                // Remove the entry. If `ChangedPort`, then retrieve associated
 
                // `NewComponent`. Otherwise: early exits
 
                let removed_entry = self.active.remove(index);
 
                let new_component_idx = match removed_entry.variant {
 
                    ControlVariant::ChangedPort(message) => {
 
                        self.position(message.new_component_entry_id).unwrap()
 
                    },
 
                    _ => return None,
 
                };
 

	
 
                // Decrement counter, if 0, then schedule component
 
                let new_component_entry = self.active[new_component_idx].variant.as_new_component_mut();
 
                new_component_entry.num_acks_pending -= 1;
 
                if new_component_entry.num_acks_pending != 0 {
 
                    return None;
 
                }
 

	
 
                // Return component key for scheduling
 
                let new_component_entry = self.active.remove(new_component_idx);
 
                let new_component_entry = match new_component_entry.variant {
 
                    ControlVariant::NewComponent(entry) => entry,
 
                    _ => unreachable!(),
 
                };
 

	
 
                return Some(new_component_entry.component_key);
 
            },
 
            None => {
 
                todo!("handling of nefarious ACKs");
 
                return None;
 
                todo!("handling of nefarious ACKs")
 
                // return None;
 
            },
 
        }
 
    }
 

	
 
    /// Retrieves the number of responses we still expect to receive from our
 
    /// peers
 
    #[inline]
 
    pub fn num_pending_acks(&self) -> usize {
 
        return self.active.len();
 
    }
 

	
 
    fn take_id(&mut self) -> u32 {
 
        let generated_id = self.id_counter;
 
        let (new_id, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id;
 

	
 
        return generated_id;
 
    }
 

	
 
    #[inline]
 
    fn position(&self, id: u32) -> Option<usize> {
 
        return self.active.iter().position(|v| v.id == id);
 
    }
 
}
 
\ No newline at end of file
src/runtime/tests/data_transmission.rs
Show inline comments
 
// basics.rs
 
//
 
// The most basic of testing: sending a message, receiving a message, etc.
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_doing_nothing() {
 
    // If this thing does not get into an infinite loop, (hence: the runtime
 
    // exits), then the test works
 
    const CODE: &'static str ="
 
    primitive silent_willy(u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync { index += 1; }
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("doing_nothing");
 
    let _timer = TestTimer::new("doing_nothing");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "silent_willy", ValueGroup::new_stack(vec![
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create component");
 
    });
 
}
 

	
 
#[test]
 
fn test_single_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                put(sender, true);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter(in<bool> receiver, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto result = get(receiver);
 
                assert(result);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("single_put_and_get");
 
    let _timer = TestTimer::new("single_put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 

	
 
        api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create putter");
 

	
 
        api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create getter");
 
    });
 
}
 

	
 
#[test]
 
fn test_combined_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive put_then_get(out<bool> output, in<bool> input, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                put(output, true);
 
                auto value = get(input);
 
                assert(value);
 
                index += 1;
 
            }
 
        }
 
    }
 

	
 
    composite constructor(u32 num_loops) {
 
        channel output_a -> input_a;
 
        channel output_b -> input_b;
 
        new put_then_get(output_a, input_b, num_loops);
 
        new put_then_get(output_b, input_a, num_loops);
 
    }
 
    ";
 

	
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create connector");
 
    })
 
}
 

	
 
#[test]
 
fn test_multi_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter_static(out<u8> vals, u32 num_loops) {
 
        u32 index = 0;
 
        while (index < num_loops) {
 
            sync {
 
                put(vals, 0b00000001);
 
                put(vals, 0b00000100);
 
                put(vals, 0b00010000);
 
                put(vals, 0b01000000);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive getter_dynamic(in<u8> vals, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                u32 recv_index = 0;
 
                u8 expected = 1;
 
                while (recv_index < 4) {
 
                    auto gotten = get(vals);
 
                    assert(gotten == expected);
 
                    expected <<= 2;
 
                    recv_index += 1;
 
                }
 
            }
 
            loop_index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("multi_put_and_get");
 
    let _timer = TestTimer::new("multi_put_and_get");
 
    run_test_in_runtime(CODE, |api| {
 
        let channel = api.create_channel().unwrap();
 
        api.create_connector("", "putter_static", ValueGroup::new_stack(vec![
 
            Value::Output(PortId::new(channel.putter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
        api.create_connector("", "getter_dynamic", ValueGroup::new_stack(vec![
 
            Value::Input(PortId::new(channel.getter_id.index)),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).unwrap();
 
    })
 
}
 
\ No newline at end of file
src/runtime/tests/network_shapes.rs
Show inline comments
 
// Testing particular graph shapes
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_star_shaped_request() {
 
    const CODE: &'static str = "
 
    primitive edge(in<u32> input, out<u32> output, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync {
 
                auto req = get(input);
 
                put(output, req * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            // print(\"starting loop\");
 
            sync {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
        auto responses = {};
 

	
 
        u32 edge_index = 0;
 
        while (edge_index < num_edges) {
 
            channel req_put -> req_get;
 
            channel resp_put -> resp_get;
 
            new edge(req_get, resp_put, num_loops);
 
            requests @= { req_put };
 
            responses @= { resp_get };
 

	
 
            edge_index += 1;
 
        }
 

	
 
        new center(requests, responses, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("star_shaped_request");
 
    let _timer = TestTimer::new("star_shaped_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create connector");
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            sync {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            sync {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    let _timer = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(1),
 
            Value::UInt32(NUM_LOOPS)
 
        ])).expect("create connector");
 
    });
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)