Changeset - 4da5e57a9834
MH <contact@maxhenger.nl> - 2021-12-02 17:01:59
Add error checking to sending component messages
7 files changed with 196 insertions and 201 deletions:
src/runtime2/connector.rs
 
@@ -5,59 +5,57 @@
 
// conceptually handled by particular data structures. That is to say: the code
 
// that we run governs: running PDL code, keeping track of ports, instantiating
 
// new components and transports (i.e. interacting with the runtime), running
 
// a consensus algorithm, etc. But on the other hand, our data is rather
 
// simple: we have a speculative execution tree, a set of ports that we own,
 
// and a bit of code that we should run.
 
//
 
// So currently the code is organized as follows:
 
// - The scheduler that is running the component is the authoritative source on
 
//     ports during *non-sync* mode. The consensus algorithm is the
 
//     authoritative source during *sync* mode. They retrieve each other's
 
//     state during the transitions. Hence port data is duplicated between

//     these two data structures.
 
// - The execution tree is where executed branches reside. But the execution
 
//     tree is only aware of the tree shape itself (and keeps track of some
 
//     queues of branches that are in a particular state), and tends to store
 
//     the PDL program state. The consensus algorithm is also somewhat aware
 
//     of the execution tree, but only in terms of what is needed to complete
 
//     a sync round (for now, that means the port mapping in each branch).
 
//     Hence once more we have properties conceptually associated with branches
 
//     in two places.
 
// - TODO: Write about handling messages, consensus wrapping data
 
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::protocol::RunContext;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement};
 
use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncCompContent, SyncPortMessage, SyncControlMessage, PublicInbox};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
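
A note on the `sleeping` flag: a component that has nothing left to do stores
`true` here before parking itself, and any sender that wants to wake it up
attempts a `true -> false` compare-exchange. Exactly one sender wins that
exchange and becomes responsible for pushing the component onto the work
queue, which prevents double-scheduling. A minimal sketch of the claim
mechanism in isolation (illustrative names, not the crate's API):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    // Exactly one of the competing senders wins the exchange; the others see
    // `Err` and back off, knowing the winner will schedule the component.
    fn try_claim_wakeup(sleeping: &AtomicBool) -> bool {
        sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok()
    }

    fn main() {
        let sleeping = Arc::new(AtomicBool::new(true));
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let sleeping = sleeping.clone();
                thread::spawn(move || try_claim_wakeup(&sleeping))
            })
            .collect();
        let winners = handles.into_iter()
            .map(|handle| handle.join().unwrap())
            .filter(|&won| won)
            .count();
        assert_eq!(winners, 1); // only one sender may push the work item
    }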
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
enum Mode {
 
    NonSync,    // running non-sync code
 
    Sync,       // running sync code (in potentially multiple branches)
 
    SyncError,  // encountered an unrecoverable error in sync mode
 
    Error,      // encountered an error in non-sync mode (or finished handling the sync mode error).
src/runtime2/consensus.rs
 
@@ -1178,49 +1178,49 @@ impl SolutionCombiner {
 
            }
 

	
 
            let cur_component = &mut self.local[component_index];
 
            let cur_solution = &mut cur_component.solutions[solution_index];
 

	
 
            match cur_solution.matches.iter_mut()
 
                .find(|v| v.target_id == new_component_match.target_id)
 
            {
 
                Some(other_match) => {
 
                    // Already have an entry
 
                    debug_assert_eq!(other_match.target_index, new_component_match.target_index);
 
                    other_match.match_indices.extend(&new_component_match.match_indices);
 
                },
 
                None => {
 
                    // Create a new entry
 
                    cur_solution.matches.push(new_component_match);
 
                }
 
            }
 
        }
 

	
 
        return self.check_for_global_solution(component_index, solution_index);
 
    }
 

	
 
    fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool {
 
        'new_report_loop: for entry in channels {
 
        for entry in channels {
 
            let mut found = false;
 

	
 
            for existing in &mut self.presence {
 
                if existing.id == entry.channel_id {
 
                    // Same entry. We only update if the second component that

                    // owns one end of the channel reports in, or if a

                    // component is telling us that the channel is (now)

                    // closed.
 
                    if entry.is_closed {
 
                        existing.state = PresenceState::Closed;
 
                    } else if component_id != existing.owner_a && existing.state != PresenceState::Closed {
 
                        existing.state = PresenceState::BothPresent;
 
                    }
 

	
 
                    if existing.owner_a != component_id {
 
                        existing.owner_b = Some(component_id);
 
                    }
 

	
 
                    found = true;
 
                    break;
 
                }
 
            }
 

	
 
            if !found {
src/runtime2/inbox.rs
 
use std::sync::Mutex;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner};
 
use crate::runtime2::port::ChannelId;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::consensus::{GlobalSolution, LocalSolution};
 
use super::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct ChannelAnnotation {
 
    pub channel_id: ChannelId,
 
    pub registered_id: Option<BranchMarker>,
 
    pub expected_firing: Option<bool>,
 
}
 

	
 
/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
 
/// unique identifier for a branch, but differs in that a branch only has one
 
/// branch ID, but might have multiple associated markers (i.e. one branch
 
/// performing a `put` three times will generate three markers).
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchMarker{
 
    marker: u32,
 
}
 

	
 
impl BranchMarker {
 
    #[inline]
 
    pub(crate) fn new(marker: u32) -> Self {
 
@@ -211,25 +210,30 @@ pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub(crate) fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub(crate) fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 

	
 
    pub fn clear(&self) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.clear();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
 
@@ -8,48 +8,49 @@ mod consensus;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 
mod connector;
 

	
 
// Imports
 

	
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Condvar, Mutex, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use inbox::Message;
 
use port::{ChannelId, Port, PortState};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
#[derive(Debug)]
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
    pub generation: u32,
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId{
 
            index: self.index,
 
            generation: self.generation,
 
        };
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{
 
            index: id.index,
 
            generation: id.generation,
 
        };
 
@@ -252,106 +253,229 @@ impl RuntimeInner {
 
        let channel_id = ChannelId::new(getter_id);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            channel_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            channel_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
    /// Sends a message to a particular connector. If the connector happened to
 
    /// be sleeping then it will be scheduled for execution.
 
    pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) {
 
        let target = self.get_component_public(target_id);
 
    /// Sends a message directly (without going through the port) to a
 
    /// component. This is slightly less efficient than sending over a port, but
 
    /// might be preferable for some algorithms. If the component was sleeping
 
    /// then it is scheduled for execution.
 
    pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool {
 
        let target = {
 
            let lock = self.connectors.read().unwrap();
 
            lock.get(target_id.index)
 
        };
 

	
 
        // Do a CAS on the number of users. In the most common case the

        // component is alive and we're the only one sending a message. Note that if we
 
        // finish this block, we're sure that no-one has set the `num_users`
 
        // value to 0. This is essential! When at 0, the component is added to
 
        // the freelist and the generation counter will be incremented.
 
        let mut cur_num_users = 1;
 
        while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) {
 
            if old_num_users == 0 {
 
                // Cannot send message. Whatever the component state is
 
                // (destroyed, at a different generation number, busy being
 
                // destroyed, etc.) we cannot send the message and will not
 
                // modify the component
 
                return false;
 
            }
 

	
 
            cur_num_users = old_num_users;
 
        }
 

	
 
        // We incremented the counter. But we might still be at the wrong
 
        // generation number. The generation number is a monotonically
 
        // increasing value. Since it only increases when someone gets the
 
        // `num_users` counter to 0, we can simply load the generation number.
 
        let generation = target.generation.load(Ordering::Acquire);
 
        if generation != target_id.generation {
 
            // We're at the wrong generation, so we cannot send the message.
 
            // However, since we incremented the `num_users` counter, the moment

            // we decrement it we might be the one that is supposed to handle

            // the destruction of the component. And since all users of the

            // component do an increment followed by a decrement, we can simply

            // do a `fetch_sub`.
 
            let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
 
            if old_num_users == 1 {
 
                // We're the one that got the counter to 0, so we're the ones
 
                // that are supposed to handle component exit
 
                self.finish_component_destruction(target_id);
 
            }
 

	
 
            return false;
 
        }
 

	
 
        // The generation is correct, and since we incremented the `num_users`
 
        // counter we're now sure that we can send the message and it will be
 
        // handled by the receiver
 
        target.connector.public.inbox.insert_message(message);
 

	
 
        // Finally, do the same as above: decrement the number of users; if it

        // gets to 0 we're the ones who should handle the exit condition.
 
        let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst);
 
        if old_num_users == 1 {
 
            // We're allowed to destroy the component.
 
            self.finish_component_destruction(target_id);
 
        } else {
 
            // Message is sent. If the component is sleeping, then we're sure
 
            // it is not scheduled and it has not initiated the destruction of
 
            // the component (because `initiate_component_destruction` does not

            // set `sleeping` back to true).
 
            // So we can safely schedule it.
 
            let should_wake_up = target.connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up {
 
                let key = unsafe{ ConnectorKey::from_id(target_id) };
 
                self.push_work(key);
 
            }
 
        }
 

	
 
        return true;
 
    }
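
The increment/check/decrement dance above is subtle, so here is the same
protocol reduced to its essentials as a self-contained sketch (a hypothetical
`Slot`, not the runtime's `StoreEntry`). The rules it encodes: an entry may
only be touched while holding a `num_users` increment that was taken from a
non-zero value, and whoever decrements the counter to zero owns the cleanup.

    use std::sync::atomic::{AtomicU32, Ordering};

    struct Slot {
        num_users: AtomicU32,  // 0 means destroyed (or being destroyed): do not touch
        generation: AtomicU32, // bumped every time `num_users` reaches 0
    }

    // Mirrors the CAS loop, the generation check and the decrement-to-zero
    // cleanup described above. Returns true if the entry could be used.
    fn try_use(slot: &Slot, expected_generation: u32) -> bool {
        // Acquire a user count, but never resurrect a slot whose count hit 0.
        let mut cur = 1;
        while let Err(old) = slot.num_users
            .compare_exchange(cur, cur + 1, Ordering::SeqCst, Ordering::Acquire)
        {
            if old == 0 {
                return false; // slot is (being) destroyed
            }
            cur = old;
        }

        // We hold a count, so the generation can no longer change under us.
        if slot.generation.load(Ordering::Acquire) != expected_generation {
            if slot.num_users.fetch_sub(1, Ordering::SeqCst) == 1 {
                // we brought the count to 0, so the cleanup is ours to run
            }
            return false;
        }

        // ... use the entry here (e.g. deliver the message) ...

        if slot.num_users.fetch_sub(1, Ordering::SeqCst) == 1 {
            // likewise: the last user out runs the cleanup
        }
        return true;
    }

    fn main() {
        let slot = Slot {
            num_users: AtomicU32::new(1), // the owner's own count
            generation: AtomicU32::new(0),
        };
        assert!(try_use(&slot, 0));  // alive and at the expected generation
        assert!(!try_use(&slot, 1)); // stale generation: refused
    }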
 

	
 
    /// Sends a message to a particular component, assumed to occur over a port.
 
    /// If the component happened to be sleeping then it will be scheduled for
 
    /// execution. Because of the port management system we may assume that
 
    /// we're always accessing the component at the right generation number.
 
    pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) {
 
        let target = {
 
            let lock = self.connectors.read().unwrap();
 
            let entry = lock.get(target_id.index);
 
            debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation);
 
            &mut entry.connector.public
 
        };
 

	
 
        target.inbox.insert_message(message);
 

	
 
        let should_wake_up = target.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(target_id) };
 
            self.push_work(key);
 
        }
 
    }
 

	
 
    // --- Creating/retrieving/destroying components
 

	
 
    /// Creates an initially sleeping application connector.
 
    fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
        // Initialize as sleeping, as it will be scheduled by the programmer.
 
        let mut lock = self.connectors.write().unwrap();
 
        let key = lock.create(ConnectorVariant::Native(Box::new(component)), true);
 

	
 
        self.increment_active_components();
 
        return key;
 
    }
 

	
 
    /// Creates a new PDL component. This function just creates the component.
 
    /// If you create it initially awake, then you must add it to the work
 
    /// queue. Other aspects of correctness (i.e. setting initial ports) are
 
    /// relinquished to the caller!
 
    pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey {
 
        // The sleeping state is decided by the caller through `initially_sleeping`
 
        let key = {
 
            let mut lock = self.connectors.write().unwrap();
 
            lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping)
 
        };
 

	
 
        self.increment_active_components();
 
        return key;
 
    }
 

	
 
    /// Retrieve private access to the component through its key.
 
    #[inline]
 
    pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.connectors.read().unwrap();
 
        return lock.get_private(connector_key);
 
        let entry = {
 
            let lock = self.connectors.read().unwrap();
 
            lock.get(connector_key.index)
 
        };
 

	
 
        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key);
 
        return &mut entry.connector;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.connectors.read().unwrap();
 
        return lock.get_public(connector_id);
 
    // --- Managing component destruction
 

	
 
    /// Start component destruction, may only be done by the scheduler that is
 
    /// executing the component. This might not actually destroy the component,
 
    /// since other components might be sending it messages.
 
    fn initiate_component_destruction(&self, connector_key: ConnectorKey) {
 
        // Most of the time no-one will be sending messages, so try
 
        // immediate destruction
 
        let mut lock = self.connectors.write().unwrap();
 
        let entry = lock.get(connector_key.index);
 
        debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation);
 
        debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component
 
        let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst);
 
        if old_num_users == 1 {
 
            // We just brought the number of users down to 0. Destroy the
 
            // component
 
            entry.connector.public.inbox.clear();
 
            entry.generation.fetch_add(1, Ordering::SeqCst);
 
            lock.destroy(connector_key);
 
            self.decrement_active_components();
 
        }
 
    }
 

	
 
    pub(crate) fn destroy_component(&self, connector_key: ConnectorKey) {
 
    fn finish_component_destruction(&self, connector_id: ConnectorId) {
 
        let mut lock = self.connectors.write().unwrap();
 
        lock.destroy(connector_key);
 
        let entry = lock.get(connector_id.index);
 
        debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0);
 
        let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst);
 
        debug_assert_eq!(_old_generation, connector_id.generation);
 

	
 
        // TODO: In the future we should not only clear out the inbox, but send
 
        //  messages back to the senders indicating the messages did not arrive.
 
        entry.connector.public.inbox.clear();
 

	
 
        // The invariant that only one thread may handle the internals of a

        // component is preserved by the fact that only one thread can decrement
 
        // `num_users` to 0.
 
        lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) });
 
        self.decrement_active_components();
 
    }
 

	
 
    // --- Managing exit condition
 

	
 
    #[inline]
 
    pub(crate) fn increment_active_interfaces(&self) {
 
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
 
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
 
    }
 

	
 
    pub(crate) fn decrement_active_interfaces(&self) {
 
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 { // such that active interfaces is now 0
 
            let num_connectors = self.active_connectors.load(Ordering::Acquire);
 
            if num_connectors == 0 {
 
                self.signal_for_shutdown();
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn increment_active_components(&self) {
 
@@ -375,123 +499,114 @@ impl RuntimeInner {
 
        debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0);
 

	
 
        let _lock = self.connector_queue.lock().unwrap();
 
        let should_signal = self.should_exit
 
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_signal {
 
            self.scheduler_notifier.notify_all();
 
        }
 
    }
 
}
 

	
 
// TODO: Come back to this at some point
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorStore
 
// -----------------------------------------------------------------------------
 

	
 
struct StoreEntry {
 
    connector: ScheduledConnector,
 
    generation: std::sync::atomic::AtomicU32,
 
    num_users: std::sync::atomic::AtomicU32,
 
}
 

	
 
struct ConnectorStore {
 
    // Freelist storage of connectors. Storage should be pointer-stable as
 
    // someone might be mutating the vector while we're executing one of the
 
    // connectors.
 
    entries: RawVec<*mut StoreEntry>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self {
 
            entries: RawVec::with_capacity(capacity),
 
            free: Vec::with_capacity(capacity),
 
        }
 
    }
 

	
 
    /// Retrieves public part of connector - accessible by many threads at once.
 
    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
 
        unsafe {
 
            let entry = self.entries.get(id.index as usize);
 
            debug_assert!(!entry.is_null());
 
            let cur_generation = (**entry).generation.load(Ordering::Acquire);
 
            assert_eq!(cur_generation, id.generation, "accessing {}", id.index);
 
            return &(**entry).connector.public;
 
        }
 
    }
 

	
 
    /// Retrieves private part of connector - accessible by one thread at a
 
    /// time.
 
    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
    /// Directly retrieves an entry. There be dragons here. The `connector`
 
    /// might have its destructor already executed. Accessing it might then lead
 
    /// to memory corruption.
 
    fn get(&self, index: u32) -> &'static mut StoreEntry {
 
        unsafe {
 
            let entry = self.entries.get_mut(key.index as usize);
 
            debug_assert!(!entry.is_null());
 
            let cur_generation = (**entry).generation.load(Ordering::Acquire);
 
            assert_eq!(cur_generation, key.generation, "accessing {}", key.index);
 
            return &mut (**entry).connector;
 
            let entry = self.entries.get_mut(index as usize);
 
            return &mut **entry;
 
        }
 
    }
 

	
 
    /// Creates a new connector. Caller should ensure ports are set up correctly
 
    /// and the connector is queued for execution if needed.
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            ctx: ComponentCtx::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
        };
 

	
 
        let index;
 
        let key;
 

	
 
        if self.free.is_empty() {
 
            // No free entries, allocate new entry
 
            index = self.entries.len();
 
            key = ConnectorKey{
 
                index: index as u32, generation: 0
 
            };
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(StoreEntry{
 
                connector,
 
                generation: AtomicU32::new(0),
 
                num_users: AtomicU32::new(1),
 
            }));
 
            self.entries.push(connector);
 
        } else {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 

	
 
            unsafe {
 
                let target = &mut **self.entries.get_mut(index);
 
                std::ptr::write(&mut target.connector as *mut _, connector);
 
                let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst);
 
                debug_assert_eq!(_old_num_users, 0);
 

	
 
                let generation = target.generation.load(Ordering::Acquire);
 
                key = ConnectorKey{ index: index as u32, generation };
 
                target.connector.ctx.id = key.downcast();
 
            }
 
        }
 

	
 
        println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        return key;
 
    }
 

	
 
    /// Destroys a connector. Caller should make sure it is not scheduled for
 
    /// execution. Otherwise one experiences "bad stuff" (tm).
 
    fn destroy(&mut self, key: ConnectorKey) {
 
        unsafe {
 
            let target = self.entries.get_mut(key.index as usize);
 
            (**target).generation.fetch_add(1, Ordering::SeqCst);
 
            std::ptr::drop_in_place(*target);
 
            // Note: but not deallocating!
 
        }
 

	
 
        println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        self.free.push(key.index as usize);
 
    }
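
The store above combines three ideas: heap-boxed entries for pointer
stability, a free list for slot reuse, and a per-slot generation counter so
that stale IDs referring to a reused slot can be detected. A safe, minimal
sketch of the freelist-plus-generation pattern (illustrative `Store` and
`Handle` types, using `Box` and plain integers instead of the raw pointers
and atomics in the real `ConnectorStore`):

    struct Slot<T> {
        value: Option<T>,
        generation: u32, // bumped on every destroy, so old handles go stale
    }

    struct Store<T> {
        entries: Vec<Box<Slot<T>>>, // boxing keeps each slot pointer-stable
        free: Vec<usize>,           // indices of destroyed slots, ready for reuse
    }

    #[derive(Clone, Copy)]
    struct Handle { index: usize, generation: u32 }

    impl<T> Store<T> {
        fn new() -> Self {
            return Store{ entries: Vec::new(), free: Vec::new() };
        }

        fn create(&mut self, value: T) -> Handle {
            if let Some(index) = self.free.pop() {
                // Reuse a slot; its generation was bumped when it was destroyed
                let slot = &mut self.entries[index];
                slot.value = Some(value);
                return Handle{ index, generation: slot.generation };
            }
            self.entries.push(Box::new(Slot{ value: Some(value), generation: 0 }));
            return Handle{ index: self.entries.len() - 1, generation: 0 };
        }

        fn get(&self, handle: Handle) -> Option<&T> {
            let slot = &self.entries[handle.index];
            if slot.generation != handle.generation {
                return None; // stale handle: the slot was reused
            }
            return slot.value.as_ref();
        }

        fn destroy(&mut self, handle: Handle) {
            let slot = &mut self.entries[handle.index];
            assert_eq!(slot.generation, handle.generation);
            slot.value = None;
            slot.generation += 1;
            self.free.push(handle.index);
        }
    }

    fn main() {
        let mut store = Store::new();
        let a = store.create("first");
        store.destroy(a);
        let b = store.create("second");  // reuses the slot, new generation
        assert_eq!(a.index, b.index);
        assert_eq!(store.get(a), None);  // stale handle is rejected
        assert_eq!(store.get(b), Some(&"second"));
    }
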
src/runtime2/native.rs
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 
use std::collections::HashMap;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::consensus::RoundConclusion;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::{ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
use super::scheduler::{SchedulerCtx, ComponentCtx, MessageTicket};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{
 
    Message, DataMessage,
 
    SyncCompMessage, SyncPortMessage,
 
    ControlContent, ControlMessage
 
};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
 
}
 

	
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    success: bool,
 
    inbox: Vec<ValueGroup>,
 
@@ -513,59 +511,50 @@ impl ApplicationInterface {
 
    pub fn wait(&mut self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
        if !self.is_in_sync {
 
            return Err(ApplicationEndSyncError::NotInSync);
 
        }
 

	
 
        let (is_done, condition) = &*self.sync_done;
 
        let mut lock = is_done.lock().unwrap();
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        let result = lock.take().unwrap();
 
        if result.success {
 
            return Ok(result.inbox);
 
        } else {
 
            return Err(ApplicationEndSyncError::Failure);
 
        }
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 

	
 
    fn wake_up_connector_with_ping(&self) {
 
        let connector = self.runtime.get_component_public(self.connector_id);
 
        connector.inbox.insert_message(Message::Control(ControlMessage {
 
        let message = ControlMessage {
 
            id: 0,
 
            sending_component_id: self.connector_id,
 
            content: ControlContent::Ping,
 
        }));
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.push_work(key);
 
        }
 
        };
 
        self.runtime.send_message_maybe_destroyed(self.connector_id, Message::Control(message));
 
    }
 

	
 
    fn find_port_by_id(&self, port_id: PortIdLocal) -> Option<PortKind> {
 
        return self.owned_ports.iter()
 
            .find(|(_, owned_id)| *owned_id == port_id)
 
            .map(|(port_kind, _)| *port_kind);
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::Shutdown);
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        self.runtime.decrement_active_interfaces();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
 
use std::collections::VecDeque;
 
use std::mem::MaybeUninit;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::collections::RawVec;
 

	
 
use crate::protocol::eval::EvalError;
 
use crate::runtime2::port::ChannelId;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::branch::{BranchId};
 
use super::connector::{ConnectorPDL, ConnectorScheduling};
 
use super::inbox::{
 
    Message, DataMessage, SyncHeader,
 
    Message, DataMessage,
 
    ControlMessage, ControlContent,
 
    SyncControlMessage, SyncControlContent,
 
};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        'thread_loop: loop {
 
@@ -47,335 +46,339 @@ impl Scheduler {
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while let ConnectorScheduling::Immediate = cur_schedule {
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're still waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        self.runtime.initiate_component_destruction(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep. Note that we are the only ones who are

                    // allowed to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    // Any messages still in the public inbox should be handled
 
                    scheduled.ctx.inbox.clear_read_messages();
 
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
 
                        self.handle_message_while_shutting_down(message, scheduled);
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
                        self.runtime.initiate_component_destruction(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = message.target_port() {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 

	
 
                    // We insert directly into the private inbox. Since we have
 
                    // a reroute entry the component can not yet be running.
 
                    if let Message::Control(_) = &message {
 
                        self.runtime.send_message(other_component_id, message);
 
                        self.runtime.send_message_assumed_alive(other_component_id, message);
 
                    } else {
 
                        let key = unsafe { ConnectorKey::from_id(other_component_id) };
 
                        let component = self.runtime.get_component_private(&key);
 
                        component.ctx.inbox.insert_new(message);
 
                    }
 

	
 
                    continue;
 
                }
 

	
 
                match scheduled.ctx.get_port_by_id(target_port) {
 
                    Some(port_info) => {
 
                        if port_info.state == PortState::Closed {
 
                            // We're no longer supposed to receive messages
 
                            // (rerouted message arrived much later!)
 
                            continue
 
                        }
 
                    },
 
                    None => {
 
                        // Apparently we no longer have a handle to the port
 
                        continue;
 
                    }
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 
            if let Message::Control(message) = &message {
 
                match message.content {
 
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                        // Need to change port target
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.peer_connector = new_target_connector_id;
 

	
 
                        // Note: for simplicity we program the scheduler to always finish
 
                        // running a connector with an empty outbox. If this ever changes
 
                        // then accepting the "port peer changed" message implies we need
 
                        // to change the recipient of the message in the outbox.
 
                        debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                        // And respond with an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                        self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::CloseChannel(port_id) => {
 
                        // Mark the port as being closed
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.state = PortState::Closed;
 

	
 
                        // Send an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                        self.runtime.send_message_assumed_alive(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some(component_key) = scheduled.router.handle_ack(message.id) {
 
                            self.runtime.push_work(component_key);
 
                        };
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
            Message::SyncControl(_) => None,
 
            Message::Control(_) => None,
 
        };
 

	
 
        if let Some((target_port, sync_round)) = target_port_and_round_number {
 
            // This message is aimed at a port, but we're shutting down, so
 
            // notify the peer that it was not received properly.
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            if port.state == PortState::Open {
 
                let message = SyncControlMessage {
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
 
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
                };
 
                self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message));
 
                self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
                self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message));
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            let target_component_id = match &message {
 
            let (target_component_id, over_port) = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                    (port_desc.peer_connector, true)
 
                },
 
                Message::SyncComp(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.target_component_id
 
                    (content.target_component_id, false)
 
                },
 
                Message::SyncPort(content) => {
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                    (port_desc.peer_connector, true)
 
                },
 
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
 
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
 
            };
 

	
 
            self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox] \n --- {:#?}", target_component_id, message));
 
            self.runtime.send_message(target_component_id, message);
 
            self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message));
 
            if over_port {
 
                self.runtime.send_message_assumed_alive(target_component_id, message);
 
            } else {
 
                self.runtime.send_message_maybe_destroyed(target_component_id, message);
 
            }
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. Need to relinquish control of
 
                    // the ports.
 
                    let new_component_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_component_key);
 

	
 
                    // First pass: transfer ports and the associated messages,
 
                    // also count the number of ports that have peers
 
                    let mut num_peers = 0;
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        if port.state == PortState::Open {
 
                            num_peers += 1;
 
                        }
 
                    }
 

	
 
                    if num_peers == 0 {
 
                        // No peers to notify, so just schedule the component
 
                        self.runtime.push_work(new_component_key);
 
                    } else {
 
                        // Some peers to notify
 
                        let new_component_id = new_component_key.downcast();
 
                        let control_id = scheduled.router.prepare_new_component(new_component_key);
 
                        for port in new_connector.ctx.ports.iter() {
 
                            if port.state == PortState::Closed {
 
                                continue;
 
                            }
 

	
 
                            let control_message = scheduled.router.prepare_changed_port_peer(
 
                                control_id, scheduled.ctx.id,
 
                                port.peer_connector, port.peer_id,
 
                                new_component_id, port.self_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(control_message));
 
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message));
 
                        }
 
                    }
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
@@ -626,250 +629,136 @@ impl<'a> Iterator for MessagesIter<'a> {
 
            } else {
 
                // Unreachable because:
 
                //  1. We only iterate over messages that were previously retrieved by `read_next_message`.
 
                //  2. Inbox does not contain control/ping messages.
 
                //  3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox.
 
                unreachable!();
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        // No more messages
 
        return None;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Private Inbox
 
// -----------------------------------------------------------------------------
 

	
 
/// A structure that contains inbox messages. Some messages are left inside and
 
/// continuously re-read. Others are taken out, but may potentially be put back
 
/// for later reading. Later reading in this case implies that they are put back
 
/// for reading in the next sync round.
 
/// TODO: Again, lazy concurrency, see git history for other implementation
 
struct Inbox {
 
    temp_m: Vec<Message>,
 
    temp_d: Vec<Message>,
 
    messages: RawVec<Message>,
 
    next_delay_idx: u32,
 
    start_read_idx: u32,
 
    messages: Vec<Message>,
 
    delayed: Vec<Message>,
 
    next_read_idx: u32,
 
    last_read_idx: u32,
 
    generation: u32,
 
}
 

	
 
#[derive(Clone, Copy)]
 
pub(crate) struct MessageTicket {
 
    index: u32,
 
    generation: u32,
 
}
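
Each handed-out ticket bumps the inbox's `generation`, so a ticket is only
valid until the next one is issued; the read/take methods below verify this
with `debug_assert_eq!`. A tiny sketch of the same ticket idea, with explicit
rejection instead of a debug assertion (a hypothetical `TicketedQueue`, not
the runtime's `Inbox`):

    #[derive(Clone, Copy)]
    struct Ticket { index: u32, generation: u32 }

    struct TicketedQueue {
        items: Vec<String>,
        next_read: u32,
        generation: u32, // bumped for every ticket that is handed out
    }

    impl TicketedQueue {
        fn next_ticket(&mut self) -> Option<Ticket> {
            if self.next_read as usize >= self.items.len() {
                return None;
            }
            self.generation += 1;
            self.next_read += 1;
            return Some(Ticket{ index: self.next_read - 1, generation: self.generation });
        }

        // Only the most recently issued ticket is accepted.
        fn read(&self, ticket: Ticket) -> Option<&String> {
            if ticket.generation != self.generation {
                return None; // stale ticket
            }
            return self.items.get(ticket.index as usize);
        }
    }

    fn main() {
        let mut queue = TicketedQueue{
            items: vec!["a".into(), "b".into()],
            next_read: 0,
            generation: 0,
        };
        let first = queue.next_ticket().unwrap();
        assert_eq!(queue.read(first).unwrap(), "a");
        let second = queue.next_ticket().unwrap();
        assert!(queue.read(first).is_none()); // stale ticket is rejected
        assert_eq!(queue.read(second).unwrap(), "b");
    }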
 

	
 
impl Inbox {
 
    fn new() -> Self {
 
        return Inbox {
 
            temp_m: Vec::new(), temp_d: Vec::new(),
 
            messages: RawVec::new(),
 
            next_delay_idx: 0,
 
            start_read_idx: 0,
 
            messages: Vec::new(),
 
            delayed: Vec::new(),
 
            next_read_idx: 0,
 
            last_read_idx: 0,
 
            generation: 0,
 
        }
 
    }
 

	
 
    fn insert_new(&mut self, message: Message) {
 
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
 
        self.temp_m.push(message);
 
        return;
 
        self.messages.push(message);
 
    }
 

	
 
    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if self.next_read_idx as usize >= self.temp_m.len() { return None };
 
        if self.next_read_idx as usize >= self.messages.len() { return None };
 
        let idx = self.next_read_idx;
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{ index: idx, generation: self.generation });
 
        let cur_read_idx = self.next_read_idx as usize;
 
        if cur_read_idx >= self.messages.len() {
 
            return None;
 
        }
 

	
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{
 
            index: cur_read_idx as u32,
 
            generation: self.generation
 
        });
 
    }
 

	
 
    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        return &self.temp_m[ticket.index as usize];
 
        return unsafe{ &*self.messages.get(ticket.index as usize) }
 
        return &self.messages[ticket.index as usize];
 
    }
 

	
 
    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        debug_assert!(ticket.index < self.next_read_idx);
 
        self.next_read_idx -= 1;
 
        return self.temp_m.remove(ticket.index as usize);
 
        unsafe {
 
            let take_idx = ticket.index as usize;
 
            let val = std::ptr::read(self.messages.get(take_idx));
 

	
 
            // Move messages to the right, clearing up space in the
 
            // front.
 
            let num_move_right = take_idx - self.start_read_idx as usize;
 
            self.messages.move_range(
 
                self.start_read_idx as usize,
 
                self.start_read_idx as usize + 1,
 
                num_move_right
 
            );
 

	
 
            self.start_read_idx += 1;
 

	
 
            return val;
 
        }
 
        return self.messages.remove(ticket.index as usize);
 
    }
 

	
 
    fn put_back_message(&mut self, message: Message) {
 
        // We have space in front of the array because we've taken out a message
 
        // before.
 
        self.temp_d.push(message);
 
        return;
 
        debug_assert!(self.next_delay_idx < self.start_read_idx);
 
        unsafe {
 
            // Write to front of the array
 
            std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message);
 
            self.next_delay_idx += 1;
 
        }
 
        self.delayed.push(message);
 
    }
 

	
 
    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter{
 
            messages: self.temp_m.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
        return MessagesIter{
 
            messages: self.messages.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            next_index: 0,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
    }
 

	
 
    fn clear_read_messages(&mut self) {
 
        self.temp_m.drain(0..self.next_read_idx as usize);
 
        for (idx, v) in self.temp_d.drain(..).enumerate() {
 
            self.temp_m.insert(idx, v);
 
        self.messages.drain(0..self.next_read_idx as usize);
 
        for (idx, v) in self.delayed.drain(..).enumerate() {
 
            self.messages.insert(idx, v);
 
        }
 
        self.next_read_idx = 0;
 
        return;
 
        // Deallocate everything that was read
 
        self.destroy_range(self.start_read_idx, self.next_read_idx);
 
        self.generation += 1;
 

	
 
        // Join up all remaining values with the delayed ones in the front
 
        let num_to_move = self.messages.len() - self.next_read_idx as usize;
 
        self.messages.move_range(
 
            self.next_read_idx as usize,
 
            self.next_delay_idx as usize,
 
            num_to_move
 
        );
 

	
 
        // Set all indices (and the RawVec len) to make sense in this new state
 
        let new_len = self.next_delay_idx as usize + num_to_move;
 
        self.next_delay_idx = 0;
 
        self.start_read_idx = 0;
 
        self.next_read_idx = 0;
 
        self.messages.len = new_len;
 
    }
 

	
 
    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
 
        debug_assert!(self.temp_d.is_empty());
 
        debug_assert!(self.delayed.is_empty());
 
        let mut idx = 0;
 
        while idx < self.temp_m.len() {
 
            let msg = &self.temp_m[idx];
 
        while idx < self.messages.len() {
 
            let msg = &self.messages[idx];
 
            if let Some(target) = msg.target_port() {
 
                if target == port {
 
                    new_inbox.temp_m.push(self.temp_m.remove(idx));
 
                    new_inbox.messages.push(self.messages.remove(idx));
 
                    continue;
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
        return;
 

	
 
        let mut idx = 0;
 
        while idx < self.messages.len() {
 
            let message = unsafe{ &*self.messages.get(idx) };
 
            if let Some(target_port) = message.target_port() {
 
                if target_port == port {
 
                    // Transfer port
 
                    unsafe {
 
                        let message = std::ptr::read(message as *const _);
 
                        let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition
 
                        if remaining > 0 {
 
                            self.messages.move_range(idx + 1, idx, remaining);
 
                        }
 
                        self.messages.len -= 1;
 
                        new_inbox.insert_new(message);
 
                    }
 

	
 
                    continue; // do not increment index
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn destroy_range(&mut self, start_idx: u32, end_idx: u32) {
 
        for idx in (start_idx as usize)..(end_idx as usize) {
 
            unsafe {
 
                let msg = self.messages.get_mut(idx);
 
                std::ptr::drop_in_place(msg);
 
            }
 
        }
 
    }
 
}
 
//
 
// impl Drop for Inbox {
 
//     fn drop(&mut self) {
 
//         // Whether in sync or not in sync. We have two ranges of allocated
 
//         // messages:
 
//         // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
//         // - readable messages: from `start_read_idx` to `messages.len`
 
//         self.destroy_range(0, self.next_delay_idx);
 
//         self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
//     }
 
// }
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    NewComponent(ControlNewComponent),
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
impl ControlVariant {
 
    fn as_new_component_mut(&mut self) -> &mut ControlNewComponent {
 
        match self {
 
            ControlVariant::NewComponent(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 
}
src/runtime2/tests/mod.rs
 
mod network_shapes;
 
mod api_component;
 
mod speculation;
 
mod data_transmission;
 
mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 750;  // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 6;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 2;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 
}
 

	
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant