Changeset - a43d61913724
[Not reviewed]
0 9 0
MH - 4 years ago 2021-10-20 09:00:45
contact@maxhenger.nl
prepare for debugging
9 files changed with 225 insertions and 191 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
@@ -174,7 +174,7 @@ impl ProtocolDescription {
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&heap, identifier);
 
        let definition_id = root.get_definition_ident(&self.heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
@@ -210,7 +210,7 @@ impl ProtocolDescription {
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(ComponentState{
 
            prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments),
 
            prompt: Prompt::new(&self.types, &self.heap, definition_id, 0, arguments),
 
        });
 
    }
 

	
 
@@ -232,24 +232,18 @@ impl ProtocolDescription {
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        macro_rules! match_variant {
 
            ($value:expr, $variant:expr) => {
 
                if let $variant(_) = $value { true } else { false }
 
            };
 
        }
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
 
            CTP::Bool => match_variant!(argument, Value::Bool),
 
            CTP::UInt8 => match_variant!(argument, Value::UInt8),
 
            CTP::UInt16 => match_variant!(argument, Value::UInt16),
 
            CTP::UInt32 => match_variant!(argument, Value::UInt32),
 
            CTP::UInt64 => match_variant!(argument, Value::UInt64),
 
            CTP::SInt8 => match_variant!(argument, Value::SInt8),
 
            CTP::SInt16 => match_variant!(argument, Value::SInt16),
 
            CTP::SInt32 => match_variant!(argument, Value::SInt32),
 
            CTP::SInt64 => match_variant!(argument, Value::SInt64),
 
            CTP::Character => match_variant!(argument, Value::Char),
 
            CTP::Bool => if let Value::Bool(_) = argument { true } else { false },
 
            CTP::UInt8 => if let Value::UInt8(_) = argument { true } else { false },
 
            CTP::UInt16 => if let Value::UInt16(_) = argument { true } else { false },
 
            CTP::UInt32 => if let Value::UInt32(_) = argument { true } else { false },
 
            CTP::UInt64 => if let Value::UInt64(_) = argument { true } else { false },
 
            CTP::SInt8 => if let Value::SInt8(_) = argument { true } else { false },
 
            CTP::SInt16 => if let Value::SInt16(_) = argument { true } else { false },
 
            CTP::SInt32 => if let Value::SInt32(_) = argument { true } else { false },
 
            CTP::SInt64 => if let Value::SInt64(_) = argument { true } else { false },
 
            CTP::Character => if let Value::Char(_) = argument { true } else { false },
 
            CTP::String => {
 
                // Match outer string type and embedded character types
 
                if let Value::String(heap_pos) = argument {
 
@@ -277,8 +271,8 @@ impl ProtocolDescription {
 
                    return false;
 
                }
 
            },
 
            CTP::Input => match_variant!(argument, Value::Input),
 
            CTP::Output => match_variant!(argument, Value::Output),
 
            CTP::Input => if let Value::Input(_) = argument { true } else { false },
 
            CTP::Output => if let Value::Output(_) = argument { true } else { false },
 
            CTP::Instance(_definition_id, _num_embedded) => {
 
                todo!("implement full type checking on user-supplied arguments");
 
                return false;
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage};
 
use crate::runtime2::inbox::{MessageContents, SolutionMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::PortKind;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage,
 
    PrivateInbox, PublicInbox, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 
@@ -41,7 +39,7 @@ impl BranchId {
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
@@ -141,7 +139,7 @@ impl PortAssignment {
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
#[derive(Clone)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
@@ -227,7 +225,7 @@ impl ConnectorPorts {
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
            if port == port_id {
 
            if *port == port_id {
 
                return Some(idx)
 
            }
 
        }
 
@@ -250,7 +248,7 @@ impl ConnectorPorts {
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
        return &mut self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
@@ -323,7 +321,7 @@ pub(crate) struct ConnectorPDL {
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32,
 
    sync_finished_last_handled: u32, // TODO: Change to BranchId?
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
@@ -365,7 +363,7 @@ impl Connector for ConnectorPDL {
 

	
 
    fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, results);
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
@@ -389,13 +387,13 @@ impl Connector for ConnectorPDL {
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, ctx, results);
 
                        self.submit_sync_solution(valid_solution, ctx, delta_state);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id == self.sync_finished_last_handled {
 
                        if branch_id.index == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
@@ -414,7 +412,7 @@ impl Connector for ConnectorPDL {
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
@@ -456,7 +454,7 @@ impl ConnectorPDL {
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 
@@ -513,7 +511,7 @@ impl ConnectorPDL {
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            if mapping.last_registered_branch_id != expected_branch_id {
 
                            if mapping.last_registered_branch_id != *expected_branch_id {
 
                                // Not the expected interaction on this port, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
@@ -571,9 +569,9 @@ impl ConnectorPDL {
 
                    };
 

	
 
                    match new_solution.add_or_check_constraint(peer_connector_id, constraint) {
 
                        None => continue 'branch_loop,
 
                        Some(false) => continue 'branch_loop,
 
                        Some(true) => {},
 
                        Err(_) => continue 'branch_loop,
 
                        Ok(false) => continue 'branch_loop,
 
                        Ok(true) => {},
 
                    }
 
                }
 

	
 
@@ -599,7 +597,7 @@ impl ConnectorPDL {
 
                // Already committed to something. So will commit to this if it
 
                // takes precedence over the current solution
 
                message.comparison_number > *previous_comparison ||
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0)
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0)
 
            },
 
            None => {
 
                // Not yet committed to a solution, so commit to this one
 
@@ -844,7 +842,7 @@ impl ConnectorPDL {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = OutgoingDataMessage {
 
                    let message = DataMessage{
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
@@ -858,7 +856,7 @@ impl ConnectorPDL {
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(OutgoingMessage::Data(message));
 
                    results.outbox.push(MessageContents::Data(message));
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
@@ -948,16 +946,16 @@ impl ConnectorPDL {
 
    // linked lists inside of the vector of branches.
 

	
 
    /// Pops from front of linked-list branch queue.
 
    fn pop_branch_from_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
    fn pop_branch_from_queue<'a>(branches: &'a mut Vec<Branch>, queue: &mut BranchQueue) -> &'a mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        branch.next_branch_in_queue = None;
 

	
 
        if *queue.first == 0 {
 
        if queue.first == 0 {
 
            // No more entries in queue
 
            debug_assert_eq!(*queue.last, branch.index.index);
 
            *queue.last = 0;
 
            debug_assert_eq!(queue.last, branch.index.index);
 
            queue.last = 0;
 
        }
 

	
 
        return branch;
 
@@ -970,17 +968,17 @@ impl ConnectorPDL {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
        if *queue.last == 0 {
 
        if queue.last == 0 {
 
            // No branches in the queue at all
 
            debug_assert_eq!(*queue.first, 0);
 
            debug_assert_eq!(queue.first, 0);
 
            branches[to_push as usize].next_branch_in_queue = None;
 
            *queue.first = to_push;
 
            *queue.last = to_push;
 
            queue.first = to_push;
 
            queue.last = to_push;
 
        } else {
 
            // Pre-existing branch in the queue
 
            debug_assert_ne!(*queue.first, 0);
 
            branches[*queue.last as usize].next_branch_in_queue = Some(to_push);
 
            *queue.last = to_push;
 
            debug_assert_ne!(queue.first, 0);
 
            branches[queue.last as usize].next_branch_in_queue = Some(to_push);
 
            queue.last = to_push;
 
        }
 
    }
 

	
 
@@ -1050,6 +1048,7 @@ impl ConnectorPDL {
 
    /// Releasing ownership of ports during a sync-session. Will provide an
 
    /// error if the port was already used during a sync block.
 
    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        for port_id in port_ids {
 
@@ -1064,7 +1063,7 @@ impl ConnectorPDL {
 
                    }
 

	
 
                    for delta in &branch.ports_delta {
 
                        if delta.port_id == port_id {
 
                        if delta.port_id == *port_id {
 
                            // We cannot have acquired this port, because the
 
                            // call to `ports.get_port_index` returned an index.
 
                            debug_assert!(!delta.acquired);
 
@@ -1100,6 +1099,7 @@ impl ConnectorPDL {
 

	
 
    /// Acquiring ports during a sync-session.
 
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        'port_loop: for port_id in port_ids {
 
@@ -1175,7 +1175,7 @@ impl ConnectorPDL {
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() {
 
                return None;
 
            }
 

	
 
@@ -1202,7 +1202,7 @@ impl ConnectorPDL {
 
                SyncBranchConstraint::SilentPort(port.peer_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
            if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 
@@ -1268,6 +1268,7 @@ pub(crate) struct RunDeltaState {
 
    // state changes and try to apply them.
 
    pub outbox: Vec<MessageContents>,
 
    pub new_connectors: Vec<ConnectorPDL>,
 
    pub new_ports: Vec<Port>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
 
@@ -1279,11 +1280,13 @@ impl RunDeltaState {
 
        RunDeltaState{
 
            outbox: Vec::with_capacity(64),
 
            new_connectors: Vec::new(),
 
            new_ports: Vec::new(),
 
            ports: Vec::with_capacity(64),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
@@ -1300,7 +1303,7 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
src/runtime2/global_store.rs
Show inline comments
 
use std::ptr;
 
use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard};
 
use std::sync::{Arc, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32};
 

	
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::{Connector, ConnectorApplication};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
@@ -40,7 +38,7 @@ impl ConnectorKey {
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
@@ -121,7 +119,7 @@ impl ConnectorStore {
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &*connector.public;
 
            return &(**connector).public;
 
        }
 
    }
 

	
 
@@ -133,16 +131,56 @@ impl ConnectorStore {
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return *connector as &mut _;
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
 
        // Connector interface does not own any initial ports, and cannot be
 
        // created by another connector
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)));
 
        return key;
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it. The caller must make sure that the constructed
 
    /// connector's code is initialized with the same ports as the ports in the
 
    /// `initial_ports` array.
 
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant, initial_ports: Vec<Port>) -> ConnectorKey {
 
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector));
 
        let new_connector = self.get_mut(&key);
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => unreachable!(),
 
        }
 

	
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 

	
 
    /// Creates a connector but does not set its initial ports
 
    fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        let index;
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
@@ -152,7 +190,6 @@ impl ConnectorStore {
 
                router: Router::new(),
 
            };
 

	
 
            let index;
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
@@ -172,37 +209,14 @@ impl ConnectorStore {
 
            }
 
        }
 

	
 
        // Setting of new connector's ID
 
        // Generate key and retrieve the connector to set its ID
 
        let key = ConnectorKey{ index: index as u32 };
 
        let new_connector = self.get_mut(&key);
 
        new_connector.context.id = key.downcast();
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
 
        }
 

	
 
        // Return the connector key
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
src/runtime2/inbox.rs
Show inline comments
 
@@ -13,11 +13,11 @@ within a certain sync-round.
 
**/
 

	
 
use std::collections::VecDeque;
 
use std::sync::{RwLock, RwLockReadGuard, Mutex};
 
use std::sync::atomic::{AtomicUsize, Ordering};
 
use std::sync::Mutex;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use super::connector::{BranchId, PortIdLocal};
 
use super::connector::BranchId;
 
use super::port::PortIdLocal;
 
use super::global_store::ConnectorId;
 

	
 
/// A message that has been delivered (after being imbued with the receiving
 
@@ -143,7 +143,7 @@ impl SyncMessage {
 
        match constraint {
 
            SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == silent_port_id {
 
                    if *port_id == silent_port_id {
 
                        // If silent, then mapped ID is invalid
 
                        return Ok(!mapped_id.is_valid())
 
                    }
 
@@ -167,6 +167,7 @@ impl SyncMessage {
 
    }
 
}
 

	
 
#[derive(Clone)]
 
pub struct SolutionMessage {
 
    pub comparison_number: u64,
 
    pub connector_origin: ConnectorId,
 
@@ -176,11 +177,13 @@ pub struct SolutionMessage {
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
#[derive(Clone)]
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub content: ControlMessageVariant,
 
}
 

	
 
#[derive(Clone)]
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
@@ -258,7 +261,7 @@ impl PrivateInbox {
 
        for existing in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.receiving_port == message.receiving_port {
 
                    existing.sending_port == message.sending_port {
 
                // Message was already received
 
                return;
 
            }
 
@@ -313,10 +316,10 @@ pub struct InboxMessageIter<'i> {
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'m DataMessage;
 
impl<'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'i DataMessage;
 

	
 
    fn next(&'m mut self) -> Option<Self::Item> {
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.messages[self.next_index];
src/runtime2/messages.rs
Show inline comments
 
use std::cmp::Ordering;
 
use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use crate::common::Id;
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::connector::{BranchId, PortIdLocal};
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
src/runtime2/mod.rs
Show inline comments
 
@@ -17,13 +17,10 @@ use std::sync::{Arc, Mutex};
 
use std::sync::atomic::Ordering;
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::protocol::eval::*;
 
use crate::{common::Id, PortId, ProtocolDescription};
 
use crate::ProtocolDescription;
 

	
 
use global_store::{ConnectorVariant, GlobalStore};
 
use scheduler::Scheduler;
 
use crate::protocol::ComponentCreationError;
 
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
 
use native::{ConnectorApplication, ApplicationInterface};
 

	
 

	
 
@@ -42,6 +39,10 @@ pub(crate) struct RuntimeInner {
 
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
 
}
 

	
 
// TODO: Come back to this at some point
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
@@ -56,8 +57,9 @@ impl Runtime {
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads);
 
            for _ in 0..num_threads {
 
                let mut scheduler = Scheduler::new(runtime_inner.clone());
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::spawn(move || {
 
                    let mut scheduler = Scheduler::new(cloned_runtime_inner);
 
                    scheduler.run();
 
                });
 

	
 
@@ -76,9 +78,7 @@ impl Runtime {
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector = Box::new(connector);
 

	
 
        let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
 
        let connector_key = self.inner.global_store.connectors.create_interface(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
src/runtime2/native.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::cell::Cell;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
use super::global_store::ConnectorId;
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 
use super::inbox::Message;
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub trait Connector {
 
@@ -32,6 +31,7 @@ type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
}
 

	
 
@@ -63,6 +63,11 @@ impl Connector for ConnectorApplication {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
 
@@ -80,11 +85,11 @@ pub struct ApplicationInterface {
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<Port>,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
@@ -99,19 +104,31 @@ impl ApplicationInterface {
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.owned_ports.push(Port{
 
        // Create ports and add a job such that they are transferred to the
 
        // API component. (note that we do not send a ping, this is only
 
        // necessary once we create a connector)
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
        });
 

	
 
        self.owned_ports.push(Port{
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        });
 
        };
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 
@@ -130,7 +147,7 @@ impl ApplicationInterface {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove)
 
                    self.owned_ports.remove(index_to_remove);
 
                },
 
                None => {
 
                    // We don't own the port
 
@@ -150,7 +167,12 @@ impl ApplicationInterface {
 

	
 
        // Send ping message to wake up connector
 
        let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
 
        connector.inbox.insert_message(Message::Ping);
 
        connector.inbox.insert_message(Message{
 
            sending_connector: ConnectorId::new_invalid(),
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Ping,
 
        });
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub(crate) struct PortIdLocal {
 
    pub index: u32,
 
}
 
@@ -21,6 +21,7 @@ impl PortIdLocal {
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Channel, PortKind, PortOwnership};
 
use crate::runtime2::port::{Channel, PortKind};
 

	
 
use super::RuntimeInner;
 
use super::port::{Port, PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 
use super::inbox::{Message, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
@@ -29,7 +27,7 @@ impl ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            port_counter,
 
            ports: initial_ports,
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
@@ -131,53 +129,46 @@ impl Scheduler {
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    match message.contents {
 
                        MessageContents::Data(content) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content));
 
                            } else {
 
                                scheduled.connector.insert_data_message(content);
 
                            }
 
                        }
 
                        MessageContents::Sync(content) => {
 
                            scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state);
 
                        }
 
                        MessageContents::Solution(content) => {
 
                            // TODO: Handle solution message
 
                        },
 
                        MessageContents::Control(content) => {
 
                            match content.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = scheduled.context.get_port_mut(port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                    // And respond with an Ack
 
                                    // Note: after this code has been reached, we may not have any
 
                                    // messages in the outbox that send to the port whose owning
 
                                    // connector we just changed. This is because the `ack` will
 
                                    // clear the rerouting entry of the `ack`-receiver.
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        content.sender,
 
                                        Message{
 
                                            sending_connector: connector_key.downcast(),
 
                                            receiving_port: PortIdLocal::new_invalid(),
 
                                            contents: MessageContents::Control(ControlMessage{
 
                                                id: content.id,
 
                                                content: ControlMessageVariant::Ack,
 
                                            }),
 
                                        }
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(content.id);
 
                                }
 
                    // Check for rerouting
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                // Note: after this code has been reached, we may not have any
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                self.send_message_and_wake_up_if_sleeping(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_key.downcast(),
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                        Message::Ping => {},
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
@@ -252,7 +243,7 @@ impl Scheduler {
 
                            let message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: contents.clone(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                            };
 
                            self.send_message_and_wake_up_if_sleeping(*to_visit, message);
 
                        }
 
@@ -277,6 +268,12 @@ impl Scheduler {
 
            }
 
        }
 

	
 
        if !delta_state.new_ports.is_empty() {
 
            for port in delta_state.new_ports.drain(..) {
 
                context.ports.push(port);
 
            }
 
        }
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
@@ -284,7 +281,7 @@ impl Scheduler {
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector));
 
                let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector);
 
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);
 

	
 
                // Call above changed ownership of ports, but we still have to
 
@@ -296,7 +293,7 @@ impl Scheduler {
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
 
                    self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
@@ -305,7 +302,7 @@ impl Scheduler {
 
        }
 
    }
 

	
 
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
    fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.runtime.global_store.connectors.get_shared(connector_id);
 

	
 
        connector.inbox.insert_message(message);
 
@@ -324,7 +321,7 @@ impl Scheduler {
 
// TODO: Optimize
 
struct ReroutedTraffic {
 
    id: u32,                        // ID of control message
 
    port: PortIdLocal,              // targeted port
 
    target_port: PortIdLocal,       // targeted port
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector they should be rerouted to
 
}
 
@@ -356,23 +353,27 @@ impl Router {
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            port: port_id,
 
            target_port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
 
            })
 
        };
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.port == sending_port {
 
                reroute.target_port == target_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
0 comments (0 inline, 0 general)