Changeset - 3f2759e5fc57
[Not reviewed]
0 5 0
MH - 4 years ago 2021-10-25 13:08:11
contact@maxhenger.nl
somewhat correctly handling port closing and rerouting
5 files changed with 140 insertions and 59 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -58,12 +58,13 @@ pub(crate) struct Branch {
 
    index: BranchId,
 
    parent_index: BranchId,
 
    // Code execution state
 
    code_state: ComponentState,
 
    prepared_channel: Option<(Value, Value)>,
 
    sync_state: SpeculativeState,
 
    halted_at_port: PortIdLocal, // invalid if not halted
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    received: HashMap<PortIdLocal, DataMessage>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
@@ -74,12 +75,13 @@ impl Branch {
 
        Branch{
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
            code_state: component_state,
 
            prepared_channel: None,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            halted_at_port: PortIdLocal::new_invalid(),
 
            next_branch_in_queue: None,
 
            received: HashMap::new(),
 
            ports_delta: Vec::new(),
 
        }
 
    }
 

	
 
@@ -95,20 +97,27 @@ impl Branch {
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            prepared_channel: None,
 
            sync_state: SpeculativeState::RunningInSync,
 
            halted_at_port: PortIdLocal::new_invalid(),
 
            next_branch_in_queue: None,
 
            received: parent_branch.received.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 

	
 
    fn commit_to_sync(&mut self) {
 
        self.index = BranchId::new(0);
 
        // Logically impossible conditions (because we have a finished branch
 
        // we are going to commit to)
 
        debug_assert!(self.prepared_channel.is_none());
 
        debug_assert!(!self.halted_at_port.is_valid());
 

	
 
        // Reset other variables to their defaults
 
        self.index = BranchId::new_invalid();
 
        self.parent_index = BranchId::new_invalid();
 
        self.sync_state = SpeculativeState::RunningNonSync;
 
        self.next_branch_in_queue = None;
 
        self.received.clear();
 
        self.ports_delta.clear();
 
    }
 
@@ -401,12 +410,44 @@ impl Connector for ConnectorPDL {
 
            MC::Control(_) | MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            // Check for new messages we haven't seen before. If any of the
 
            // pending branches can accept the message, do so.
 
            while let Some((target_port_id, message)) = self.inbox.next_message() {
 
                let mut branch_idx = self.sync_pending_get.first;
 
                while branch_idx != 0 {
 
                    let branch = &self.branches[branch_idx as usize];
 
                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
 

	
 
                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
 
                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);
 

	
 
                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
 
                        branch.halted_at_port == *target_port_id &&
 
                        port_mapping.last_registered_branch_id == message.sender_prev_branch_id {
 
                        // Branch may accept this mesage, so create a fork that
 
                        // contains this message in the inbox.
 
                        let new_branch_idx = self.branches.len() as u32;
 
                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
 

	
 
                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
 
                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
 
                        mapping.last_registered_branch_id = message.sender_cur_branch_id;
 

	
 
                        let new_branch_id = BranchId::new(new_branch_idx);
 
                        self.branches.push(new_branch);
 
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
 
                    }
 

	
 
                    branch_idx = next_branch_idx;
 
                }
 
            }
 

	
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                // Retrieve first element in queue
 
@@ -483,13 +524,12 @@ impl ConnectorPDL {
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling connector messages
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
 
        self.inbox.insert_message(target_port, message);
 
    }
 

	
 
    /// Accepts a synchronous message and combines it with the locally stored
 
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
 
@@ -816,12 +856,13 @@ impl ConnectorPDL {
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.halted_at_port = local_port_id;
 
                    let branch_id = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
src/runtime2/inbox.rs
Show inline comments
 
@@ -290,20 +290,20 @@ impl PrivateInbox {
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
 
    pub(crate) fn next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let (_, to_return) = &self.messages[self.len_read];
 
        let (target_port, message) = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
        return Some((target_port, message));
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub(crate) fn clear(&mut self) {
 
        self.messages.clear();
 
        self.len_read = 0;
src/runtime2/mod.rs
Show inline comments
 
@@ -21,13 +21,13 @@ use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::port::Port;
 
use crate::runtime2::port::{Port, PortState};
 
use crate::runtime2::scheduler::SchedulerCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
@@ -228,18 +228,20 @@ impl RuntimeInner {
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
@@ -286,13 +288,13 @@ impl RuntimeInner {
 
            let created = lock.get_private(&key);
 

	
 
            match &created.connector {
 
                ConnectorVariant::UserDefined(connector) => {
 
                    for port_id in connector.ports.owned_ports.iter().copied() {
 
                        println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
 
                        let mut port = created_by.context.remove_port(port_id);
 
                        let port = created_by.context.remove_port(port_id);
 
                        created.context.add_port(port);
 
                    }
 
                },
 
                ConnectorVariant::Native(_) => unreachable!(),
 
            }
 
        }
 
@@ -347,13 +349,13 @@ impl RuntimeInner {
 
    }
 

	
 
    fn decrement_active_components(&self) {
 
        let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst);
 
        println!("DEBUG: Decremented components to {}", old_num - 1);
 
        debug_assert!(old_num > 0);
 
        if old_num == 0 { // such that we have no more active connectors (for now!)
 
        if old_num == 1 { // such that we have no more active connectors (for now!)
 
            let num_interfaces = self.active_interfaces.load(Ordering::Acquire);
 
            if num_interfaces == 0 {
 
                self.signal_for_shutdown();
 
            }
 
        }
 
    }
 
@@ -425,13 +427,12 @@ impl ConnectorStore {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            context: ConnectorCtx::new(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
            pending_acks: 0,
 
        };
 

	
 
        let index;
 
        let key;
 

	
 
        if self.free.is_empty() {
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::ScheduledConnector;
 

	
 
use super::{RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 
@@ -74,56 +75,56 @@ impl Scheduler {
 
        // connector that we run
 
        let scheduler_id = self.scheduler_id;
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            println!("DEBUG [{}]: Waiting for work", scheduler_id);
 
            self.debug("Waiting for work");
 
            let connector_key = self.runtime.wait_for_work();
 
            if connector_key.is_none() {
 
                // We should exit
 
                println!("DEBUG [{}]: ... No more work, quitting", scheduler_id);
 
                self.debug(" ... No more work, quitting");
 
                break 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index);
 
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // Check for rerouting
 
                    println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message);
 
                    self.debug_conn(connector_id, &format!("Handling message from {}:{}\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
 
                        self.runtime.send_message(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 

	
 
                                // Note: for simplicity we program the scheduler to always finish
 
                                // running a connector with an empty outbox. If this ever changes
 
                                // then accepting the "port peer changed" message implies we need
 
                                // to change the recipient of the message in the outbox.
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                // Note: after this code has been reached, we may not have any
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                // TODO: Question from Max from the past: what the hell did you mean?
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
@@ -164,31 +165,32 @@ impl Scheduler {
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(
 
                        scheduler_ctx, &scheduled.context, &mut delta_state
 
                    );
 
                    println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
 
                    self.debug_conn(connector_id, "Finished running");
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 
                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
@@ -201,116 +203,121 @@ impl Scheduler {
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
 
                    scheduled.public.sleeping.store(true, Ordering::Release);
 

	
 
                    // We might have received a message in the meantime from a
 
                    // thread that did not see the sleeping flag set to `true`,
 
                    // so:
 
                    if !scheduled.public.inbox.is_empty() {
 
                        let should_reschedule_self = scheduled.public.sleeping
 
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                            .is_ok();
 

	
 
                        if should_reschedule_self {
 
                            self.runtime.push_work(connector_key);
 
                        }
 
                    }
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.context.ports {
 
                        let message = scheduled.router.prepare_closing_channel(
 
                            port.self_id, port.peer_id,
 
                            connector_id
 
                        );
 
                        self.runtime.send_message(port.peer_connector, message);
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.runtime.send_message(port.peer_connector, message);
 
                        }
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
 
    fn handle_delta_state(&mut self,
 
        cur_connector: &mut ScheduledConnector, connector_id: ConnectorId,
 
        delta_state: &mut RunDeltaState
 
    ) {
 
        // Handling any messages that were sent
 
        let connector_id = connector_key.downcast();
 

	
 
        if !delta_state.outbox.is_empty() {
 
            for mut message in delta_state.outbox.drain(..) {
 
                // Based on the message contents, decide where the message
 
                // should be sent to. This might end up modifying the message.
 
                let (peer_connector, peer_port) = match &mut message {
 
                self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message));
 
                let (peer_connector, self_port, peer_port) = match &mut message {
 
                    MessageContents::Data(contents) => {
 
                        let port = context.get_port(contents.sending_port);
 
                        (port.peer_connector, port.peer_id)
 
                        let port = cur_connector.context.get_port(contents.sending_port);
 
                        (port.peer_connector, contents.sending_port, port.peer_id)
 
                    },
 
                    MessageContents::Sync(contents) => {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::RequestCommit(contents)=> {
 
                        let connector = contents.to_visit.pop().unwrap();
 
                        (connector, PortIdLocal::new_invalid())
 
                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::ConfirmCommit(contents) => {
 
                        for to_visit in &contents.to_visit {
 
                            let message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                            };
 
                            self.runtime.send_message(*to_visit, message);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
                    MessageContents::Control(_) | MessageContents::Ping => {
 
                        // Never generated by the user's code
 
                        unreachable!();
 
                    }
 
                };
 

	
 
                // TODO: Maybe clean this up, perhaps special case for
 
                //  ConfirmCommit can be handled differently.
 
                if peer_connector.is_valid() {
 
                    if peer_port.is_valid() {
 
                        // Sending a message to a port, so the port may not be
 
                        // closed.
 
                        let port = cur_connector.context.get_port(self_port);
 
                        match port.state {
 
                            PortState::Open => {},
 
                            PortState::Closed => {
 
                                todo!("Handling sending over a closed port");
 
                            }
 
                        }
 
                    }
 
                    let message = Message {
 
                        sending_connector: connector_id,
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.runtime.send_message(peer_connector, message);
 
                }
 
            }
 
        }
 

	
 
        if !delta_state.new_ports.is_empty() {
 
            for port in delta_state.new_ports.drain(..) {
 
                context.ports.push(port);
 
                cur_connector.context.ports.push(port);
 
            }
 
        }
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            let cur_connector = self.runtime.get_component_private(connector_key);
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
 
                let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                // Call above changed ownership of ports, but we still have to
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    cur_connector.pending_acks += 1;
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.runtime.send_message(port.peer_connector, reroute_message);
 
@@ -322,12 +329,43 @@ impl Scheduler {
 
        }
 

	
 
        debug_assert!(delta_state.outbox.is_empty());
 
        debug_assert!(delta_state.new_ports.is_empty());
 
        debug_assert!(delta_state.new_connectors.is_empty());
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.context.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But do to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
@@ -366,13 +404,13 @@ impl ControlMessageHandler {
 
    }
 

	
 
    /// Prepares a message indicating that a channel has closed, we keep a local
 
    /// entry to match against the (hopefully) returned `Ack` message.
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: connectorId
 
        self_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
 
@@ -422,13 +460,13 @@ impl ControlMessageHandler {
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = entry {
 
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
 
                if entry.source_connector == sending_connector &&
 
                    entry.target_port == target_port {
 
                    // Need to reroute this message
 
                    return Some(entry.target_connector);
 
                }
 
            }
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -11,13 +11,13 @@ fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {
 

	
 
    return runtime;
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    let rt = runtime_for(1, "
 
    let rt = runtime_for(4, "
 
primitive putter(out<bool> sender, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"putting!\");
 
            put(sender, true);
 
@@ -30,21 +30,22 @@ primitive getter(in<bool> receiver, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"getting!\");
 
            auto result = get(receiver);
 
            assert(result);
 

	
 
        }
 
        index += 1;
 
    }
 
}
 
    ");
 

	
 
    let mut api = rt.create_interface();
 
    let channel = api.create_channel();
 
    let num_loops = 5;
 
    let num_loops = 100;
 

	
 
    api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
        Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create putter");
 

	
0 comments (0 inline, 0 general)