Changeset - 7d01f1245b7c
mh - 2021-10-20 20:26:23
contact@maxhenger.nl
Everything compiles again, pending restructuring of shared runtime objects
12 files changed with 302 insertions and 305 deletions:
src/collections/raw_vec.rs
 
@@ -13,112 +13,112 @@ enum AllocError {
 
/// free the backing memory, but will not run any destructors.
 
pub struct RawVec<T: Sized> {
 
    base: *mut T,
 
    cap: usize,
 
    len: usize,
 
}
 

	
 
impl<T: Sized> RawVec<T> {
 
    const T_ALIGNMENT: usize = mem::align_of::<T>();
 
    const T_SIZE: usize = mem::size_of::<T>();
 
    
 
    const GROWTH_RATE: usize = 2;
 

	
 
    pub fn new() -> Self {
 
        Self{
 
            base: ptr::null_mut(),
 
            cap: 0,
 
            len: 0,
 
        }
 
    }
 

	
 
    pub fn with_capacity(capacity: usize) -> Self {
 
        // Could be done a bit more efficiently
 
        let mut result = Self::new();
 
        result.ensure_space(capacity);
 
        result.ensure_space(capacity).unwrap();
 
        return result;
 
    }
 

	
 
    pub unsafe fn get(&self, idx: usize) -> *const T {
 
        debug_assert!(idx < self.len);
 
        return self.base.add(idx);
 
    }
 

	
 
    pub unsafe fn get_mut(&self, idx: usize) -> *mut T {
 
        debug_assert!(idx < self.len);
 
        return self.base.add(idx);
 
    }
 

	
 
    pub fn push(&mut self, item: T) {
 
        self.ensure_space(1);
 
        self.ensure_space(1).unwrap();
 
        unsafe {
 
            let target = self.base.add(self.len);
 
            std::ptr::write(target, item);
 
            self.len += 1;
 
        }
 
    }
 

	
 
    pub fn len(&self) -> usize {
 
        return self.len;
 
    }
 

	
 
    fn ensure_space(&mut self, additional: usize) -> Result<(), AllocError>{
 
        debug_assert!(Self::T_SIZE != 0);
 
        debug_assert!(self.cap >= self.len);
 
        if self.cap - self.len < additional {
 
            // Need to resize. Note that due to all checked conditions we have
 
            // that new_cap >= 1.
 
            debug_assert!(additional > 0);
 
            let new_cap = self.len.checked_add(additional).unwrap();
 
            let new_cap = cmp::max(new_cap, self.cap * Self::GROWTH_RATE);
 
            
 
            let layout = Layout::array::<T>(new_cap)
 
                .map_err(|_| AllocError::CapacityOverflow)?;
 
            debug_assert_eq!(new_cap * Self::T_SIZE, layout.size());
 

	
 
            unsafe {
 
                // Allocate new storage, transfer bits, deallocate old store
 
                let new_base = alloc(layout);
 

	
 
                if self.cap > 0 {
 
                    let old_base = self.base as *mut u8;
 
                    let (old_size, old_layout) = self.current_layout();
 

	
 
                    ptr::copy_nonoverlapping(old_base, new_base, old_size);
 
                    dealloc(old_base, old_layout);
 
                }
 

	
 
                self.base = new_base;
 
                self.base = new_base as *mut T;
 
                self.cap = new_cap;
 
            }
 
        } // else: still enough space
 

	
 
        return Ok(());
 
    }
 

	
 
    #[inline]
 
    fn current_layout(&self) -> (usize, Layout) {
 
        debug_assert!(Self::T_SIZE > 0);
 
        let old_size = self.cap * Self::T_SIZE;
 
        unsafe {
 
            return (
 
                old_size,
 
                Layout::from_size_align_unchecked(old_size, Self::T_ALIGNMENT)
 
            );
 
        }
 
    }
 
}
 

	
 
impl<T: Sized> Drop for RawVec<T> {
 
    fn drop(&mut self) {
 
        if self.cap > 0 {
 
            debug_assert!(!self.base.is_null());
 
            let (_, layout) = self.current_layout();
 
            unsafe {
 
                dealloc(self.base, layout);
 
                dealloc(self.base as *mut u8, layout);
 
                if cfg!(debug_assertions) {
 
                    self.base = ptr::null_mut();
 
                }
 
            }
 
        }
 
    }
 
}
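
// A minimal usage sketch (illustrative only, not part of this changeset):
// `push` grows the backing storage on demand, `get`/`get_mut` hand out raw
// pointers, and dropping the vector frees the allocation without running
// any element destructors.
#[cfg(test)]
fn raw_vec_usage_sketch() {
    let mut v: RawVec<u64> = RawVec::with_capacity(4);
    v.push(1);
    v.push(2);
    unsafe {
        assert_eq!(*v.get(0), 1);
        *v.get_mut(1) = 5;
        assert_eq!(*v.get(1), 5);
    }
    assert_eq!(v.len(), 2);
    // `v` is dropped here: the backing memory is freed, no destructors run
}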
 
\ No newline at end of file
src/protocol/parser/type_table.rs
 
@@ -1098,49 +1098,48 @@ impl TypeTable {
 
    /// resolvable. If so then the appropriate union variants will be marked as
 
    /// "living on heap". If not then a `ParseError` will be returned
 
    fn detect_and_resolve_type_loops_for(&mut self, modules: &[Module], heap: &Heap, concrete_type: ConcreteType) -> Result<(), ParseError> {
 
        use DefinedTypeVariant as DTV;
 

	
 
        debug_assert!(self.type_loop_breadcrumbs.is_empty());
 
        debug_assert!(self.type_loops.is_empty());
 
        debug_assert!(self.encountered_types.is_empty());
 

	
 
        // Push the initial breadcrumb
 
        let initial_breadcrumb = self.check_member_for_type_loops(&concrete_type);
 
        if let TypeLoopResult::PushBreadcrumb(definition_id, concrete_type) = initial_breadcrumb {
 
            self.handle_new_breadcrumb_for_type_loops(definition_id, concrete_type);
 
        } else {
 
            unreachable!();
 
        }
 

	
 
        // Enter into the main resolving loop
 
        while !self.type_loop_breadcrumbs.is_empty() {
 
            // Because we might be modifying the breadcrumb array we need to
            // work on a copy of the current (last) breadcrumb.
 
            let breadcrumb_idx = self.type_loop_breadcrumbs.len() - 1;
 
            let mut breadcrumb = self.type_loop_breadcrumbs[breadcrumb_idx].clone();
 

	
 
            let poly_type = self.lookup.get(&breadcrumb.definition_id).unwrap();
 
            let poly_type_definition_id = poly_type.ast_definition;
 

	
 
            let resolve_result = match &poly_type.definition {
 
                DTV::Enum(_) => {
 
                    TypeLoopResult::TypeExists
 
                },
 
                DTV::Union(definition) => {
 
                    let monomorph = &definition.monomorphs[breadcrumb.monomorph_idx];
 
                    let num_variants = monomorph.variants.len();
 

	
 
                    let mut union_result = TypeLoopResult::TypeExists;
 

	
 
                    'member_loop: while breadcrumb.next_member < num_variants {
 
                        let mono_variant = &monomorph.variants[breadcrumb.next_member];
 
                        let num_embedded = mono_variant.embedded.len();
 

	
 
                        while breadcrumb.next_embedded < num_embedded {
 
                            let mono_embedded = &mono_variant.embedded[breadcrumb.next_embedded];
 
                            union_result = self.check_member_for_type_loops(&mono_embedded.concrete_type);
 

	
 
                            if union_result != TypeLoopResult::TypeExists {
 
                                // In type loop or new breadcrumb pushed, so
 
                                // break out of the resolving loop
 
                                break 'member_loop;
 
                            }
src/runtime2/connector.rs
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{MessageContents, SolutionMessage};
 
use crate::runtime2::inbox::{Message, MessageContents, SolutionMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
pub struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
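
// Illustrative sketch of the convention documented above (not part of this
// changeset): branch index 0 acts as the "no branch" sentinel.
//
//     let none = BranchId::new_invalid();
//     assert!(!none.is_valid());
//     let first = BranchId::new(1); // `new(0)` would trip the debug_assert
//     assert!(first.is_valid());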
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
@@ -124,92 +124,94 @@ impl PortAssignment {
 
        }
 
    }
 

	
 
    #[inline]
 
    fn mark_speculative(&mut self, num_times_fired: u32) {
 
        debug_assert!(!self.last_registered_branch_id.is_valid());
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
#[derive(Debug)]
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// Contains a description of the port mapping during a particular sync session.
 
/// TODO: Extend documentation
 
pub(crate) struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    pub owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    pub port_mapping: Vec<PortAssignment>
 
    port_mapping: Vec<PortAssignment>
 
}
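
// Sketch of the indexing scheme described above (illustrative only): with
// P owned ports and B branches, the assignment of port `p` in branch `b`
// is stored at `port_mapping[b * P + p]`.
//
//     let num_ports = 3usize;                // P
//     let (branch, port) = (4usize, 2usize); // b, p
//     let linear_index = branch * num_ports + port; // == 14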
 

	
 
impl ConnectorPorts {
 
    /// Constructs the initial ports object. Assumes the presence of the
 
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
 
    /// branch.
 
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
 
        let num_ports = owned_ports.len();
 
        let mut port_mapping = Vec::with_capacity(num_ports);
 
        for _ in 0..num_ports {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
            let parent_port = parent_port.clone();
 
            self.port_mapping.push(parent_port);
 
        }
 
    }
 

	
 
    /// Adds a new port. Caller must make sure that the connector is not in the
 
    /// sync phase.
 
    fn add_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len());
 
        debug_assert!(!self.owned_ports.contains(&port_id));
 
        self.owned_ports.push(port_id);
 
        self.port_mapping.push(PortAssignment::new_unassigned());
 
    }
 

	
 
    /// Commits to a particular branch. Essentially just removes the port
 
    /// mapping information generated during the sync phase.
 
    fn commit_to_sync(&mut self) {
 
        self.port_mapping.truncate(self.owned_ports.len());
 
        debug_assert!(self.port_mapping.iter().all(|v| {
 
            !v.is_assigned && !v.last_registered_branch_id.is_valid()
 
        }));
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
@@ -282,102 +284,102 @@ impl BranchQueue {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 

	
 
    #[inline]
 
    fn clear(&mut self) {
 
        self.first = 0;
 
        self.last = 0;
 
    }
 
}
 

	
 
/// Public fields of the connector that can be freely shared between multiple
 
/// threads.
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new() -> Self {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(false),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32, // TODO: Change to BranchId?
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message {
 
            MC::Data(message) => self.handle_data_message(message),
 
            MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state),
 
            MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state),
 
            MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state),
 
        match message.contents {
 
            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
 
            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
 
            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
 
            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions; we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
@@ -425,50 +427,50 @@ impl ConnectorPDL {
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            cur_round: 0,
 
            committed_to: None,
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    pub fn handle_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        self.inbox.insert_message(target_port, message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 

	
 
            // Note that we only iterate over the solutions we've already
 
            // handled ourselves, not necessarily all of the finished branches.
 
            let mut branch_index = self.sync_finished.first;
 
            'branch_loop: loop {
 
@@ -556,101 +558,102 @@ impl ConnectorPDL {
 
                        let port = ctx.get_port(port_id);
 
                        (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
 
                    };
 

	
 
                    let mapping = self.ports.get_port(branch_index, port_index);
 
                    let constraint = if mapping.num_times_fired == 0 {
 
                        SyncBranchConstraint::SilentPort(peer_port_id)
 
                    } else {
 
                        if peer_is_getter {
 
                            SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id)
 
                        } else {
 
                            SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id)
 
                        }
 
                    };
 

	
 
                    match new_solution.add_or_check_constraint(peer_connector_id, constraint) {
 
                        Err(_) => continue 'branch_loop,
 
                        Ok(false) => continue 'branch_loop,
 
                        Ok(true) => {},
 
                    }
 
                }
 

	
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                let next_branch = branch.next_branch_in_queue;
 
                self.submit_sync_solution(new_solution, ctx, results);
 

	
 
                // Consider the next branch
 
                if branch_index == self.sync_finished_last_handled {
 
                    // At the end of the previously handled solutions
 
                    break;
 
                }
 

	
 
                debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue
 
                branch_index = branch.next_branch_in_queue.unwrap();
 
                debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue
 
                branch_index = next_branch.unwrap();
 
            }
 
        }
 
    }
 

	
 
    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        let should_propagate_message = match &self.committed_to {
 
            Some((previous_origin, previous_comparison)) => {
 
                // Already committed to something. So will commit to this if it
 
                // takes precedence over the current solution
 
                message.comparison_number > *previous_comparison ||
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0)
 
            },
 
            None => {
 
                // Not yet committed to a solution, so commit to this one
 
                true
 
            }
 
        };
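
        // Worked example of the precedence rule above (illustrative only):
        // having committed to (origin 3, comparison 10), a new request with
        // comparison 12 wins and comparison 9 loses; a tie at comparison 10
        // is broken by the higher connector id, so origin 5 wins while
        // origin 2 does not.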
 

	
 
        if should_propagate_message {
 
            self.committed_to = Some((message.connector_origin, message.comparison_number));
 

	
 
            if message.to_visit.is_empty() {
 
                // Visited all of the connectors, so every connector can now
 
                // apply the solution
 
                // TODO: Use temporary workspace
 
                let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1);
 
                for (connector_id, _) in &message.local_solutions {
 
                    if *connector_id != ctx.id {
 
                        to_visit.push(*connector_id);
 
                    }
 
                }
 

	
 
                message.to_visit = to_visit;
 
                self.handle_confirm_commit_message(message.clone(), ctx, delta_state);
 
                delta_state.outbox.push(MessageContents::ConfirmCommit(message));
 
            } else {
 
                // Not yet visited all of the connectors
 
                delta_state.outbox.push(MessageContents::RequestCommit(message));
 
            }
 
        }
 
    }
 

	
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        // Make sure this is the message we actually committed to. As long as
 
        // we're running on a single machine this is fine.
 
        // TODO: Take care of nefarious peers
 
        let (expected_connector_id, expected_comparison_number) =
 
            self.committed_to.unwrap();
 
        assert_eq!(message.connector_origin, expected_connector_id);
 
        assert_eq!(message.comparison_number, expected_comparison_number);
 

	
 
        // Find the branch we're supposed to commit to
 
        let (_, branch_id) = message.local_solutions
 
            .iter()
 
            .find(|(id, _)| *id == ctx.id)
 
            .unwrap();
 
        let branch_id = *branch_id;
 

	
 
        // Commit to the branch. That is: move the solution branch to the first
 
        // of the connector's branches
 
        self.in_sync = false;
 
        self.branches.swap(0, branch_id.index as usize);
 
        self.branches.truncate(1); // TODO: Or drain and do not deallocate?
 
        let solution = &mut self.branches[0];
 

	
 
        // Clear all of the other sync-related variables
 
        self.sync_active.clear();
 
@@ -662,264 +665,280 @@ impl ConnectorPDL {
 
        self.committed_to = None;
 
        self.inbox.clear();
 
        self.ports.commit_to_sync();
 

	
 
        // Add/remove any of the ports we lost during the sync phase
 
        for port_delta in &solution.ports_delta {
 
            if port_delta.acquired {
 
                self.ports.add_port(port_delta.port_id);
 
            } else {
 
                self.ports.remove_port(port_delta.port_id);
 
            }
 
        }
 
        solution.commit_to_sync();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Executing connector code
 
    // -------------------------------------------------------------------------
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result being handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 
                // Create two copied branches, one silent and one firing
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                let parent_branch_id = branch.index;
 
                let parent_branch = &self.branches[parent_branch_id.index as usize];
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 
                let silent_index = self.branches.len() as u32;
 
                let firing_index = silent_index + 1;
 

	
 
                let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch);
 
                self.ports.prepare_sync_branch(parent_branch.index.index, silent_index);
 

	
 
                // Assign the old branch a silent value
 
                let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index);
 
                let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch);
 
                self.ports.prepare_sync_branch(parent_branch.index.index, firing_index);
 

	
 
                // Assign the port values of the two new branches
 
                let silent_port = self.ports.get_port_mut(silent_index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Run both branches again
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index);
 
                let silent_branch_id = silent_branch.index;
 
                self.branches.push(silent_branch);
 
                let firing_branch_id = firing_branch.index;
 
                self.branches.push(firing_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get` on a port that has not yet received
 
                // a value in its inbox.
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("deal with the case where the port is acquired");
 
                }
 
                let local_port_index = local_port_index.unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index);
 
                    let branch_id = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    let mut did_have_messages = false;
 

	
 
                    for message in messages {
 
                        did_have_messages = true;
 

	
 
                        // For each message prepare a new branch to execute
 
                        let parent_branch = &self.branches[branch_id.index as usize];
 
                        let new_branch_index = self.branches.len() as u32;
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                        self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch);
 
                        self.ports.prepare_sync_branch(branch_id.index, new_branch_index);
 

	
 
                        let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                        debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                        new_branch.received.insert(local_port_id, message.clone());
 

	
 
                        // If the message contains any ports then they will now
 
                        // be owned by the new branch
 
                        debug_assert!(results.ports.is_empty());
 
                        find_ports_in_value_group(&message.message, &mut results.ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
 
                        results.ports.clear();
 

	
 
                        // Schedule the new branch
 
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index);
 
                        let new_branch_id = new_branch.index;
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id);
 
                    }
 

	
 
                    if did_have_messages {
 
                        // If we did create any new branches, then we can run
 
                        // them immediately.
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and map them to non-firing.
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
                }
 

	
 
                let branch_id = branch.index;
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particular port.
 
                let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix };
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("handle case where port was received before (i.e. in ports_delta)")
 
                }
 
                let local_port_index = local_port_index.unwrap();
 

	
 
                // Check the port mapping for consistency
 
                // TODO: For now we can only put once, so that simplifies stuff
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 
                let is_valid_put = if port_mapping.is_assigned {
 
                    // Already assigned, so must be speculative and one time
 
                    // firing, otherwise we are `put`ing multiple times.
 
                    if port_mapping.last_registered_branch_id.is_valid() {
 
                        // Already did a `put`
 
                        todo!("handle error through RunDeltaState");
 
                    } else {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = DataMessage{
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    // If the message contains any ports then we release our
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap();
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(MessageContents::Data(message));
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                let first_sync_branch_id = first_sync_branch.index;
 
                self.branches.push(first_sync_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                debug_assert!(results.ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 

	
 
                if !results.ports.is_empty() {
 
                    // Ports changing ownership
 
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
 
                        todo!("fatal error handling");
 
                    }
 
                }
 

	
 
                // Add connector for later execution
 
                let new_connector_state = ComponentState {
 
                    prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments)
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
                let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports);
 

	
 
@@ -999,48 +1018,50 @@ impl ConnectorPDL {
 
        // Walk through all elements in queue to find branch to delete
 
        let mut prev_index = 0;
 
        let mut next_index = queue.first;
 

	
 
        while next_index != 0 {
 
            if next_index == to_delete.index {
 
                // Found the element we're going to delete
 
                // - check if at the first element or not
 
                if prev_index == 0 {
 
                    queue.first = branch_next_index_unwrapped;
 
                } else {
 
                    let prev_branch = &mut branches[prev_index as usize];
 
                    prev_branch.next_branch_in_queue = branch_next_index_option;
 
                }
 

	
 
                // - check if at last element or not (also takes care of "no elements left in queue")
 
                if branch_next_index_option.is_none() {
 
                    queue.last = prev_index;
 
                }
 

	
 
                return;
 
            }
 

	
 
            prev_index = next_index;
 
            let entry = &branches[next_index as usize];
 
            next_index = entry.next_branch_in_queue.unwrap_or(0);
 
        }
 

	
 
        // If here, then we didn't find the element
 
        panic!("branch does not exist in provided queue");
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports, and for checking if specific ports can be sent
 
    // over another port.
 

	
 
    /// Releasing ownership of ports while in non-sync mode. This only occurs
 
    /// while instantiating new connectors
 
    fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(!branch.index.is_valid()); // branch in non-sync mode
 

	
 
        for port_id in port_ids {
 
            // We must own the port, or something is wrong with our code
 
            todo!("Set up some kind of message router");
 
            debug_assert!(ports.get_port_index(*port_id).is_some());
 
            ports.remove_port(*port_id);
 
        }
 

	
 
        return Ok(())
 
    }
 
@@ -1195,49 +1216,49 @@ impl ConnectorPDL {
 
            let constraint = if port_mapping.is_assigned {
 
                if port.kind == PortKind::Getter {
 
                    SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(port.peer_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return Some(sync_message);
 
    }
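
    // Summary of the constraint selection above (descriptive note): an
    // assigned getter port contributes BranchNumber(last registered branch),
    // an assigned putter port contributes PortMapping(peer port, last
    // registered branch), and an unassigned port contributes
    // SilentPort(peer port).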
 

	
 
    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        if partial_solution.to_visit.is_empty() {
 
            // Solution is completely consistent. So ask everyone to commit
 
            // TODO: Maybe another package for random?
 
            let comparison_number: u64 = unsafe {
 
                let mut random_array = [0u8; 8];
 
                getrandom::getrandom(&mut random_array);
 
                getrandom::getrandom(&mut random_array).unwrap();
 
                std::mem::transmute(random_array)
 
            };
 

	
 
            let num_local = partial_solution.local_solutions.len();
 

	
 
            let mut full_solution = SolutionMessage{
 
                comparison_number,
 
                connector_origin: ctx.id,
 
                local_solutions: Vec::with_capacity(num_local),
 
                to_visit: Vec::with_capacity(num_local - 1),
 
            };
 

	
 
            for local_solution in &partial_solution.local_solutions {
 
                full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
 
                if local_solution.connector_id != ctx.id {
 
                    full_solution.to_visit.push(local_solution.connector_id);
 
                }
 
            }
 

	
 
            debug_assert!(self.committed_to.is_none());
 
            self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number));
 
            results.outbox.push(MessageContents::RequestCommit(full_solution));
 
        } else {
 
            // Still have connectors to visit
src/runtime2/global_store.rs
 
use std::ptr;
 
use std::sync::{Arc, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32};
 

	
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::scheduler::Router;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::{Connector, ConnectorApplication};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
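
// Illustrative round-trip (not part of this changeset): a key can always be
// downgraded to a shared id, while the reverse direction deliberately
// requires `unsafe`.
//
//     let key = ConnectorKey{ index: 7 };
//     let id: ConnectorId = key.downcast();                  // shared access
//     let key_again = unsafe { ConnectorKey::from_id(id) };  // unique access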
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct ConnectorId(pub u32);
 
use super::scheduler::{Router, ConnectorCtx};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::inbox::Message;
 
use super::native::{Connector, ConnectorApplication};
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
/// owns a `ConnectorKey`, then one has unique access to that connector.
 
/// Otherwise one has shared access.
 
///
 
/// This datastructure is built to be wrapped in a RwLock.
 
pub(crate) struct ConnectorStore {
 
    pub(crate) port_counter: Arc<AtomicU32>,
 
    inner: RwLock<ConnectorStoreInner>,
 
}
 

	
 
struct ConnectorStoreInner {
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        return Self{
 
            port_counter: Arc::new(AtomicU32::new(0)),
 
            inner: RwLock::new(ConnectorStoreInner {
 
                connectors: RawVec::with_capacity(capacity),
 
                free: Vec::with_capacity(capacity),
 
            }),
 
@@ -117,140 +42,138 @@ impl ConnectorStore {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &(**connector).public;
 
        }
 
    }
 

	
 
    /// Retrieves a particular connector. Only the thread that pulled the
 
    /// associated key out of the execution queue should (be able to) call this.
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
 
        // Connector interface does not own any initial ports, and cannot be
 
        // created by another connector
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)));
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true);
 
        return key;
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it. The caller must make sure that the constructed
 
    /// connector's code is initialized with the same ports as the ports in the
 
    /// `initial_ports` array.
 
    /// `initial_ports` array. Furthermore the connector is initialized as not
 
    /// sleeping, so MUST be put on the connector queue by the caller.
 
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector));
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false);
 
        let new_connector = self.get_mut(&key);
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => unreachable!(),
 
        }
 

	
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 
        let mut lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 

	
 
    /// Creates a connector but does not set its initial ports
 
    fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey {
 
    fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        let index;
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let mut lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
                connector,
 
                context: ConnectorCtx::new(self.port_counter.clone()),
 
                public: ConnectorPublic::new(),
 
                public: ConnectorPublic::new(initialize_as_sleeping),
 
                router: Router::new(),
 
            };
 

	
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
                unsafe {
 
                    // Cheating a bit here. Anyway, move to heap, store in list
 
                    index = lock.connectors.len();
 
                    lock.connectors.push(connector);
 
                }
 
                index = lock.connectors.len();
 
                lock.connectors.push(connector);
 
            } else {
 
                index = lock.free.pop().unwrap();
 

	
 
                unsafe {
 
                    let target = lock.connectors.get_mut(index);
 
                    debug_assert!(!target.is_null());
 
                    ptr::write(*target, connector);
 
                }
 
            }
 
        }
 

	
 
        // Generate key and retrieve the connector to set its ID
 
        let key = ConnectorKey{ index: index as u32 };
 
        let new_connector = self.get_mut(&key);
 
        new_connector.context.id = key.downcast();
 

	
 
        // Return the connector key
 
        return key;
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        for idx in 0..lock.connectors.len() {
 
            unsafe {
 
                let memory = *lock.connectors.get_mut(idx);
 
                let _ = Box::from_raw(memory); // takes care of deallocation
 
            }
 
        }
 
    }
 
}
 

	
 
/// Global store of connectors, ports and queues that are used by the scheduler
 
/// threads. The global store has the appearance of a thread-safe datatype, but
 
/// one needs to be careful using it.
 
///
 
/// TODO: @docs
 
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
 
///     This includes the `should_exit` and `did_exit` pair!
 
pub struct GlobalStore {
 
pub(crate) struct GlobalStore {
 
    pub connector_queue: MpmcQueue<ConnectorKey>,
 
    pub connectors: ConnectorStore,
 
    pub should_exit: AtomicBool,    // signal threads to exit
 
}
 

	
 
impl GlobalStore {
 
    pub fn new() -> Self {
 
    pub(crate) fn new() -> Self {
 
        Self{
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: ConnectorStore::with_capacity(256),
 
            should_exit: AtomicBool::new(false),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/inbox.rs
Show inline comments
 
@@ -217,126 +217,126 @@ pub struct PublicInbox {
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 

	
 
pub struct PrivateInbox {
 
pub(crate) struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: Vec<DataMessage>,
 
    messages: Vec<(PortIdLocal, DataMessage)>,
 
    len_read: usize,
 
}
 

	
 
impl PrivateInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, target_port) already exists, then
 
    /// nothing is inserted.
 
    pub fn insert_message(&mut self, message: DataMessage) {
 
        for existing in self.messages.iter() {
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        for (existing_target_port, existing) in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.sending_port == message.sending_port {
 
                    *existing_target_port == target_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 

	
 
        self.messages.push(message);
 
        self.messages.push((target_port, message));
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
    /// speculative conditions. Note that the inbox remains read-locked until
 
    /// the returned iterator is dropped. Should only be called by the
 
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
 
    ///
 
    /// This function should only be used to check if already-received messages
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        return InboxMessageIter{
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read,
 
            match_port_id: port_id,
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub fn next_message(&mut self) -> Option<&DataMessage> {
 
    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let to_return = &self.messages[self.len_read];
 
        let (_, to_return) = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
    pub(crate) fn clear(&mut self) {
 
        self.messages.clear();
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub struct InboxMessageIter<'i> {
 
    messages: &'i Vec<DataMessage>,
 
pub(crate) struct InboxMessageIter<'i> {
 
    messages: &'i Vec<(PortIdLocal, DataMessage)>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'i DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.messages[self.next_index];
 
            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
            let (target_port, cur_message) = &self.messages[self.next_index];
 
            if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
 
            return None;
 
        }
 

	
 
        let message = &self.messages[self.next_index];
 
        let (_, message) = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
 
}
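
A small, self-contained sketch of how this inbox behaves from the reader's side, using simplified stand-ins for `PortIdLocal`, `BranchId` and `DataMessage` (newtype integers and a minimal struct, since the real definitions live elsewhere): duplicates of the same (target port, previous branch, current branch) triple are dropped on insert, and the filtered iterator only yields already-read messages that match both the port and the previous branch id.

// Simplified stand-ins for PortIdLocal, BranchId and DataMessage.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Port(u32);
#[derive(Clone, Copy, PartialEq, Eq)]
struct Branch(u32);

struct Msg { prev: Branch, cur: Branch, payload: &'static str }

#[derive(Default)]
struct Inbox {
    messages: Vec<(Port, Msg)>,
    len_read: usize,
}

impl Inbox {
    // Duplicates of the same (target, prev, cur) triple are silently dropped.
    fn insert(&mut self, target: Port, msg: Msg) {
        let duplicate = self.messages.iter().any(|(t, m)| {
            *t == target && m.prev == msg.prev && m.cur == msg.cur
        });
        if !duplicate {
            self.messages.push((target, msg));
        }
    }

    // Hand out the next unread message, advancing the read cursor.
    fn next_message(&mut self) -> Option<&Msg> {
        if self.len_read == self.messages.len() {
            return None;
        }
        let (_, msg) = &self.messages[self.len_read];
        self.len_read += 1;
        Some(msg)
    }

    // Iterate over already-read messages for one port and previous branch.
    fn matching(&self, port: Port, prev: Branch) -> impl Iterator<Item = &Msg> {
        self.messages[..self.len_read]
            .iter()
            .filter(move |(t, m)| *t == port && m.prev == prev)
            .map(|(_, m)| m)
    }
}

fn main() {
    let mut inbox = Inbox::default();
    inbox.insert(Port(1), Msg { prev: Branch(0), cur: Branch(1), payload: "first" });
    inbox.insert(Port(1), Msg { prev: Branch(0), cur: Branch(1), payload: "duplicate" }); // dropped
    inbox.insert(Port(2), Msg { prev: Branch(0), cur: Branch(2), payload: "second" });

    while inbox.next_message().is_some() {} // mark everything as read

    for msg in inbox.matching(Port(1), Branch(0)) {
        println!("port 1 can re-receive: {}", msg.payload); // prints only "first"
    }
}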
 
\ No newline at end of file
src/runtime2/messages.rs
Show inline comments
 
use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
 
/// need to store it for all future branches that might be able to receive it.
 
pub struct ConnectorInbox {
 
    // TODO: @optimize, HashMap + Vec is a bit stupid.
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 

	
 
/// An action performed on a port. Unsure about this
src/runtime2/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod native;
 
mod port;
 
mod global_store;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 

	
 
// Imports
 

	
 
use std::sync::{Arc, Mutex};
 
use std::sync::atomic::Ordering;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::ProtocolDescription;
 

	
 
use global_store::{ConnectorVariant, GlobalStore};
 
use scheduler::Scheduler;
 
use native::{ConnectorApplication, ApplicationInterface};
 

	
 

	
 
// Runtime API
 
// TODO: Exit condition is very dirty. Take into account:
 
//  - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
 
//  - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
 
//  - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
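
The key/id split encodes an access policy in the type system: whoever holds the non-`Clone` key is the single party allowed to mutate the connector, while the `Copy`able id only grants shared access, and `from_id` is the explicitly unsafe escape hatch. A minimal sketch of the same idea with a hypothetical `Slot` store (the real store hands out mutable access through raw pointers; this sketch uses plain `&mut self` for simplicity):

struct Key { index: u32 }          // not Clone/Copy: at most one owner
#[derive(Clone, Copy)]
struct Id(u32);                    // freely shareable

impl Key {
    fn downcast(&self) -> Id { Id(self.index) }

    // Bypasses the single-owner guarantee; the caller must ensure no other key
    // for the same index is still in use.
    unsafe fn from_id(id: Id) -> Key { Key { index: id.0 } }
}

struct Slot { name: &'static str }

struct Slots { entries: Vec<Slot> }

impl Slots {
    // Shared access only needs the id.
    fn get(&self, id: Id) -> &Slot { &self.entries[id.0 as usize] }
    // Mutable access requires the key.
    fn get_mut(&mut self, key: &Key) -> &mut Slot { &mut self.entries[key.index as usize] }
}

fn main() {
    let mut slots = Slots { entries: vec![Slot { name: "first" }] };
    let key = Key { index: 0 };
    slots.get_mut(&key).name = "renamed";

    let id = key.downcast();
    println!("{}", slots.get(id).name); // prints "renamed"

    // Re-forging a key from an id is possible but unchecked.
    let forged = unsafe { Key::from_id(id) };
    slots.get_mut(&forged).name = "renamed again";
    println!("{}", slots.get(id).name);
}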
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub(crate) enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
}
 

	
 
/// Externally facing runtime.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
pub(crate) struct RuntimeInner {
 
    pub(crate) global_store: GlobalStore,
 
    // Protocol
 
    pub(crate) protocol_description: ProtocolDescription,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
 
    // Storage of connectors in a kind of freelist. Note the vector of pointers to
 
    // ensure pointer stability: the vector can be changed but the entries
 
    // themselves remain valid.
 
    pub connectors_list: RawVec<*mut ScheduledConnector>,
 
    pub connectors_free: Vec<usize>,
 

	
 
    pub(crate) global_store: GlobalStore,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>,
 
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
 
}
 

	
 
impl RuntimeInner {
 
    #[inline]
 
    pub(crate) fn increment_active_interfaces(&self) {
 
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
 
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
 
    }
 

	
 
    pub(crate) fn decrement_active_interfaces(&self) {
 
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 {
 
            // Became 0
 
            // TODO: Check num connectors, if 0, then set exit flag
 
        }
 
    }
 
}
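
The counter tracks how many user-facing interfaces can still create connectors; the interesting transition is the decrement that brings it to zero, which is the point where the runtime may begin shutting down. A small, runnable sketch of that pattern (the shutdown action is just a print here):

use std::sync::atomic::{AtomicU32, Ordering};

struct Interfaces {
    active: AtomicU32,
}

impl Interfaces {
    fn new() -> Self { Interfaces { active: AtomicU32::new(1) } }

    fn increment(&self) {
        let old = self.active.fetch_add(1, Ordering::SeqCst);
        debug_assert!(old > 0, "once the count hits zero it must stay zero");
    }

    fn decrement(&self) {
        let old = self.active.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(old > 0);
        if old == 1 {
            // The count just became zero: no interface can create new work anymore.
            println!("last interface gone, shutdown may begin");
        }
    }
}

fn main() {
    let counter = Interfaces::new();
    counter.increment(); // a second interface appears
    counter.decrement();
    counter.decrement(); // prints the shutdown message
}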
 

	
 
// TODO: Come back to this at some point
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            global_store: GlobalStore::new(),
 
            protocol_description,
 
            schedulers: Mutex::new(Vec::new()),
 
            active_interfaces: AtomicU32::new(1), // we are the active interface
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads);
 
            for _ in 0..num_threads {
 
            let mut schedulers = Vec::with_capacity(num_threads as usize);
 
            for thread_index in 0..num_threads {
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::spawn(move || {
 
                    let mut scheduler = Scheduler::new(cloned_runtime_inner);
 
                    scheduler.run();
 
                });
 
                let thread = thread::Builder::new()
 
                    .name(format!("thread-{}", thread_index))
 
                    .spawn(move || {
 
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
 
                        scheduler.run();
 
                    })
 
                    .unwrap();
 

	
 
                schedulers.push(thread);
 
            }
 

	
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector_key = self.inner.global_store.connectors.create_interface(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.global_store.should_exit.store(true, Ordering::Release);
 
        let mut schedulers = self.inner.schedulers.lock().unwrap();
 
        for scheduler in schedulers.drain(..) {
 
            scheduler.join();
 
            scheduler.join().unwrap();
 
        }
 
    }
 
}
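
Thread startup and shutdown follow a common shape here: spawn named worker threads that poll an `AtomicBool`, keep the `JoinHandle`s, and on drop raise the flag and join everything. A self-contained sketch of just that shape (the worker body is a placeholder polling loop, not the real scheduler):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;

struct Workers {
    should_exit: Arc<AtomicBool>,
    handles: Vec<JoinHandle<()>>,
}

impl Workers {
    fn new(num_threads: u32) -> Self {
        let should_exit = Arc::new(AtomicBool::new(false));
        let mut handles = Vec::with_capacity(num_threads as usize);

        for thread_index in 0..num_threads {
            let exit_flag = should_exit.clone();
            let handle = thread::Builder::new()
                .name(format!("thread-{}", thread_index))
                .spawn(move || {
                    // Placeholder worker: poll the exit flag until it is raised.
                    while !exit_flag.load(Ordering::Acquire) {
                        thread::sleep(Duration::from_millis(10));
                    }
                })
                .unwrap();
            handles.push(handle);
        }

        Workers { should_exit, handles }
    }
}

impl Drop for Workers {
    fn drop(&mut self) {
        // Signal the workers, then wait for all of them to finish.
        self.should_exit.store(true, Ordering::Release);
        for handle in self.handles.drain(..) {
            handle.join().unwrap();
        }
    }
}

fn main() {
    let _workers = Workers::new(4); // dropped at the end of main, which joins the threads
}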
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
use super::RuntimeInner;
 
use super::global_store::ConnectorId;
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::Message;
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub trait Connector {
 
pub(crate) trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
}
 

	
 
/// The connector which an application can directly interface with. One may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32)));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        todo!("handling messages in ConnectorApplication (API for runtime)")
 
    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(_) => unreachable!("data message in API connector"),
 
            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
 
                // Handling sync in API
 
            },
 
            MC::Control(_) => {},
 
            MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    println!("DEBUG: API adopting ports");
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    println!("DEBUG: API creating connector");
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ConnectorApplication`. This allows setting up the
 
/// interactions the `ConnectorApplication` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        runtime.increment_active_interfaces();
 

	
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        // TODO: Duplicated logic in scheduler
 
        let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        // Create ports and add a job such that they are transferred to the
 
        // API component. (note that we do not send a ping, this is only
 
        // necessary once we create a connector)
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        };
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push(ApplicationJob::NewChannel((getter_port, putter_port)));
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
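
Port ids for a channel are taken in pairs from a shared counter: a single `fetch_add(2)` reserves two consecutive ids, the lower one for the getter and the next for the putter, so no lock is needed to hand out channel endpoints. A minimal sketch of that allocation scheme, with plain `u32` ids standing in for `PortIdLocal`:

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

// Reserve two consecutive ids with a single atomic operation.
fn allocate_channel_ids(counter: &AtomicU32) -> (u32, u32) {
    let getter_id = counter.fetch_add(2, Ordering::SeqCst);
    let putter_id = getter_id + 1;
    (getter_id, putter_id)
}

fn main() {
    let counter = Arc::new(AtomicU32::new(0));

    let (getter_a, putter_a) = allocate_channel_ids(&counter);
    let (getter_b, putter_b) = allocate_channel_ids(&counter);

    // Each call reserves a disjoint pair, even under concurrency, because the
    // reservation is one fetch_add; in this single-threaded example the values
    // are deterministic.
    assert_eq!((getter_a, putter_a), (0, 1));
    assert_eq!((getter_b, putter_b), (2, 3));
    println!("channels: ({}, {}), ({}, {})", getter_a, putter_a, getter_b, putter_b);
}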
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ConnectorApplication` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove);
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push(ApplicationJob::NewConnector(connector));
 
            queue.push_back(ApplicationJob::NewConnector(connector));
 
        }
 

	
 
        // Send ping message to wake up connector
 
        let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
 
        connector.inbox.insert_message(Message{
 
            sending_connector: ConnectorId::new_invalid(),
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Ping,
 
        });
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            println!("DEBUG: Waking up connector");
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        } else {
 
            println!("DEBUG: NOT waking up connector");
 
        }
 

	
 
        return Ok(());
 
    }
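
Creating a connector moves the ports mentioned in its arguments out of the interface's owned set: every port in the argument list must currently be owned, and is removed as part of the check, so the same port cannot be handed to two connectors. A small sketch of that ownership transfer, with plain `u32` port ids and a simplified error type standing in for `ComponentCreationError::UnownedPort`:

// Simplified error type standing in for ComponentCreationError::UnownedPort.
#[derive(Debug, PartialEq)]
enum CreateError { UnownedPort }

// Remove every requested port from `owned`, failing if any is not owned.
fn take_ports(owned: &mut Vec<u32>, requested: &[u32]) -> Result<(), CreateError> {
    for port in requested {
        match owned.iter().position(|p| p == port) {
            Some(index) => { owned.remove(index); },
            None => return Err(CreateError::UnownedPort),
        }
    }
    Ok(())
}

fn main() {
    let mut owned = vec![1, 2, 3, 4];

    // Hand ports 1 and 2 to a new connector: they leave the owned set.
    assert_eq!(take_ports(&mut owned, &[1, 2]), Ok(()));
    assert_eq!(owned, vec![3, 4]);

    // Trying to hand out port 1 again now fails.
    assert_eq!(take_ports(&mut owned, &[1]), Err(CreateError::UnownedPort));
}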
 

	
 
    /// Check if the next sync-round is finished.
 
    pub fn try_wait(&self) -> bool {
 
        let (is_done, _) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        return *lock;
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) {
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v); // wait while not done
 
        condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done
 
    }
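
The sync-done pair is the usual mutex-plus-condvar handshake: the connector side sets the flag under the lock and notifies, while the interface side either peeks at the flag (`try_wait`) or blocks in `wait_while` until it flips. A runnable sketch of that handshake between two threads:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

type SyncDone = Arc<(Mutex<bool>, Condvar)>;

fn main() {
    let sync_done: SyncDone = Arc::new((Mutex::new(false), Condvar::new()));

    // "Connector" side: finish the round after a while, then signal.
    let signaller = sync_done.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let (is_done, condvar) = &*signaller;
        *is_done.lock().unwrap() = true;
        condvar.notify_all();
    });

    // "Interface" side: peek once, then block until the round is done.
    let (is_done, condvar) = &*sync_done;
    let done_now = *is_done.lock().unwrap();
    println!("try_wait would return: {}", done_now);

    let guard = is_done.lock().unwrap();
    let _guard = condvar.wait_while(guard, |done| !*done).unwrap();
    println!("sync round finished");
}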
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 

	
 
    }
 
}
 
\ No newline at end of file
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub(crate) struct PortIdLocal {
 
pub struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ index: id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
src/runtime2/runtime.rs
Show inline comments
 
@@ -1041,49 +1041,49 @@ impl Runtime {
 
            result.push(next_branch_index);
 
            let branch = &desc.branches[next_branch_index as usize];
 

	
 
            match branch.parent_index {
 
                Some(index) => next_branch_index = index,
 
                None => return,
 
            }
 
        }
 
    }
 
}
 

	
 
/// Context accessible by the code while it is being executed by the runtime.
 
/// The executing code sometimes needs to interact with the runtime; this is
 
/// achieved by the code "throwing an error code", after which the runtime
 
/// modifies the appropriate variables and resumes executing the code.
 
struct Context<'a> {
 
    // Temporary references to branch related storage
 
    inbox: &'a HashMap<(PortId, u32), ValueGroup>,
 
    port_mapping: &'a HashMap<PortId, BranchPortDesc>,
 
    branch_ctx: &'a mut BranchContext,
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
    fn did_put(&mut self, _port: PortId) -> bool {
 
        // Note that we want "did put" to return false if we have fired zero
 
        // times, because this implies we did a previous
 
        let old_value = self.branch_ctx.just_called_did_put;
 
        self.branch_ctx.just_called_did_put = false;
 
        return old_value;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let inbox_key = (port, 1);
 
        match self.inbox.get(&inbox_key) {
 
            None => None,
 
            Some(value) => Some(value.clone()),
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self.port_mapping.get(&port) {
 
            None => None,
 
            Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)),
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.branch_ctx.pending_channel.take()
src/runtime2/scheduler.rs
Show inline comments
 
@@ -67,129 +67,135 @@ impl ConnectorCtx {
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
    }
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
// by the scheduler (to handle rerouting messages). We could just give a read-
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let scheduler_id = self.scheduler_id;
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
            let mut connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            while connector_key.is_none() {
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                println!("DEBUG [{}]: Nothing to do", scheduler_id);
 
                thread::sleep(Duration::new(1, 0));
 
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    println!("DEBUG [{}]: ... So I am quitting", scheduler_id);
 
                    break 'thread_loop;
 
                }
 

	
 
                println!("DEBUG [{}]: ... But I'm still running", scheduler_id);
 
                continue 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            println!("DEBUG [{}]: Running connector {}", scheduler_id, connector_key.index);
 
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // Check for rerouting
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                // Note: after this code has been reached, we may not have any
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                self.send_message_and_wake_up_if_sleeping(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_key.downcast(),
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state);
 
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                let new_schedule = scheduled.connector.run(
 
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
 
                );
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 

	
 
                cur_schedule = new_schedule;
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.global_store.connector_queue.push_back(connector_key);
 
                },
 
@@ -328,49 +334,50 @@ struct ReroutedTraffic {
 

	
 
pub(crate) struct Router {
 
    id_counter: u32,
 
    active: Vec<ReroutedTraffic>,
 
}
 

	
 
impl Router {
 
    pub fn new() -> Self {
 
        Router{
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 
        let (new_id_counter, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id_counter;
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            target_port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
 
            })
 
        };
 
    }
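
Each reroute gets a fresh control-message id from a counter that is allowed to wrap around, and the matching entry stays in the table until the peer's `Ack` comes back with the same id; until then traffic from the old peer to the transferred port is redirected to the new owner. A compact sketch of that bookkeeping, with plain `u32` values instead of the real connector and port ids (and `wrapping_add` in place of the discarded-then-assigned `overflowing_add` pair):

struct Reroute { id: u32, source: u32, target_port: u32, new_owner: u32 }

struct Router {
    id_counter: u32,
    active: Vec<Reroute>,
}

impl Router {
    fn new() -> Self { Router { id_counter: 0, active: Vec::new() } }

    // Register a reroute and return the control id the peer must ack.
    fn prepare(&mut self, source: u32, target_port: u32, new_owner: u32) -> u32 {
        let id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1); // wrap instead of panicking
        self.active.push(Reroute { id, source, target_port, new_owner });
        id
    }

    // While the entry is active, traffic from `source` to `target_port` is redirected.
    fn should_reroute(&self, source: u32, target_port: u32) -> Option<u32> {
        self.active.iter()
            .find(|r| r.source == source && r.target_port == target_port)
            .map(|r| r.new_owner)
    }

    fn handle_ack(&mut self, id: u32) {
        self.active.retain(|r| r.id != id);
    }
}

fn main() {
    let mut router = Router::new();
    let id = router.prepare(7, 3, 9);
    assert_eq!(router.should_reroute(7, 3), Some(9));
    router.handle_ack(id);
    assert_eq!(router.should_reroute(7, 3), None);
    println!("reroute cleared after ack");
}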
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.target_port == target_port {
src/runtime2/tests/mod.rs
Show inline comments
 
use std::sync::Arc;
 

	
 
use super::runtime::*;
 
use crate::ProtocolDescription;
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
#[test]
 
fn test_single_message() {
 
    // Simple test where we have a `putter` component, which will simply send a
 
    // single message (a boolean), and a `getter` component, which will receive
 
    // that message.
 
    // We will write this behaviour in the various ways that the language
 
    // currently allows. We will cheat a bit by peeking into the runtime to make
 
    // sure that the getter actually received the message.
 
    // TODO: Expose ports to a "native application"
 

	
 
    fn check_store_bool(value: &Value, expected: bool) {
 
        if let Value::Bool(value) = value {
 
            assert_eq!(*value, expected);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 

	
 
    fn run_putter_getter(code: &[u8]) {
 
        // Compile code
 
        let pd = ProtocolDescription::parse(code)
 
            .expect("code successfully compiles");
 
        let pd = Arc::new(pd);
 

	
 
        // Construct runtime and the appropriate ports and connectors
 
        let mut rt = Runtime::new(pd);
 
        let (put_port, get_port) = rt.add_channel();
 

	
 
        let mut put_args = ValueGroup::new_stack(vec![
 
            put_port,
 
        ]);
 
        rt.add_component("", "putter", put_args)
 
            .expect("'putter' component created");
 

	
 
        let mut get_args = ValueGroup::new_stack(vec![
 
            get_port,
 
        ]);
 
        rt.add_component("", "getter", get_args)
 
            .expect("'getter' component created");
 

	
 
        // Run until completion
 
        rt.run();
 
fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(num_threads, protocol);
 

	
 
        // Check for success (the 'received' and 'did_receive" flags)
 
        let getter_component = rt.connectors.get(&1).unwrap();
 
        let branch = &getter_component.branches[0];
 
        assert_eq!(branch.branch_state, BranchState::Finished);
 
    return runtime;
 
}
 

	
 
        // Note: with the stack structure of the store, the first entry is the
 
        // "previous stack pos" and the second one is the input port passed to
 
        // the procedure. Hence the third/fourth entries are the boolean
 
        // variables on the stack.
 
        check_store_bool(&branch.code_state.prompt.store.stack[2], true);
 
        check_store_bool(&branch.code_state.prompt.store.stack[3], true);
 
#[test]
 
fn test_put_and_get() {
 
    let rt = runtime_for(4, "
 
primitive putter(out<bool> sender, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"putting!\");
 
            put(sender, true);
 
        }
 
        index += 1;
 
    }
 

	
 
    // Without `fires()`, just a single valid behaviour
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                put(put_here, true);
 
            }
 
}
 

	
 
primitive getter(in<bool> receiver, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"getting!\");
 
            auto result = get(receiver);
 
            assert(result);
 
        }
 
        index += 1;
 
    }
 
}
 
    ");
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false;
 
            bool did_receive = false;
 

	
 
            synchronous {
 
                received = get(get_here);
 
                if (received) {
 
                    print(\"value was 'true'\");
 
                } else {
 
                    print(\"value was 'false'\");
 
                }
 
                did_receive = true;
 
            }
 
        }");
 

	
 
    // With `fires()`, but eliminating on the putter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (!fires(put_here)) {
 
                    assert(false);
 
                } else {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 
    let mut api = rt.create_interface();
 
    let channel = api.create_channel();
 
    let num_loops = 5;
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                }
 
            }
 
        }");
 
    api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
        Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create putter");
 

	
 
    // With `fires()`, but eliminating on the getter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (fires(put_here)) {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create getter");
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                } else {
 
                    assert(false);
 
                }
 
            }
 
        }"
 
    );
 
    println!("Am I running?");
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)