Changeset - 7d01f1245b7c
[Not reviewed]
0 12 0
mh - 4 years ago 2021-10-20 20:26:23
contact@maxhenger.nl
Everything compiles again, pending restructuring of shared runtime objects
12 files changed with 302 insertions and 305 deletions:
0 comments (0 inline, 0 general)
src/collections/raw_vec.rs
Show inline comments
 
@@ -34,7 +34,7 @@ impl<T: Sized> RawVec<T> {
 
    pub fn with_capacity(capacity: usize) -> Self {
 
        // Could be done a bit more efficiently
 
        let mut result = Self::new();
 
        result.ensure_space(capacity);
 
        result.ensure_space(capacity).unwrap();
 
        return result;
 
    }
 

	
 
@@ -49,7 +49,7 @@ impl<T: Sized> RawVec<T> {
 
    }
 

	
 
    pub fn push(&mut self, item: T) {
 
        self.ensure_space(1);
 
        self.ensure_space(1).unwrap();
 
        unsafe {
 
            let target = self.base.add(self.len);
 
            std::ptr::write(target, item);
 
@@ -87,7 +87,7 @@ impl<T: Sized> RawVec<T> {
 
                    dealloc(old_base, old_layout);
 
                }
 

	
 
                self.base = new_base;
 
                self.base = new_base as *mut T;
 
                self.cap = new_cap;
 
            }
 
        } // else: still enough space
 
@@ -114,7 +114,7 @@ impl<T: Sized> Drop for RawVec<T> {
 
            debug_assert!(!self.base.is_null());
 
            let (_, layout) = self.current_layout();
 
            unsafe {
 
                dealloc(self.base, layout);
 
                dealloc(self.base as *mut u8, layout);
 
                if cfg!(debug_assertions) {
 
                    self.base = ptr::null_mut();
 
                }
src/protocol/parser/type_table.rs
Show inline comments
 
@@ -1119,7 +1119,6 @@ impl TypeTable {
 
            let mut breadcrumb = self.type_loop_breadcrumbs[breadcrumb_idx].clone();
 

	
 
            let poly_type = self.lookup.get(&breadcrumb.definition_id).unwrap();
 
            let poly_type_definition_id = poly_type.ast_definition;
 

	
 
            let resolve_result = match &poly_type.definition {
 
                DTV::Enum(_) => {
src/runtime2/connector.rs
Show inline comments
 
@@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool;
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{MessageContents, SolutionMessage};
 
use crate::runtime2::inbox::{Message, MessageContents, SolutionMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 
@@ -19,7 +19,7 @@ use super::port::PortIdLocal;
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
pub struct BranchId {
 
    pub index: u32,
 
}
 

	
 
@@ -145,6 +145,7 @@ struct PortOwnershipDelta {
 
    port_id: PortIdLocal,
 
}
 

	
 
#[derive(Debug)]
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
@@ -158,7 +159,7 @@ pub(crate) struct ConnectorPorts {
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    pub port_mapping: Vec<PortAssignment>
 
    port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
@@ -188,7 +189,8 @@ impl ConnectorPorts {
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
            let parent_port = parent_port.clone();
 
            self.port_mapping.push(parent_port);
 
        }
 
    }
 

	
 
@@ -303,10 +305,10 @@ pub(crate) struct ConnectorPublic {
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new() -> Self {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(false),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 
@@ -349,14 +351,14 @@ impl RunContext for TempCtx {
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message {
 
            MC::Data(message) => self.handle_data_message(message),
 
            MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state),
 
            MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state),
 
            MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state),
 
        match message.contents {
 
            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
 
            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
 
            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
 
            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 
@@ -446,8 +448,8 @@ impl ConnectorPDL {
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    pub fn handle_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        self.inbox.insert_message(target_port, message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
@@ -577,6 +579,7 @@ impl ConnectorPDL {
 

	
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                let next_branch = branch.next_branch_in_queue;
 
                self.submit_sync_solution(new_solution, ctx, results);
 

	
 
                // Consider the next branch
 
@@ -585,8 +588,8 @@ impl ConnectorPDL {
 
                    break;
 
                }
 

	
 
                debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue
 
                branch_index = branch.next_branch_in_queue.unwrap();
 
                debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue
 
                branch_index = next_branch.unwrap();
 
            }
 
        }
 
    }
 
@@ -629,7 +632,7 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        // Make sure this is the message we actually committed to. As long as
 
        // we're running on a single machine this is fine.
 
        // TODO: Take care of nefarious peers
 
@@ -683,7 +686,7 @@ impl ConnectorPDL {
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
@@ -709,24 +712,35 @@ impl ConnectorPDL {
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 
                // Create two copied branches, one silent and one firing
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                let parent_branch_id = branch.index;
 
                let parent_branch = &self.branches[parent_branch_id.index as usize];
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 
                let silent_index = self.branches.len() as u32;
 
                let firing_index = silent_index + 1;
 

	
 
                let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch);
 
                self.ports.prepare_sync_branch(parent_branch.index.index, silent_index);
 

	
 
                // Assign the old branch a silent value
 
                let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index);
 
                let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch);
 
                self.ports.prepare_sync_branch(parent_branch.index.index, firing_index);
 

	
 
                // Assign the port values of the two new branches
 
                let silent_port = self.ports.get_port_mut(silent_index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Run both branches again
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index);
 
                let silent_branch_id = silent_branch.index;
 
                self.branches.push(silent_branch);
 
                let firing_branch_id = firing_branch.index;
 
                self.branches.push(firing_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
@@ -755,7 +769,8 @@ impl ConnectorPDL {
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index);
 
                    let branch_id = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
@@ -766,9 +781,10 @@ impl ConnectorPDL {
 
                        did_have_messages = true;
 

	
 
                        // For each message prepare a new branch to execute
 
                        let parent_branch = &self.branches[branch_id.index as usize];
 
                        let new_branch_index = self.branches.len() as u32;
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                        self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch);
 
                        self.ports.prepare_sync_branch(branch_id.index, new_branch_index);
 

	
 
                        let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
@@ -785,8 +801,9 @@ impl ConnectorPDL {
 

	
 
                        // Schedule the new branch
 
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index);
 
                        let new_branch_id = new_branch.index;
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id);
 
                    }
 

	
 
                    if did_have_messages {
 
@@ -808,8 +825,9 @@ impl ConnectorPDL {
 
                    }
 
                }
 

	
 
                let branch_id = branch.index;
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particualar port.
 
@@ -853,7 +871,7 @@ impl ConnectorPDL {
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap();
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(MessageContents::Data(message));
 
@@ -875,7 +893,7 @@ impl ConnectorPDL {
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 
@@ -897,8 +915,9 @@ impl ConnectorPDL {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                let first_sync_branch_id = first_sync_branch.index;
 
                self.branches.push(first_sync_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
@@ -1020,6 +1039,8 @@ impl ConnectorPDL {
 
            }
 

	
 
            prev_index = next_index;
 
            let entry = &branches[next_index as usize];
 
            next_index = entry.next_branch_in_queue.unwrap_or(0);
 
        }
 

	
 
        // If here, then we didn't find the element
 
@@ -1216,7 +1237,7 @@ impl ConnectorPDL {
 
            // TODO: Maybe another package for random?
 
            let comparison_number: u64 = unsafe {
 
                let mut random_array = [0u8; 8];
 
                getrandom::getrandom(&mut random_array);
 
                getrandom::getrandom(&mut random_array).unwrap();
 
                std::mem::transmute(random_array)
 
            };
 

	
src/runtime2/global_store.rs
Show inline comments
 
@@ -3,88 +3,13 @@ use std::sync::{Arc, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32};
 

	
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::scheduler::Router;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::{Connector, ConnectorApplication};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct ConnectorId(pub u32);
 
use super::scheduler::{Router, ConnectorCtx};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::inbox::Message;
 
use super::native::{Connector, ConnectorApplication};
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
/// owns a `ConnectorKey`, then one has unique access to that connector.
 
@@ -138,16 +63,17 @@ impl ConnectorStore {
 
    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
 
        // Connector interface does not own any initial ports, and cannot be
 
        // created by another connector
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)));
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true);
 
        return key;
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it. The caller must make sure that the constructed
 
    /// connector's code is initialized with the same ports as the ports in the
 
    /// `initial_ports` array.
 
    /// `initial_ports` array. Furthermore the connector is initialized as not
 
    /// sleeping, so MUST be put on the connector queue by the caller.
 
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector));
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false);
 
        let new_connector = self.get_mut(&key);
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
@@ -166,7 +92,7 @@ impl ConnectorStore {
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 
        let mut lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
@@ -178,26 +104,23 @@ impl ConnectorStore {
 
    }
 

	
 
    /// Creates a connector but does not set its initial ports
 
    fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey {
 
    fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        let index;
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let mut lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
                connector,
 
                context: ConnectorCtx::new(self.port_counter.clone()),
 
                public: ConnectorPublic::new(),
 
                public: ConnectorPublic::new(initialize_as_sleeping),
 
                router: Router::new(),
 
            };
 

	
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
                unsafe {
 
                    // Cheating a bit here. Anyway, move to heap, store in list
 
                    index = lock.connectors.len();
 
                    lock.connectors.push(connector);
 
                }
 
                index = lock.connectors.len();
 
                lock.connectors.push(connector);
 
            } else {
 
                index = lock.free.pop().unwrap();
 

	
 
@@ -239,14 +162,14 @@ impl Drop for ConnectorStore {
 
/// TODO: @docs
 
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
 
///     This includes the `should_exit` and `did_exit` pair!
 
pub struct GlobalStore {
 
pub(crate) struct GlobalStore {
 
    pub connector_queue: MpmcQueue<ConnectorKey>,
 
    pub connectors: ConnectorStore,
 
    pub should_exit: AtomicBool,    // signal threads to exit
 
}
 

	
 
impl GlobalStore {
 
    pub fn new() -> Self {
 
    pub(crate) fn new() -> Self {
 
        Self{
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: ConnectorStore::with_capacity(256),
src/runtime2/inbox.rs
Show inline comments
 
@@ -238,11 +238,11 @@ impl PublicInbox {
 
    }
 
}
 

	
 
pub struct PrivateInbox {
 
pub(crate) struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: Vec<DataMessage>,
 
    messages: Vec<(PortIdLocal, DataMessage)>,
 
    len_read: usize,
 
}
 

	
 
@@ -257,17 +257,17 @@ impl PrivateInbox {
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: DataMessage) {
 
        for existing in self.messages.iter() {
 
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        for (existing_target_port, existing) in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.sending_port == message.sending_port {
 
                    *existing_target_port == target_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 

	
 
        self.messages.push(message);
 
        self.messages.push((target_port, message));
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
@@ -278,7 +278,7 @@ impl PrivateInbox {
 
    /// This function should only be used to check if already-received messages
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        return InboxMessageIter{
 
            messages: &self.messages,
 
            next_index: 0,
 
@@ -290,26 +290,26 @@ impl PrivateInbox {
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub fn next_message(&mut self) -> Option<&DataMessage> {
 
    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let to_return = &self.messages[self.len_read];
 
        let (_, to_return) = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
    pub(crate) fn clear(&mut self) {
 
        self.messages.clear();
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub struct InboxMessageIter<'i> {
 
    messages: &'i Vec<DataMessage>,
 
pub(crate) struct InboxMessageIter<'i> {
 
    messages: &'i Vec<(PortIdLocal, DataMessage)>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
@@ -322,8 +322,8 @@ impl<'i> Iterator for InboxMessageIter<'i> {
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.messages[self.next_index];
 
            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
            let (target_port, cur_message) = &self.messages[self.next_index];
 
            if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 
@@ -335,7 +335,7 @@ impl<'i> Iterator for InboxMessageIter<'i> {
 
            return None;
 
        }
 

	
 
        let message = &self.messages[self.next_index];
 
        let (_, message) = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
src/runtime2/messages.rs
Show inline comments
 
@@ -2,7 +2,6 @@ use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
src/runtime2/mod.rs
Show inline comments
 
@@ -14,7 +14,7 @@ mod inbox;
 
// Imports
 

	
 
use std::sync::{Arc, Mutex};
 
use std::sync::atomic::Ordering;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::ProtocolDescription;
 
@@ -24,19 +24,113 @@ use scheduler::Scheduler;
 
use native::{ConnectorApplication, ApplicationInterface};
 

	
 

	
 
// Runtime API
 
// TODO: Exit condition is very dirty. Take into account:
 
//  - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
 
//  - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
 
//  - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
 
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
 
//  to truly design this properly I need some benchmarks.
 
pub(crate) enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
impl Connector for ConnectorVariant {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
}
 

	
 
/// Externally facing runtime.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
pub(crate) struct RuntimeInner {
 
    pub(crate) global_store: GlobalStore,
 
    // Protocol
 
    pub(crate) protocol_description: ProtocolDescription,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
 
    // Storage of connectors in a kind of freelist. Note the vector of points to
 
    // ensure pointer stability: the vector can be changed but the entries
 
    // themselves remain valid.
 
    pub connectors_list: RawVec<*mut ScheduledConnector>,
 
    pub connectors_free: Vec<usize>,
 

	
 
    pub(crate) global_store: GlobalStore,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>,
 
    active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels
 
}
 

	
 
impl RuntimeInner {
 
    #[inline]
 
    pub(crate) fn increment_active_interfaces(&self) {
 
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
 
        debug_assert_ne!(_old_num, 1); // once it hits 0, it stays zero
 
    }
 

	
 
    pub(crate) fn decrement_active_interfaces(&self) {
 
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 {
 
            // Became 0
 
            // TODO: Check num connectors, if 0, then set exit flag
 
        }
 
    }
 
}
 

	
 
// TODO: Come back to this at some point
 
@@ -44,24 +138,28 @@ unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            global_store: GlobalStore::new(),
 
            protocol_description,
 
            schedulers: Mutex::new(Vec::new()),
 
            active_interfaces: AtomicU32::new(1), // we are the active interface
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads);
 
            for _ in 0..num_threads {
 
            let mut schedulers = Vec::with_capacity(num_threads as usize);
 
            for thread_index in 0..num_threads {
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::spawn(move || {
 
                    let mut scheduler = Scheduler::new(cloned_runtime_inner);
 
                    scheduler.run();
 
                });
 
                let thread = thread::Builder::new()
 
                    .name(format!("thread-{}", thread_index))
 
                    .spawn(move || {
 
                        let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index);
 
                        scheduler.run();
 
                    })
 
                    .unwrap();
 

	
 
                schedulers.push(thread);
 
            }
 
@@ -92,7 +190,7 @@ impl Drop for Runtime {
 
        self.inner.global_store.should_exit.store(true, Ordering::Release);
 
        let mut schedulers = self.inner.schedulers.lock().unwrap();
 
        for scheduler in schedulers.drain(..) {
 
            scheduler.join();
 
            scheduler.join().unwrap();
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
@@ -17,18 +18,18 @@ use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::Message;
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub trait Connector {
 
pub(crate) trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
@@ -45,7 +46,7 @@ pub struct ConnectorApplication {
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32)));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 
@@ -55,20 +56,31 @@ impl ConnectorApplication {
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        todo!("handling messages in ConnectorApplication (API for runtime)")
 
    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(_) => unreachable!("data message in API connector"),
 
            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
 
                // Handling sync in API
 
            },
 
            MC::Control(_) => {},
 
            MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    println!("DEBUG: API adopting ports");
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    println!("DEBUG: API creating connector");
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
@@ -89,7 +101,9 @@ pub struct ApplicationInterface {
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        runtime.active_interfaces += 1;
 

	
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
@@ -122,7 +136,7 @@ impl ApplicationInterface {
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push(ApplicationJob::NewChannel((getter_port, putter_port)));
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
@@ -162,7 +176,7 @@ impl ApplicationInterface {
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push(ApplicationJob::NewConnector(connector));
 
            queue.push_back(ApplicationJob::NewConnector(connector));
 
        }
 

	
 
        // Send ping message to wake up connector
 
@@ -178,8 +192,11 @@ impl ApplicationInterface {
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            println!("DEBUG: Waking up connector");
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        } else {
 
            println!("DEBUG: NOT waking up connector");
 
        }
 

	
 
        return Ok(());
 
@@ -196,11 +213,17 @@ impl ApplicationInterface {
 
    pub fn wait(&self) {
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v); // wait while not done
 
        condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 

	
 
    }
 
}
 
\ No newline at end of file
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub(crate) struct PortIdLocal {
 
pub struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
src/runtime2/runtime.rs
Show inline comments
 
@@ -1062,7 +1062,7 @@ struct Context<'a> {
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
    fn did_put(&mut self, _port: PortId) -> bool {
 
        // Note that we want "did put" to return false if we have fired zero
 
        // times, because this implies we did a prevous
 
        let old_value = self.branch_ctx.just_called_did_put;
src/runtime2/scheduler.rs
Show inline comments
 
@@ -88,6 +88,7 @@ impl ConnectorCtx {
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
@@ -95,32 +96,37 @@ pub(crate) struct Scheduler {
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let scheduler_id = self.scheduler_id;
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
            let mut connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            while connector_key.is_none() {
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                println!("DEBUG [{}]: Nothing to do", scheduler_id);
 
                thread::sleep(Duration::new(1, 0));
 
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    println!("DEBUG [{}]: ... So I am quitting", scheduler_id);
 
                    break 'thread_loop;
 
                }
 

	
 
                println!("DEBUG [{}]: ... But I'm still running", scheduler_id);
 
                continue 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            println!("DEBUG [{}]: Running connector {}", scheduler_id, connector_key.index);
 
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
@@ -168,7 +174,7 @@ impl Scheduler {
 
                        }
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state);
 
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
@@ -349,7 +355,8 @@ impl Router {
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 
        let (new_id_counter, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id_counter;
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
src/runtime2/tests/mod.rs
Show inline comments
 
use std::sync::Arc;
 

	
 
use super::runtime::*;
 
use crate::ProtocolDescription;
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
#[test]
 
fn test_single_message() {
 
    // Simple test were we have a `putter` component, which will simply send a
 
    // single message (a boolean), and a `getter` component, which will receive
 
    // that message.
 
    // We will write this behaviour in the various ways that the language
 
    // currently allows. We will cheat a bit by peeking into the runtime to make
 
    // sure that the getter actually received the message.
 
    // TODO: Expose ports to a "native application"
 

	
 
    fn check_store_bool(value: &Value, expected: bool) {
 
        if let Value::Bool(value) = value {
 
            assert_eq!(*value, expected);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 

	
 
    fn run_putter_getter(code: &[u8]) {
 
        // Compile code
 
        let pd = ProtocolDescription::parse(code)
 
            .expect("code successfully compiles");
 
        let pd = Arc::new(pd);
 

	
 
        // Construct runtime and the appropriate ports and connectors
 
        let mut rt = Runtime::new(pd);
 
        let (put_port, get_port) = rt.add_channel();
 

	
 
        let mut put_args = ValueGroup::new_stack(vec![
 
            put_port,
 
        ]);
 
        rt.add_component("", "putter", put_args)
 
            .expect("'putter' component created");
 

	
 
        let mut get_args = ValueGroup::new_stack(vec![
 
            get_port,
 
        ]);
 
        rt.add_component("", "getter", get_args)
 
            .expect("'getter' component created");
 

	
 
        // Run until completion
 
        rt.run();
 
fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(num_threads, protocol);
 

	
 
        // Check for success (the 'received' and 'did_receive" flags)
 
        let getter_component = rt.connectors.get(&1).unwrap();
 
        let branch = &getter_component.branches[0];
 
        assert_eq!(branch.branch_state, BranchState::Finished);
 
    return runtime;
 
}
 

	
 
        // Note: with the stack structure of the store, the first entry is the
 
        // "previous stack pos" and the second one is the input port passed to
 
        // the procedure. Hence the third/fourth entries are the boolean
 
        // variables on the stack.
 
        check_store_bool(&branch.code_state.prompt.store.stack[2], true);
 
        check_store_bool(&branch.code_state.prompt.store.stack[3], true);
 
#[test]
 
fn test_put_and_get() {
 
    let rt = runtime_for(4, "
 
primitive putter(out<bool> sender, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"putting!\");
 
            put(sender, true);
 
        }
 
        index += 1;
 
    }
 

	
 
    // Without `fires()`, just a single valid behaviour
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                put(put_here, true);
 
            }
 
}
 

	
 
primitive getter(in<bool> receiver, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"getting!\");
 
            auto result = get(receiver);
 
            assert(result);
 
        }
 
        index += 1;
 
    }
 
}
 
    ");
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false;
 
            bool did_receive = false;
 

	
 
            synchronous {
 
                received = get(get_here);
 
                if (received) {
 
                    print(\"value was 'true'\");
 
                } else {
 
                    print(\"value was 'false'\");
 
                }
 
                did_receive = true;
 
            }
 
        }");
 

	
 
    // With `fires()`, but eliminating on the putter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (!fires(put_here)) {
 
                    assert(false);
 
                } else {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 
    let mut api = rt.create_interface();
 
    let channel = api.create_channel();
 
    let num_loops = 5;
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                }
 
            }
 
        }");
 
    api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
        Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create putter");
 

	
 
    // With `fires()`, but eliminating on the getter side
 
    run_putter_getter(
 
        b"primitive putter(out<bool> put_here) {
 
            synchronous {
 
                if (fires(put_here)) {
 
                    put(put_here, true);
 
                }
 
            }
 
        }
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create getter");
 

	
 
        primitive getter(in<bool> get_here) {
 
            bool received = false; bool did_receive = false;
 
            synchronous {
 
                if (fires(get_here)) {
 
                    received = get(get_here);
 
                    did_receive = true;
 
                } else {
 
                    assert(false);
 
                }
 
            }
 
        }"
 
    );
 
    println!("Am I running?");
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)