Changeset - 32d9f23a4c87
[Not reviewed]
0 3 0
MH - 4 years ago 2021-11-12 13:37:37
contact@maxhenger.nl
fix clearing too many messages
3 files changed with 47 insertions and 4 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
// connector.rs
 
//
 
// Represents a component. A component (and the scheduler that is running it)
 
// has many properties that are not easy to subdivide into aspects that are
 
// conceptually handled by particular data structures. That is to say: the code
 
// that we run governs: running PDL code, keeping track of ports, instantiating
 
// new components and transports (i.e. interacting with the runtime), running
 
// a consensus algorithm, etc. But on the other hand, our data is rather
 
// simple: we have a speculative execution tree, a set of ports that we own,
 
// and a bit of code that we should run.
 
//
 
// So currently the code is organized as following:
 
// - The scheduler that is running the component is the authoritative source on
 
//     ports during *non-sync* mode. The consensus algorithm is the
 
//     authoritative source during *sync* mode. They retrieve each other's
 
//     state during the transitions. Hence port data exists duplicated between
 
//     these two datastructures.
 
// - The execution tree is where executed branches reside. But the execution
 
//     tree is only aware of the tree shape itself (and keeps track of some
 
//     queues of branches that are in a particular state), and tends to store
 
//     the PDL program state. The consensus algorithm is also somewhat aware
 
//     of the execution tree, but only in terms of what is needed to complete
 
//     a sync round (for now, that means the port mapping in each branch).
 
//     Hence once more we have properties conceptually associated with branches
 
//     in two places.
 
// - TODO: Write about handling messages, consensus wrapping data
 
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::PortId;
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
#[derive(Debug, Eq, PartialEq)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
    Exit,           // Connector has exited
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    received: &'a HashMap<PortIdLocal, ValueGroup>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.registered_id.is_some();
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(data) => Some(data.clone()),
 
            None => None,
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        return self.prepared_channel.take();
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
            // Run in sync mode
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 

	
 
            // Handle any new finished branches
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 

	
 
                if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Actually found a solution
 
                    self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 

	
 
                self.last_finished_handled = Some(branch_id);
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: ComponentState) -> Self {
 
        Self{
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
 
        while let Some(message) = ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
 
                Message::Control(_) => unreachable!("control message in component"),
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that has not been used yet.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                // Create two forks, one that assumes the port will fire, and
 
                // one that assumes the port remains silent
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 

	
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
src/runtime2/scheduler.rs
Show inline comments
 
@@ -107,469 +107,470 @@ impl Scheduler {
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                Message::Control(message) => {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 

	
 
                            // Note: for simplicity we program the scheduler to always finish
 
                            // running a connector with an empty outbox. If this ever changes
 
                            // then accepting the "port peer changed" message implies we need
 
                            // to change the recipient of the message in the outbox.
 
                            debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::Ack => {
 
                            scheduled.router.handle_ack(message.id);
 
                        },
 
                        ControlContent::Ping => {},
 
                    }
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
                    scheduled.ctx.inbox_messages.push(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::Sync(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.target_component_id
 
                },
 
                Message::Control(_) => {
 
                    unreachable!("component sending control messages directly");
 
                }
 
            };
 

	
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx.inbox_messages.len() {
 
                            let message = &scheduled.ctx.inbox_messages[message_idx];
 
                            if Self::get_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                let message = scheduled.ctx.inbox_messages.remove(message_idx);
 
                                new_connector.ctx.inbox_messages.push(message);
 
                            } else {
 
                                message_idx += 1;
 
                            }
 
                        }
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed
 
                        let reroute_message = scheduled.router.prepare_reroute(
 
                            port.self_id, port.peer_id, scheduled.ctx.id,
 
                            port.peer_connector, new_connector.ctx.id
 
                        );
 

	
 
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                        self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
 
                    }
 

	
 
                    // Schedule new connector to run
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox
 
                scheduled.ctx.inbox_messages.clear();
 
                // Just left sync region. So clear inbox up until the last
 
                // message that was read.
 
                scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read);
 
                scheduled.ctx.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn get_message_target_port(message: &Message) -> Option<PortIdLocal> {
 
        match message {
 
            Message::Data(data) => return Some(data.data_header.target_port),
 
            Message::Sync(_) => {},
 
            Message::Control(control) => {
 
                match &control.content {
 
                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
 
                    ControlContent::CloseChannel(port_id) => return Some(*port_id),
 
                    ControlContent::Ping | ControlContent::Ack => {},
 
                }
 
            },
 
        }
 

	
 
        return None
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<Message>, // never control or ping messages
 
    inbox_messages: Vec<Message>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox_messages: Vec::new(),
 
            inbox_len_read: 0,
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
            workspace_ports: Vec::new(),
 
            workspace_branches: Vec::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
    /// be called outside of a sync block (for ports received during a sync
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) {
 
        debug_assert!(self.is_in_sync);
 
        self.outbox.push_back(contents);
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter {
 
            messages: &self.inbox_messages,
 
            next_index: 0,
 
            max_index: self.inbox_len_read,
 
            match_port_id
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message from the inbox `None` if there are no
 
    /// (new) messages to read.
 
    // TODO: Fix the clone of the data message, entirely unnecessary
 
    pub(crate) fn read_next_message(&mut self) -> Option<Message> {
 
        if !self.is_in_sync { return None; }
 
        if self.inbox_len_read == self.inbox_messages.len() { return None; }
 

	
 
        // We want to keep data messages in the inbox, because we need to check
 
        // them in the future. We don't want to keep sync messages around, we
 
        // should only handle them once. Control messages should never be in
 
        // here.
 
        let message = &self.inbox_messages[self.inbox_len_read];
 
        match message {
 
            Message::Data(content) => {
 
                self.inbox_len_read += 1;
 
                return Some(Message::Data(content.clone()));
 
            },
 
            Message::Sync(_) => {
 
                let message = self.inbox_messages.remove(self.inbox_len_read);
 
                return Some(message);
 
            },
 
            Message::Control(_) => unreachable!("control message ended up in component inbox"),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [Message],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
}
 

	
 
impl<'a> Iterator for MessagesIter<'a> {
 
    type Item = &'a DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if let Message::Data(message) = &message {
 
                if message.data_header.target_port == self.match_port_id {
 
                    // Found a match
 
                    self.next_index += 1;
 
                    return Some(message);
 
                }
 
            } else {
 
                // Unreachable because:
 
                //  1. We only iterate over messages that were previously retrieved by `read_next_message`.
 
                //  2. Inbox does not contain control/ping messages.
 
                //  3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox.
 
                unreachable!();
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        // No more messages
 
        return None;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
}
 

	
 
struct ControlClosedChannel {
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
src/runtime2/tests/api_component.rs
Show inline comments
 
// Testing the api component.
 
//
 
// These tests explicitly do not use the "NUM_INSTANCES" constant because we're
 
// doing some communication with the native component. Hence only expect one
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive handler(in<u32> request, out<u32> response, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto value = get(request);
 
                put(response, value * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let req_chan = api.create_channel().unwrap();
 
    let resp_chan = api.create_channel().unwrap();
 

	
 
    api.create_connector("", "handler", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(req_chan.getter_id.index)),
 
        Value::Output(PortId::new(resp_chan.putter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])),
 
            ApplicationSyncAction::Get(resp_chan.getter_id)
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 
        assert!(result.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, loop_idx * 2);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 

	
 
#[test]
 
fn test_getting_from_component() {
 
    const CODE: &'static str ="
 
    primitive loop_sender(out<u32> numbers, u32 cur, u32 last) {
 
        while (cur < last) {
 
            print(\"sync start\");
 
            synchronous {
 
                print(\"sending\");
 
                put(numbers, cur);
 
                cur += 1;
 
            }
 
            print(\"sync solution!\");
 
        }
 
    }";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let channel = api.create_channel().unwrap();
 
    api.create_connector("", "loop_sender", ValueGroup::new_stack(vec![
 
        Value::Output(PortId::new(channel.putter_id.index)),
 
        Value::UInt32(1337),
 
        Value::UInt32(1337 + NUM_LOOPS)
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Get(channel.getter_id),
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 

	
 
        assert!(result.len() == 1 && result[0].values.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, 1337 + loop_idx);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)