Changeset - 328f04b6612f
[Not reviewed]
0 7 0
MH - 4 years ago 2021-11-09 12:42:43
contact@maxhenger.nl
Initial pass of fixing compiler errors
7 files changed with 77 insertions and 57 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 
use crate::protocol::eval::{Value, ValueGroup};
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
/// we use this fact to let branch ID 0 denote the ID being invalid.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct BranchId {
 
@@ -75,20 +75,20 @@ impl Branch {
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
                (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.index,
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
            prepared_channel: None,
 
@@ -127,13 +127,13 @@ impl BranchQueue {
 
        return !self.first.is_valid();
 
    }
 
}
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
    FinishedSync,
 
}
 

	
 
@@ -157,13 +157,13 @@ impl QueueKind {
 
/// are described by the various `BranchQueue` instances and the `next_in_queue`
 
/// field in each branch.
 
pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    pub queues: [BranchQueue; NUM_QUEUES]
 
    queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
    /// Constructs a new execution tree with a single non-sync branch.
 
    pub fn new(component: ComponentState) -> Self {
 
        return Self {
 
@@ -232,13 +232,13 @@ impl ExecTree {
 
                debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id));
 
                let branch = &self.branches[branch_id.index as usize];
 

	
 
                branch.next_in_queue.index as usize
 
            },
 
            None => {
 
                queue.first as usize
 
                queue.first.index as usize
 
            }
 
        };
 

	
 
        return BranchQueueIter {
 
            branches: self.branches.as_slice(),
 
            index,
 
@@ -319,13 +319,13 @@ impl IndexMut<BranchId> for ExecTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        debug_assert!(index.is_valid());
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
pub struct BranchQueueIter<'a> {
 
pub(crate) struct BranchQueueIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchQueueIter<'a> {
 
    type Item = &'a Branch;
 
@@ -339,13 +339,13 @@ impl<'a> Iterator for BranchQueueIter<'a> {
 
        let branch = &self.branches[self.index];
 
        self.index = branch.next_in_queue.index as usize;
 
        return Some(branch);
 
    }
 
}
 

	
 
pub struct BranchParentIter<'a> {
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for BranchParentIter<'a> {
 
    type Item = &'a Branch;
src/runtime2/connector2.rs
Show inline comments
 
@@ -33,13 +33,13 @@ use crate::PortId;
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::consensus::find_ports_in_value_group;
 
use crate::runtime2::port::PortKind;
 

	
 
use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency};
 
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox};
 
use super::native::Connector;
 
use super::port::PortIdLocal;
 
use super::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
@@ -142,15 +142,16 @@ impl ConnectorPDL {
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches);
 
        let mut branches = Vec::new(); // TODO: @Remove
 
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches);
 

	
 
        for branch_id in ctx.workspace_branches.drain(..) {
 
        for branch_id in branches.drain(..) {
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
@@ -182,13 +183,13 @@ impl ConnectorPDL {
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
@@ -305,27 +306,27 @@ impl ConnectorPDL {
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 

	
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                let current_ports = comp_ctx.notify_sync_start();
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                self.consensus.start_sync(current_ports, comp_ctx);
 
                self.consensus.start_sync(comp_ctx);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
src/runtime2/consensus.rs
Show inline comments
 
@@ -11,19 +11,20 @@ use super::inbox2::{
 
use super::scheduler::ComponentCtxFancy;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
}
 

	
 
#[derive(Clone)]
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
@@ -50,13 +51,13 @@ pub(crate) struct Consensus {
 
    encountered_peers: VecSet<ConnectorId>,
 
    solution_combiner: SolutionCombiner,
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
impl Consensus {
 
@@ -85,23 +86,23 @@ impl Consensus {
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) {
 
    pub fn start_sync(&mut self, ctx: &ComponentCtxFancy) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.last_finished_handled.is_none());
 
        debug_assert!(self.encountered_peers.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ports.iter()
 
            port_mapping: ctx.get_ports().iter()
 
                .map(|v| PortAnnotation{
 
                    port_id: v.self_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
@@ -234,33 +235,34 @@ impl Consensus {
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let sync_header = self.create_sync_header(ctx);
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
            let port = branch.port_mapping.iter()
 
                .find(|v| v.port_id == source_port_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are begin sent
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let sync_header = self.create_sync_header(ctx);
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            sending_port: port_info.peer_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
@@ -306,13 +308,13 @@ impl Consensus {
 
                // be the leader anymore.
 
                return self.send_or_store_local_solution(solution, ctx);
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .find(|(connector_id, _)| *connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
@@ -386,20 +388,20 @@ impl Consensus {
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for encountered_id in self.encountered_peers.iter() {
 
                if encountered_id == sync_header.sending_component_id {
 
                if *encountered_id == sync_header.sending_component_id {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: encountered_id,
 
                    target_component_id: *encountered_id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
@@ -584,13 +586,13 @@ impl SolutionCombiner {
 
                        involved_channels: matching_channels,
 
                    });
 
                }
 
            }
 

	
 
            let mut num_ports_in_peers = 0;
 
            for peer in component_peers {
 
            for peer in &component_peers {
 
                num_ports_in_peers += peer.involved_channels.len();
 
            }
 

	
 
            if num_ports_in_peers == cur_ports.len() {
 
                // Newly added component has all required peers present
 
                self.local[component_index].all_peers_present = true;
 
@@ -820,13 +822,13 @@ impl SolutionCombiner {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 

	
 
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(encountered_branch_id, branch_id);
 
                        debug_assert_eq!(*encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
 
                    },
 
                    None => {
 
                        final_mapping.push((channel_id, branch_id));
 
                    }
 
                }
src/runtime2/inbox2.rs
Show inline comments
 
@@ -95,18 +95,18 @@ impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: MessageFancy) {
 
    pub(crate) fn insert_message(&self, message: MessageFancy) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<MessageFancy> {
 
    pub(crate) fn take_message(&self) -> Option<MessageFancy> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
src/runtime2/mod.rs
Show inline comments
 
@@ -52,13 +52,13 @@ impl ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
 
pub struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
src/runtime2/native.rs
Show inline comments
 
@@ -5,13 +5,12 @@ use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::scheduler::{SchedulerCtx, ComponentCtxFancy};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::branch::{Branch};
 
use super::consensus::find_ports_in_value_group;
 
use super::connector2::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
@@ -144,13 +143,13 @@ impl ApplicationInterface {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|v| *v == initial_port).unwrap();
 
            let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(state);
 

	
src/runtime2/scheduler.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::inbox2::ControlContent;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant};
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::branch::{BranchId};
 
use super::connector2::{ConnectorPDL, ConnectorScheduling};
 
use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy};
 
use super::inbox2::{MessageFancy, DataMessageFancy, ControlMessageFancy};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 
@@ -124,23 +124,26 @@ impl Scheduler {
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check for rerouting
 
            self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
 
                self.runtime.send_message(other_connector_id, message);
 
                continue;
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            if let Some(target_port) = Self::get_data_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // Handle special messages here, messages for the component
 
            // will be added to the inbox.
 
            self.debug_conn(connector_id, " ... Handling message myself");
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                MessageFancy::Control(message) => {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
@@ -194,13 +197,13 @@ impl Scheduler {
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() {
 
        while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 

	
 
            let target_component_id = match &message {
 
                MessageFancy::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
@@ -228,34 +231,36 @@ impl Scheduler {
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Add the new connector to the global registry
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    // Transfer ports
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
 
                            if message.receiving_port == *port_id {
 
                            if Self::get_data_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(taken_message);
 
                                let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(message);
 
                            } else {
 
                                message_idx += 1;
 
                            }
 
                        }
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx_fancy.ports.iter()
 
                            .position(|v| v.self_id == *port_id)
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx_fancy.ports.remove(port_index);
 
                        new_connector.ctx_fancy.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed
 
                        let reroute_message = scheduled.router.prepare_reroute(
 
@@ -321,12 +326,21 @@ impl Scheduler {
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn get_data_message_target_port(message: &MessageFancy) -> Option<PortIdLocal> {
 
        if let MessageFancy::Data(message) = message {
 
            return Some(message.data_header.target_port)
 
        }
 

	
 
        return None
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
@@ -401,30 +415,34 @@ impl ComponentCtxFancy {
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in
 
    /// the context by exiting your `Component::run` function with an
 
    /// appropriate scheduling value.
 
    pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
        return &self.ports
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 
@@ -611,13 +629,13 @@ impl ControlMessageHandler {
 
            content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),
 
        };
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
 
                if entry.target_port == target_port {
 
                    // Need to reroute this message
 
                    return Some(entry.target_connector);
 
                }
0 comments (0 inline, 0 general)