Changeset - a43d61913724
[Not reviewed]
0 9 0
MH - 4 years ago 2021-10-20 09:00:45
contact@maxhenger.nl
prepare for debugging
9 files changed with 225 insertions and 191 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
@@ -165,25 +165,25 @@ impl ProtocolDescription {
 
    //  entirety. Find some way to interface with the parameter's types.
 
    pub(crate) fn new_component_v2(
 
        &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup
 
    ) -> Result<ComponentState, ComponentCreationError> {
 
        // Find the module in which the definition can be found
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(ComponentCreationError::ModuleDoesntExist);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&heap, identifier);
 
        let definition_id = root.get_definition_ident(&self.heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
        let definition_id = definition_id.unwrap();
 

	
 
        let definition = &self.heap[definition_id];
 
        if !definition.is_component() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // Make sure that the types of the provided value group matches that of
 
        // the expected types.
 
@@ -201,64 +201,58 @@ impl ProtocolDescription {
 
        // - for each argument try to make sure the types match
 
        for arg_idx in 0..arguments.values.len() {
 
            let expected_type = &expr_data.arg_types[arg_idx];
 
            let provided_value = &arguments.values[arg_idx];
 
            if !self.verify_same_type(expected_type, 0, &arguments, provided_value) {
 
                return Err(ComponentCreationError::InvalidArgumentType(arg_idx));
 
            }
 
        }
 

	
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(ComponentState{
 
            prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments),
 
            prompt: Prompt::new(&self.types, &self.heap, definition_id, 0, arguments),
 
        });
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
                Some(name) => if name.as_bytes() == module_name {
 
                    return Some(module.root_id);
 
                },
 
                None => if module_name.is_empty() {
 
                    return Some(module.root_id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        macro_rules! match_variant {
 
            ($value:expr, $variant:expr) => {
 
                if let $variant(_) = $value { true } else { false }
 
            };
 
        }
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
 
            CTP::Bool => match_variant!(argument, Value::Bool),
 
            CTP::UInt8 => match_variant!(argument, Value::UInt8),
 
            CTP::UInt16 => match_variant!(argument, Value::UInt16),
 
            CTP::UInt32 => match_variant!(argument, Value::UInt32),
 
            CTP::UInt64 => match_variant!(argument, Value::UInt64),
 
            CTP::SInt8 => match_variant!(argument, Value::SInt8),
 
            CTP::SInt16 => match_variant!(argument, Value::SInt16),
 
            CTP::SInt32 => match_variant!(argument, Value::SInt32),
 
            CTP::SInt64 => match_variant!(argument, Value::SInt64),
 
            CTP::Character => match_variant!(argument, Value::Char),
 
            CTP::Bool => if let Value::Bool(_) = argument { true } else { false },
 
            CTP::UInt8 => if let Value::UInt8(_) = argument { true } else { false },
 
            CTP::UInt16 => if let Value::UInt16(_) = argument { true } else { false },
 
            CTP::UInt32 => if let Value::UInt32(_) = argument { true } else { false },
 
            CTP::UInt64 => if let Value::UInt64(_) = argument { true } else { false },
 
            CTP::SInt8 => if let Value::SInt8(_) = argument { true } else { false },
 
            CTP::SInt16 => if let Value::SInt16(_) = argument { true } else { false },
 
            CTP::SInt32 => if let Value::SInt32(_) = argument { true } else { false },
 
            CTP::SInt64 => if let Value::SInt64(_) = argument { true } else { false },
 
            CTP::Character => if let Value::Char(_) = argument { true } else { false },
 
            CTP::String => {
 
                // Match outer string type and embedded character types
 
                if let Value::String(heap_pos) = argument {
 
                    for element in &arguments.regions[*heap_pos as usize] {
 
                        if let Value::Char(_) = element {} else {
 
                            return false;
 
                        }
 
                    }
 
                } else {
 
                    return false;
 
                }
 

	
 
@@ -268,26 +262,26 @@ impl ProtocolDescription {
 
                if let Value::Array(heap_pos) = argument {
 
                    let heap_pos = *heap_pos;
 
                    for element in &arguments.regions[heap_pos as usize] {
 
                        if !self.verify_same_type(expected, expected_idx + 1, arguments, element) {
 
                            return false;
 
                        }
 
                    }
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            CTP::Input => match_variant!(argument, Value::Input),
 
            CTP::Output => match_variant!(argument, Value::Output),
 
            CTP::Input => if let Value::Input(_) = argument { true } else { false },
 
            CTP::Output => if let Value::Output(_) = argument { true } else { false },
 
            CTP::Instance(_definition_id, _num_embedded) => {
 
                todo!("implement full type checking on user-supplied arguments");
 
                return false;
 
            },
 
        }
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
 
pub trait RunContext {
 
    fn did_put(&mut self, port: PortId) -> bool;
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup>; // None if still waiting on message
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::Deref;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage};
 
use crate::runtime2::inbox::{MessageContents, SolutionMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::PortKind;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 
use super::global_store::ConnectorId;
 
use super::inbox::{
 
    PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage,
 
    PrivateInbox, PublicInbox, DataMessage, SyncMessage,
 
    SyncBranchConstraint, SyncConnectorSolution
 
};
 
use super::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
@@ -32,25 +30,25 @@ impl BranchId {
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
@@ -132,25 +130,25 @@ impl PortAssignment {
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
#[derive(Clone)]
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// Contains a description of the port mapping during a particular sync session.
 
/// TODO: Extend documentation
 
@@ -218,48 +216,48 @@ impl ConnectorPorts {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        self.owned_ports.remove(port_index);
 
        self.port_mapping.remove(port_index);
 
    }
 

	
 
    /// Retrieves the index associated with a port id. Note that the port might
 
    /// not exist (yet) if a speculative branch has just received the port.
 
    /// TODO: But then again, one cannot use that port, right?
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
            if port == port_id {
 
            if *port == port_id {
 
                return Some(idx)
 
            }
 
        }
 

	
 
        return None
 
    }
 

	
 
    /// Retrieves the ID associated with the port at the provided index
 
    #[inline]
 
    fn get_port_id(&self, port_index: usize) -> PortIdLocal {
 
        return self.owned_ports[port_index];
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
        return &mut self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
@@ -314,25 +312,25 @@ impl ConnectorPublic {
 
}
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32,
 
    sync_finished_last_handled: u32, // TODO: Change to BranchId?
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 
@@ -356,74 +354,74 @@ impl Connector for ConnectorPDL {
 

	
 
        match message {
 
            MC::Data(message) => self.handle_data_message(message),
 
            MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state),
 
            MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state),
 
            MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state),
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, results);
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                loop {
 
                    let branch_id = BranchId::new(next_id);
 
                    let branch = &self.branches[next_id as usize];
 
                    let branch_next = branch.next_branch_in_queue;
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, ctx, results);
 
                        self.submit_sync_solution(valid_solution, ctx, delta_state);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
                        // future use
 
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
 
                        if branch_id == self.sync_finished_last_handled {
 
                        if branch_id.index == self.sync_finished_last_handled {
 
                            self.sync_finished_last_handled = self.sync_finished.last;
 
                        }
 

	
 
                        let branch = &mut self.branches[next_id as usize];
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 

	
 
                    match branch_next {
 
                        Some(id) => next_id = id,
 
                        None => break,
 
                    }
 
                }
 

	
 
                self.sync_finished_last_handled = next_id;
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, results);
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            in_sync: false,
 
@@ -447,25 +445,25 @@ impl ConnectorPDL {
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    pub fn handle_data_message(&mut self, message: DataMessage) {
 
        self.inbox.insert_message(message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints
 
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
 

	
 
        // TODO: Optimize, use some kind of temp workspace vector
 
        let mut execution_path_branch_ids = Vec::new();
 

	
 
        if self.sync_finished_last_handled != 0 {
 
            // We have some solutions to match against
 
            let constraints_index = message.constraints
 
                .iter()
 
                .position(|v| v.connector_id == ctx.id)
 
                .unwrap();
 
            let constraints = &message.constraints[constraints_index].constraints;
 
            debug_assert!(!constraints.is_empty());
 
@@ -504,25 +502,25 @@ impl ConnectorPDL {
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                        SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                            let port_index = self.ports.get_port_index(*port_id);
 
                            if port_index.is_none() {
 
                                // Nefarious peer
 
                                continue 'branch_loop;
 
                            }
 
                            let port_index = port_index.unwrap();
 

	
 
                            let mapping = self.ports.get_port(branch_index, port_index);
 
                            if mapping.last_registered_branch_id != expected_branch_id {
 
                            if mapping.last_registered_branch_id != *expected_branch_id {
 
                                // Not the expected interaction on this port, constraint not satisfied
 
                                continue 'branch_loop;
 
                            }
 
                        },
 
                    }
 
                }
 

	
 
                // If here, then all of the external constraints were satisfied
 
                // for the current branch. But the branch itself also imposes
 
                // constraints. So while building up the new solution, make sure
 
                // that those are satisfied as well.
 
                // TODO: Code below can probably be merged with initial solution
 
@@ -562,27 +560,27 @@ impl ConnectorPDL {
 
                    let mapping = self.ports.get_port(branch_index, port_index);
 
                    let constraint = if mapping.num_times_fired == 0 {
 
                        SyncBranchConstraint::SilentPort(peer_port_id)
 
                    } else {
 
                        if peer_is_getter {
 
                            SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id)
 
                        } else {
 
                            SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id)
 
                        }
 
                    };
 

	
 
                    match new_solution.add_or_check_constraint(peer_connector_id, constraint) {
 
                        None => continue 'branch_loop,
 
                        Some(false) => continue 'branch_loop,
 
                        Some(true) => {},
 
                        Err(_) => continue 'branch_loop,
 
                        Ok(false) => continue 'branch_loop,
 
                        Ok(true) => {},
 
                    }
 
                }
 

	
 
                // If here, then the newly generated solution is completely
 
                // compatible.
 
                self.submit_sync_solution(new_solution, ctx, results);
 

	
 
                // Consider the next branch
 
                if branch_index == self.sync_finished_last_handled {
 
                    // At the end of the previously handled solutions
 
                    break;
 
                }
 
@@ -590,25 +588,25 @@ impl ConnectorPDL {
 
                debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue
 
                branch_index = branch.next_branch_in_queue.unwrap();
 
            }
 
        }
 
    }
 

	
 
    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        let should_propagate_message = match &self.committed_to {
 
            Some((previous_origin, previous_comparison)) => {
 
                // Already committed to something. So will commit to this if it
 
                // takes precedence over the current solution
 
                message.comparison_number > *previous_comparison ||
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0)
 
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0)
 
            },
 
            None => {
 
                // Not yet committed to a solution, so commit to this one
 
                true
 
            }
 
        };
 

	
 
        if should_propagate_message {
 
            self.committed_to = Some((message.connector_origin, message.comparison_number));
 

	
 
            if message.to_visit.is_empty() {
 
                // Visited all of the connectors, so every connector can now
 
@@ -835,39 +833,39 @@ impl ConnectorPDL {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = OutgoingDataMessage {
 
                    let message = DataMessage{
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    // If the message contains any ports then we release our
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(OutgoingMessage::Data(message));
 
                    results.outbox.push(MessageContents::Data(message));
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
@@ -939,57 +937,57 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal helpers
 
    // -------------------------------------------------------------------------
 

	
 
    // Helpers for management of the branches and their internally stored
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    /// Pops from front of linked-list branch queue.
 
    fn pop_branch_from_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
    fn pop_branch_from_queue<'a>(branches: &'a mut Vec<Branch>, queue: &mut BranchQueue) -> &'a mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        branch.next_branch_in_queue = None;
 

	
 
        if *queue.first == 0 {
 
        if queue.first == 0 {
 
            // No more entries in queue
 
            debug_assert_eq!(*queue.last, branch.index.index);
 
            *queue.last = 0;
 
            debug_assert_eq!(queue.last, branch.index.index);
 
            queue.last = 0;
 
        }
 

	
 
        return branch;
 
    }
 

	
 
    /// Pushes branch at the end of the linked-list branch queue.
 
    fn push_branch_into_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId,
 
    ) {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
        if *queue.last == 0 {
 
        if queue.last == 0 {
 
            // No branches in the queue at all
 
            debug_assert_eq!(*queue.first, 0);
 
            debug_assert_eq!(queue.first, 0);
 
            branches[to_push as usize].next_branch_in_queue = None;
 
            *queue.first = to_push;
 
            *queue.last = to_push;
 
            queue.first = to_push;
 
            queue.last = to_push;
 
        } else {
 
            // Pre-existing branch in the queue
 
            debug_assert_ne!(*queue.first, 0);
 
            branches[*queue.last as usize].next_branch_in_queue = Some(to_push);
 
            *queue.last = to_push;
 
            debug_assert_ne!(queue.first, 0);
 
            branches[queue.last as usize].next_branch_in_queue = Some(to_push);
 
            queue.last = to_push;
 
        }
 
    }
 

	
 
    /// Removes branch from linked-list queue. Walks through the entire list to
 
    /// find the element (!). Assumption is that this is not called often.
 
    fn remove_branch_from_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_delete: BranchId,
 
    ) {
 
        debug_assert!(to_delete.is_valid()); // we're deleting a valid item
 
        debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with
 

	
 
        // Retrieve branch and its next element
 
@@ -1041,39 +1039,40 @@ impl ConnectorPDL {
 
            // We must own the port, or something is wrong with our code
 
            todo!("Set up some kind of message router");
 
            debug_assert!(ports.get_port_index(*port_id).is_some());
 
            ports.remove_port(*port_id);
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    /// Releasing ownership of ports during a sync-session. Will provide an
 
    /// error if the port was already used during a sync block.
 
    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        for port_id in port_ids {
 
            match ports.get_port_index(*port_id) {
 
                Some(port_index) => {
 
                    // We (used to) own the port. Make sure it is not given away
 
                    // already and not used to put/get data.
 
                    let port_mapping = ports.get_port(branch.index.index, port_index);
 
                    if port_mapping.is_assigned && port_mapping.num_times_fired != 0 {
 
                        // Already used
 
                        return Err(PortOwnershipError::UsedInInteraction(*port_id));
 
                    }
 

	
 
                    for delta in &branch.ports_delta {
 
                        if delta.port_id == port_id {
 
                        if delta.port_id == *port_id {
 
                            // We cannot have acquired this port, because the
 
                            // call to `ports.get_port_index` returned an index.
 
                            debug_assert!(!delta.acquired);
 
                            return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                        }
 
                    }
 

	
 
                    branch.ports_delta.push(PortOwnershipDelta{
 
                        acquired: false,
 
                        port_id: *port_id,
 
                    });
 
                },
 
@@ -1091,24 +1090,25 @@ impl ConnectorPDL {
 

	
 
                    debug_assert!(to_delete_index != -1);
 
                    branch.ports_delta.remove(to_delete_index as usize);
 
                }
 
            }
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    /// Acquiring ports during a sync-session.
 
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        'port_loop: for port_id in port_ids {
 
            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                if delta.port_id == *port_id {
 
                    if delta.acquired {
 
                        // Somehow already received this port.
 
                        // TODO: @security
 
                        todo!("take care of nefarious peers");
 
                    } else {
 
                        // Sending ports to ourselves
 
                        debug_assert!(ports.get_port_index(*port_id).is_some());
 
@@ -1166,25 +1166,25 @@ impl ConnectorPDL {
 
        };
 

	
 
        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);
 

	
 
        // Turn local port mapping into constraints on other connectors
 

	
 
        // - constraints on other components due to transferred ports
 
        for port_delta in &branch.ports_delta {
 
            // For transferred ports we always have two constraints: one for the
 
            // sender and one for the receiver, ensuring it was not used.
 
            // TODO: This will fail if a port is passed around multiple times.
 
            //  maybe a special "passed along" entry in `ports_delta`.
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) {
 
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() {
 
                return None;
 
            }
 

	
 
            // Might need to check if we own the other side of the channel
 
            let port = ctx.get_port(port_delta.port_id);
 
            if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        // - constraints on other components due to owned ports
 
        for port_index in 0..self.ports.num_ports() {
 
@@ -1193,25 +1193,25 @@ impl ConnectorPDL {
 
            let port = ctx.get_port(port_id);
 

	
 
            let constraint = if port_mapping.is_assigned {
 
                if port.kind == PortKind::Getter {
 
                    SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id)
 
                } else {
 
                    SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id)
 
                }
 
            } else {
 
                SyncBranchConstraint::SilentPort(port.peer_id)
 
            };
 

	
 
            if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() {
 
            if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() {
 
                return None;
 
            }
 
        }
 

	
 
        return Some(sync_message);
 
    }
 

	
 
    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
 
        if partial_solution.to_visit.is_empty() {
 
            // Solution is completely consistent. So ask everyone to commit
 
            // TODO: Maybe another package for random?
 
            let comparison_number: u64 = unsafe {
 
@@ -1259,57 +1259,60 @@ impl ConnectorPDL {
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<MessageContents>,
 
    pub new_connectors: Vec<ConnectorPDL>,
 
    pub new_ports: Vec<Port>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
 

	
 
impl RunDeltaState {
 
    /// Constructs a new `RunDeltaState` object with the default amount of
 
    /// reserved memory
 
    pub fn new() -> Self {
 
        RunDeltaState{
 
            outbox: Vec::with_capacity(64),
 
            new_connectors: Vec::new(),
 
            new_ports: Vec::new(),
 
            ports: Vec::with_capacity(64),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
src/runtime2/global_store.rs
Show inline comments
 
use std::ptr;
 
use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard};
 
use std::sync::{Arc, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32};
 

	
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
 
use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage};
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::{Connector, ConnectorApplication};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
@@ -31,25 +29,25 @@ impl ConnectorKey {
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
@@ -112,106 +110,122 @@ impl ConnectorStore {
 
                free: Vec::with_capacity(capacity),
 
            }),
 
        };
 
    }
 

	
 
    /// Retrieves the shared members of the connector.
 
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &*connector.public;
 
            return &(**connector).public;
 
        }
 
    }
 

	
 
    /// Retrieves a particular connector. Only the thread that pulled the
 
    /// associated key out of the execution queue should (be able to) call this.
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return *connector as &mut _;
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
 
        // Connector interface does not own any initial ports, and cannot be
 
        // created by another connector
 
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)));
 
        return key;
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it. The caller must make sure that the constructed
 
    /// connector's code is initialized with the same ports as the ports in the
 
    /// `initial_ports` array.
 
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant, initial_ports: Vec<Port>) -> ConnectorKey {
 
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
 
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector));
 
        let new_connector = self.get_mut(&key);
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => unreachable!(),
 
        }
 

	
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 

	
 
    /// Creates a connector but does not set its initial ports
 
    fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        let index;
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
                connector,
 
                context: ConnectorCtx::new(self.port_counter.clone()),
 
                public: ConnectorPublic::new(),
 
                router: Router::new(),
 
            };
 

	
 
            let index;
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
                unsafe {
 
                    // Cheating a bit here. Anyway, move to heap, store in list
 
                    index = lock.connectors.len();
 
                    lock.connectors.push(connector);
 
                }
 
            } else {
 
                index = lock.free.pop().unwrap();
 

	
 
                unsafe {
 
                    let target = lock.connectors.get_mut(index);
 
                    debug_assert!(!target.is_null());
 
                    ptr::write(*target, connector);
 
                }
 
            }
 
        }
 

	
 
        // Setting of new connector's ID
 
        // Generate key and retrieve the connector to set its ID
 
        let key = ConnectorKey{ index: index as u32 };
 
        let new_connector = self.get_mut(&key);
 
        new_connector.context.id = key.downcast();
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
 
        }
 

	
 
        // Return the connector key
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        for idx in 0..lock.connectors.len() {
 
            unsafe {
 
                let memory = *lock.connectors.get_mut(idx);
 
                let _ = Box::from_raw(memory); // takes care of deallocation
 
            }
 
        }
src/runtime2/inbox.rs
Show inline comments
 
@@ -4,29 +4,29 @@ inbox.rs
 
Contains various types of inboxes and message types for the connectors. There
 
are two kinds of inboxes:
 

	
 
The `PublicInbox` is a simple message queue. Messages are put in by various
 
threads, and they're taken out by a single thread. These messages may contain
 
control messages and may be filtered or redirected by the scheduler.
 

	
 
The `PrivateInbox` is a temporary storage for all messages that are received
 
within a certain sync-round.
 
**/
 

	
 
use std::collections::VecDeque;
 
use std::sync::{RwLock, RwLockReadGuard, Mutex};
 
use std::sync::atomic::{AtomicUsize, Ordering};
 
use std::sync::Mutex;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use super::connector::{BranchId, PortIdLocal};
 
use super::connector::BranchId;
 
use super::port::PortIdLocal;
 
use super::global_store::ConnectorId;
 

	
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
#[derive(Clone)]
 
pub struct DataMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId,
 
    pub sender_cur_branch_id: BranchId,
 
    pub message: ValueGroup,
 
}
 

	
 
@@ -134,62 +134,65 @@ impl SyncMessage {
 
    /// nefarious peer has supplied a constraint with a port we do not own).
 
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
 
        debug_assert!(self.has_local_solution_for(connector_id));
 

	
 
        let entry = self.local_solutions
 
            .iter()
 
            .find(|v| v.connector_id == connector_id)
 
            .unwrap();
 

	
 
        match constraint {
 
            SyncBranchConstraint::SilentPort(silent_port_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == silent_port_id {
 
                    if *port_id == silent_port_id {
 
                        // If silent, then mapped ID is invalid
 
                        return Ok(!mapped_id.is_valid())
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
 
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
 
            },
 
            SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
 
                for (port_id, mapped_id) in &entry.final_port_mapping {
 
                    if port_id == port_id {
 
                        return Ok(*mapped_id == expected_branch_id);
 
                    }
 
                }
 

	
 
                return Err(());
 
            },
 
        }
 
    }
 
}
 

	
 
#[derive(Clone)]
 
pub struct SolutionMessage {
 
    pub comparison_number: u64,
 
    pub connector_origin: ConnectorId,
 
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
 
    pub to_visit: Vec<ConnectorId>,
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
#[derive(Clone)]
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub content: ControlMessageVariant,
 
}
 

	
 
#[derive(Clone)]
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
 
/// Generic message contents.
 
#[derive(Clone)]
 
pub enum MessageContents {
 
    Data(DataMessage),              // data message, handled by connector
 
    Sync(SyncMessage),              // sync message, handled by both connector/scheduler
 
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
 
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
 
@@ -249,25 +252,25 @@ impl PrivateInbox {
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: DataMessage) {
 
        for existing in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.receiving_port == message.receiving_port {
 
                    existing.sending_port == message.sending_port {
 
                // Message was already received
 
                return;
 
            }
 
        }
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
    /// speculative conditions. Note that the inbox remains read-locked until
 
    /// the returned iterator is dropped. Should only be called by the
 
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
 
@@ -304,28 +307,28 @@ impl PrivateInbox {
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub struct InboxMessageIter<'i> {
 
    messages: &'i Vec<DataMessage>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
    match_prev_branch_id: BranchId,
 
}
 

	
 
impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'m DataMessage;
 
impl<'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'i DataMessage;
 

	
 
    fn next(&'m mut self) -> Option<Self::Item> {
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.messages[self.next_index];
 
            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        if self.next_index == self.max_index {
src/runtime2/messages.rs
Show inline comments
 
use std::cmp::Ordering;
 
use std::collections::hash_map::Entry;
 
use std::collections::HashMap;
 

	
 
use crate::common::Id;
 
use crate::PortId;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::connector::{BranchId, PortIdLocal};
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
src/runtime2/mod.rs
Show inline comments
 
@@ -8,86 +8,86 @@ mod port;
 
mod global_store;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 

	
 
// Imports
 

	
 
use std::sync::{Arc, Mutex};
 
use std::sync::atomic::Ordering;
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::protocol::eval::*;
 
use crate::{common::Id, PortId, ProtocolDescription};
 
use crate::ProtocolDescription;
 

	
 
use global_store::{ConnectorVariant, GlobalStore};
 
use scheduler::Scheduler;
 
use crate::protocol::ComponentCreationError;
 
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
 
use native::{ConnectorApplication, ApplicationInterface};
 

	
 

	
 
// Runtime API
 
// TODO: Exit condition is very dirty. Take into account:
 
//  - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
 
//  - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
 
//  - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
pub(crate) struct RuntimeInner {
 
    pub(crate) global_store: GlobalStore,
 
    pub(crate) protocol_description: ProtocolDescription,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
 
}
 

	
 
// TODO: Come back to this at some point
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            global_store: GlobalStore::new(),
 
            protocol_description,
 
            schedulers: Mutex::new(Vec::new()),
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads);
 
            for _ in 0..num_threads {
 
                let mut scheduler = Scheduler::new(runtime_inner.clone());
 
                let cloned_runtime_inner = runtime_inner.clone();
 
                let thread = thread::spawn(move || {
 
                    let mut scheduler = Scheduler::new(cloned_runtime_inner);
 
                    scheduler.run();
 
                });
 

	
 
                schedulers.push(thread);
 
            }
 

	
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector = Box::new(connector);
 

	
 
        let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
 
        let connector_key = self.inner.global_store.connectors.create_interface(connector);
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.global_store.should_exit.store(true, Ordering::Release);
 
        let mut schedulers = self.inner.schedulers.lock().unwrap();
src/runtime2/native.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::cell::Cell;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::{ConnectorKey, GlobalStore};
 
use crate::runtime2::global_store::ConnectorKey;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::port::{Port, PortKind};
 
use crate::runtime2::scheduler::ConnectorCtx;
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
use super::global_store::ConnectorId;
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 
use super::inbox::Message;
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub trait Connector {
 
    /// Handle a new message (preprocessed by the scheduler). You probably only
 
    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
 
    /// intended for the scheduler itself.
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
}
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
@@ -54,112 +54,134 @@ impl ConnectorApplication {
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
 
        todo!("handling messages in ConnectorApplication (API for runtime)")
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<Port>,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        // TODO: Duplicated logic in scheduler
 
        let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.owned_ports.push(Port{
 
        // Create ports and add a job such that they are transferred to the
 
        // API component. (note that we do not send a ping, this is only
 
        // necessary once we create a connector)
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
        });
 

	
 
        self.owned_ports.push(Port{
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        });
 
        };
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove)
 
                    self.owned_ports.remove(index_to_remove);
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push(ApplicationJob::NewConnector(connector));
 
        }
 

	
 
        // Send ping message to wake up connector
 
        let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
 
        connector.inbox.insert_message(Message::Ping);
 
        connector.inbox.insert_message(Message{
 
            sending_connector: ConnectorId::new_invalid(),
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Ping,
 
        });
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        }
 

	
 
        return Ok(());
 
    }
 

	
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 
pub(crate) struct PortIdLocal {
 
    pub index: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ index: id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Channel, PortKind, PortOwnership};
 
use crate::runtime2::port::{Channel, PortKind};
 

	
 
use super::RuntimeInner;
 
use super::port::{Port, PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 
use super::inbox::{Message, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    port_counter: Arc<AtomicU32>,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new(port_counter: Arc<AtomicU32>) -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            port_counter,
 
            ports: initial_ports,
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a (putter, getter) port pair belonging to the same channel. The
 
    /// port will be implicitly owned by the connector.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
@@ -122,71 +120,64 @@ impl Scheduler {
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    match message.contents {
 
                        MessageContents::Data(content) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content));
 
                            } else {
 
                                scheduled.connector.insert_data_message(content);
 
                            }
 
                        }
 
                        MessageContents::Sync(content) => {
 
                            scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state);
 
                        }
 
                        MessageContents::Solution(content) => {
 
                            // TODO: Handle solution message
 
                        },
 
                        MessageContents::Control(content) => {
 
                            match content.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = scheduled.context.get_port_mut(port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                    // And respond with an Ack
 
                                    // Note: after this code has been reached, we may not have any
 
                                    // messages in the outbox that send to the port whose owning
 
                                    // connector we just changed. This is because the `ack` will
 
                                    // clear the rerouting entry of the `ack`-receiver.
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        content.sender,
 
                                        Message{
 
                                            sending_connector: connector_key.downcast(),
 
                                            receiving_port: PortIdLocal::new_invalid(),
 
                                            contents: MessageContents::Control(ControlMessage{
 
                                                id: content.id,
 
                                                content: ControlMessageVariant::Ack,
 
                                            }),
 
                                        }
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(content.id);
 
                                }
 
                    // Check for rerouting
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                // Note: after this code has been reached, we may not have any
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                self.send_message_and_wake_up_if_sleeping(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_key.downcast(),
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                        Message::Ping => {},
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                let new_schedule = scheduled.connector.run(
 
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
 
                );
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 

	
 
@@ -243,97 +234,103 @@ impl Scheduler {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::RequestCommit(contents)=> {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::ConfirmCommit(contents) => {
 
                        for to_visit in &contents.to_visit {
 
                            let message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: contents.clone(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                            };
 
                            self.send_message_and_wake_up_if_sleeping(*to_visit, message);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::Control(_) | MessageContents::Ping => {
 
                        // Never generated by the user's code
 
                        unreachable!();
 
                    }
 
                };
 

	
 
                // TODO: Maybe clean this up, perhaps special case for
 
                //  ConfirmCommit can be handled differently.
 
                if peer_connector.is_valid() {
 
                    let message = Message {
 
                        sending_connector: connector_id,
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector, message);
 
                }
 
            }
 
        }
 

	
 
        if !delta_state.new_ports.is_empty() {
 
            for port in delta_state.new_ports.drain(..) {
 
                context.ports.push(port);
 
            }
 
        }
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector));
 
                let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector);
 
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);
 

	
 
                // Call above changed ownership of ports, but we still have to
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
 
                    self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
                self.runtime.global_store.connector_queue.push_back(new_key);
 
            }
 
        }
 
    }
 

	
 
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
    fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.runtime.global_store.connectors.get_shared(connector_id);
 

	
 
        connector.inbox.insert_message(message);
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe { ConnectorKey::from_id(connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        }
 
    }
 
}
 

	
 
/// Represents a rerouting entry due to a moved port
 
// TODO: Optimize
 
struct ReroutedTraffic {
 
    id: u32,                        // ID of control message
 
    port: PortIdLocal,              // targeted port
 
    target_port: PortIdLocal,       // targeted port
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector they should be rerouted to
 
}
 

	
 
pub(crate) struct Router {
 
    id_counter: u32,
 
    active: Vec<ReroutedTraffic>,
 
}
 

	
 
impl Router {
 
    pub fn new() -> Self {
 
        Router{
 
@@ -347,41 +344,45 @@ impl Router {
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            port: port_id,
 
            target_port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
 
            })
 
        };
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.port == sending_port {
 
                reroute.target_port == target_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message
 
    pub fn handle_ack(&mut self, id: u32) {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
0 comments (0 inline, 0 general)