Changeset - 83d0cd8db5fb
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-25 20:24:24
contact@maxhenger.nl
WIP on more bugfixing
4 files changed with 151 insertions and 109 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -125,222 +125,223 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
 
            return scheduling;
 
        }
 

	
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 

	
 
                // Handle any new finished branches
 
                let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
                while let Some(branch_id) = iter_id {
 
                    iter_id = self.tree.get_queue_next(branch_id);
 
                    self.last_finished_handled = Some(branch_id);
 

	
 
                    if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                        // Actually found a solution
 
                        return self.enter_non_sync_mode(round_conclusion, comp_ctx);
 
                    }
 

	
 
                    self.last_finished_handled = Some(branch_id);
 
                }
 

	
 
                return scheduling;
 
            },
 
            Mode::NonSync => {
 
                let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::SyncError => {
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::Error => {
 
                // This shouldn't really be called. Because when we reach exit
 
                // mode the scheduler should not run the component anymore
 
                unreachable!("called component run() during error-mode");
 
            },
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: Prompt) -> Self {
 
        Self{
 
            mode: Mode::NonSync,
 
            eval_error: None,
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        while let Some(ticket) = ctx.get_next_message_ticket() {
 
            let message = ctx.read_message_using_ticket(ticket);
 
            let immediate_result = if let Message::Data(_) = message {
 
                self.handle_new_data_message(ticket, ctx);
 
                None
 
            } else {
 
                match ctx.take_message_using_ticket(ticket) {
 
                    Message::Data(_) => unreachable!(),
 
                    Message::SyncComp(message) => {
 
                        self.handle_new_sync_comp_message(message, ctx)
 
                    },
 
                    Message::SyncPort(message) => {
 
                        self.handle_new_sync_port_message(message, ctx);
 
                        None
 
                    },
 
                    Message::SyncControl(message) => {
 
                        self.handle_new_sync_control_message(message, ctx)
 
                    },
 
                    Message::Control(_) => unreachable!("control message in component"),
 
                }
 
            };
 

	
 
            if let Some(result) = immediate_result {
 
                return Some(result);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if self.consensus.handle_new_data_message(ticket, ctx) {
 
        if !self.consensus.handle_new_data_message(ticket, ctx) {
 
            // Message should not be handled now
 
            return;
 
        }
 

	
 
        let message = ctx.read_message_using_ticket(ticket).as_data();
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        println!("DEBUG: Actually really handling {:?}", message);
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_control_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 

	
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            self.eval_error = Some(eval_error);
 
            self.mode = Mode::SyncError;
 
            if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
 
                // We can exit immediately
 
                return self.enter_non_sync_mode(conclusion, comp_ctx);
 
            } else {
 
                // Current branch failed. But we may have other things that are
 
                // running.
 
                return ConnectorScheduling::Immediate;
 
            }
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
        match run_result {
 
            EvalContinuation::BranchInconsistent => {
 
                // Branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            EvalContinuation::BlockFires(port_id) => {
 
                // Branch called `fires()` on a port that has not been used yet.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                // Create two forks, one that assumes the port will fire, and
 
                // one that assumes the port remains silent
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 

	
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::BlockGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal, PortState};
 
use super::inbox::{
 
    Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker,
 
    DataMessage,
 
    SyncCompMessage, SyncCompContent,
 
    SyncPortMessage, SyncPortContent,
 
    SyncControlMessage, SyncControlContent
 
};
 
use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket};
 

	
 
struct BranchAnnotation {
 
    channel_mapping: Vec<ChannelAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum RoundConclusion {
 
    Failure,
 
    Success(BranchId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
 
//  bit.
 
// TODO: Needs a refactor. Firstly we have cases where we don't have a branch ID
 
//  but we do want to enumerate all current ports. So put that somewhere in a
 
//  central place. Secondly. Error handling and regular message handling is
 
//  becoming a mess.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    handled_wave: bool, // encountered notification wave in this round
 
    conclusion: Option<RoundConclusion>,
 
    ack_remaining: u32,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum MessageOrigin {
 
    Past,
 
    Present,
 
    Future
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            handled_wave: false,
 
            conclusion: None,
 
            ack_remaining: 0,
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
    #[deprecated]
 
    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap();
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            channel_mapping: ctx.get_ports().iter()
 
                .map(|v| ChannelAnnotation {
 
                    channel_id: v.channel_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
            cur_marker: BranchMarker::new_invalid(),
 
        });
 
        self.branch_markers.push(BranchId::new_invalid());
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
    /// called for each forked branch in the execution tree.
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            channel_mapping: parent_branch_annotations.channel_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has
 
    /// encountered an unrecoverable error.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Check for trivial case, where branch has not yet communicated within
 
        // the consensus algorithm
 
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
 
        if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            println!("DEBUG: Failure everything silent");
 
            return Some(RoundConclusion::Failure);
 
        }
 

	
 
        // We're not in the trivial case: since we've communicated we need to
 
        // let everyone know that this round is probably not going to end well.
 
        return self.initiate_sync_failure(ctx);
 
@@ -214,192 +214,193 @@ impl Consensus {
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
 
        let channel_id = port_desc.channel_id;
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == channel_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
                        mapping.expected_firing = Some(does_fire);
 
                        return Consistency::Valid;
 
                    },
 
                    Some(current) => {
 
                        // Already mapped
 
                        if current == does_fire {
 
                            return Consistency::Valid;
 
                        } else {
 
                            return Consistency::Inconsistent;
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                let message = SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                };
 
                match ctx.submit_message(Message::SyncPort(message)) {
 
                    Ok(_) => {
 
                        self.encountered_ports.push(self_port_id);
 
                    },
 
                    Err(_) => {
 
                        // Seems like we were done with this branch, but one of
 
                        // the silent ports (in scope) is actually closed
 
                        return self.notify_of_fatal_branch(branch_id, ctx);
 
                    }
 
                }
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        println!("DEBUG: ***** Incrementing sync round stuff");
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.channel_mapping.iter()
 
                .find(|v| v.channel_id == port_info.channel_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        let data_header = DataHeader{
 
            expected_mapping: branch.channel_mapping.iter()
 
                .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id)
 
                .copied()
 
                .collect(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == port_info.channel_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool {
 
        let message = ctx.read_message_using_ticket(ticket).as_data();
 
        let target_port = message.data_header.target_port;
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return false,
 
            MessageOrigin::Present => {
 
                self.encountered_ports.push(target_port);
 
                return true;
 
            },
 
            MessageOrigin::Future => {
 
                let message = ctx.take_message_using_ticket(ticket);
 
                ctx.put_back_message(message);
 
                return false;
 
            }
 
        }
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
@@ -500,192 +501,193 @@ impl Consensus {
 
                // And let the leader know about our port state
 
                let annotations = &self.branch_annotations[0];
 
                let mut channels = Vec::with_capacity(annotations.channel_mapping.len());
 
                for mapping in &annotations.channel_mapping {
 
                    let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
 
                    channels.push(LocalChannelPresence{
 
                        channel_id: mapping.channel_id,
 
                        is_closed: port_info.state == PortState::Closed,
 
                    });
 
                }
 

	
 
                let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
 
                    component_id: ctx.id,
 
                    channels,
 
                }), ctx);
 
                return maybe_conclusion;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if message.in_response_to_sync_round < self.sync_round {
 
            // Old message
 
            return None
 
        }
 

	
 
        // Because the message is always sent in response to a message
 
        // originating here, the sync round number can never be larger than the
 
        // currently stored one.
 
        debug_assert_eq!(message.in_response_to_sync_round, self.sync_round);
 
        match message.content {
 
            SyncControlContent::ChannelIsClosed(_) => {
 
                return self.initiate_sync_failure(ctx);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == target_port.channel_id {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(&message.content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
 
            if message.sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.channel_mapping {
 
                if expected.channel_id == current.channel_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        let origin = self.handle_peer(&sync_header);
 
        println!(" ********************** GOT {:?}", origin);
 
        if origin != MessageOrigin::Present {
 
            // We do not have to handle it now
 
            return origin;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncCompMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: peer.id,
 
                    content: SyncCompContent::Notification,
 
                };
 
                ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_data_to_new_leader(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncCompContent::Notification
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        } // else: exactly equal, so do nothing
 

	
 
        return MessageOrigin::Present;
 
    }
 

	
 
    /// Handles a (potentially new) peer. Returns `false` if the provided sync
 
    /// number is different then the expected one.
 
    fn handle_peer(&mut self, sync_header: &SyncHeader) -> MessageOrigin {
 
        let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id);
 
        match position {
 
            Some(index) => {
 
                let entry = &mut self.peers[index];
 
                if entry.encountered_this_round {
 
                    // Already encountered this round
 
                    if sync_header.sync_round < entry.expected_sync_round {
 
                        return MessageOrigin::Past;
 
                    } else if sync_header.sync_round == entry.expected_sync_round {
 
                        return MessageOrigin::Present;
 
                    } else {
 
                        return MessageOrigin::Future;
 
                    }
 
                } else {
 
                    // TODO: Proper handling of potential overflow
 
                    entry.encountered_this_round = true;
 

	
 
                    if sync_header.sync_round >= entry.expected_sync_round {
 
                        entry.expected_sync_round = sync_header.sync_round;
 
                        return MessageOrigin::Present;
 
                    } else {
 
                        return MessageOrigin::Past;
 
                    }
 
                }
 
            },
 
            None => {
 
                self.peers.push(Peer{
 
                    id: sync_header.sending_component_id,
 
                    encountered_this_round: true,
 
                    expected_sync_round: sync_header.sync_round,
 
                });
 
                return MessageOrigin::Present;
 
            }
 
        }
 
    }
 

	
 
    /// Sends a message towards the leader, if already the leader then the
 
    /// message will be handled immediately.
 
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            match content {
 
                SyncCompContent::LocalFailure => {
 
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::LocalSolution(local_solution) => {
 
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
 
                        return self.handle_global_solution_as_leader(global_solution, ctx);
 
                    }
 
                },
 
                SyncCompContent::PartialSolution(partial_solution) => {
 
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
src/runtime2/scheduler.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::mem::MaybeUninit;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::collections::RawVec;
 
use crate::protocol::eval::EvalError;
 
use crate::runtime2::port::ChannelId;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::branch::{BranchId};
 
use super::connector::{ConnectorPDL, ConnectorScheduling};
 
use super::inbox::{
 
    Message, DataMessage, SyncHeader,
 
    ControlMessage, ControlContent,
 
    SyncControlMessage, SyncControlContent,
 
};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            self.debug("Waiting for work");
 
            let connector_key = self.runtime.wait_for_work();
 
            if connector_key.is_none() {
 
                // We should exit
 
                self.debug(" ... No more work, quitting");
 
                break 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while let ConnectorScheduling::Immediate = cur_schedule {
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    self.handle_inbox_while_shutting_down(scheduled);
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    // Any messages still in the public inbox should be handled
 
                    scheduled.ctx.inbox.clear_read_messages();
 
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
 
                        self.handle_message_while_shutting_down(message, scheduled);
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = message.target_port() {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                Message::Control(message) => {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 

	
 
                            // Note: for simplicity we program the scheduler to always finish
 
                            // running a connector with an empty outbox. If this ever changes
 
                            // then accepting the "port peer changed" message implies we need
 
                            // to change the recipient of the message in the outbox.
 
                            debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::Ack => {
 
                            if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
 
                                self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
 
                                self.runtime.send_message(target_component, new_control_message);
 
                            };
 
                        },
 
                        ControlContent::Ping => {},
 
                    }
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
            if let Message::Control(message) = &message {
 
                match message.content {
 
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                        // Need to change port target
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.peer_connector = new_target_connector_id;
 

	
 
                        // Note: for simplicity we program the scheduler to always finish
 
                        // running a connector with an empty outbox. If this ever changes
 
                        // then accepting the "port peer changed" message implies we need
 
                        // to change the recipient of the message in the outbox.
 
                        debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                        // And respond with an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::CloseChannel(port_id) => {
 
                        // Mark the port as being closed
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.state = PortState::Closed;
 

	
 
                        // Send an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
 
                            self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
 
                            self.runtime.send_message(target_component, new_control_message);
 
                        };
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles inbox messages while shutting down. This intends to handle the
 
    /// case where a component cleanly exited outside of a sync region, but a
 
    /// peer, before receiving the `CloseChannel` message, sent a message inside
 
    /// a sync region. This peer should be notified that its message is not
 
    /// received by a component in a sync region.
 
    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
 
        // Note: we're not handling the public inbox, we're dealing with the
 
        // private one!
 
        debug_assert!(scheduled.shutting_down);
 

	
 
        while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
            let message = scheduled.ctx.read_message_using_ticket(ticket);
 
            let target_port_and_round_number = match message {
 
                Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
                Message::SyncComp(_) => None,
 
                Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
                Message::SyncControl(_) => None,
 
                Message::Control(_) => None,
 
            };
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
            Message::SyncControl(_) => None,
 
            Message::Control(_) => None,
 
        };
 

	
 
            if let Some((target_port, sync_round)) = target_port_and_round_number {
 
                // This message is aimed at a port, but we're shutting down, so
 
                // notify the peer that its was not received properly.
 
                // (also: since we're shutting down, we're not in sync mode and
 
                // the context contains the definitive set of owned ports)
 
                let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
                let message = SyncControlMessage{
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
 
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
                };
 
                self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
 
                self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
            }
 
        if let Some((target_port, sync_round)) = target_port_and_round_number {
 
            // This message is aimed at a port, but we're shutting down, so
 
            // notify the peer that its was not received properly.
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            let message = SyncControlMessage{
 
                in_response_to_sync_round: sync_round,
 
                target_component_id: port.peer_connector,
 
                content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
            };
 
            self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
 
            self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::SyncComp(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.target_component_id
 
                },
 
                Message::SyncPort(content) => {
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
 
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
 
            };
 

	
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed, but only
 
                        // if the port wasn't already closed (otherwise the peer
 
                        // is gone).
 
                        if port.state == PortState::Open {
 
                            let reroute_message = scheduled.router.prepare_reroute(
 
                                port.self_id, port.peer_id, scheduled.ctx.id,
 
                                port.peer_connector, new_connector.ctx.id,
 
                                &mut new_connector.router
 
                            );
 

	
 
                            self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
 
                        }
 
                    }
 

	
 
                    // Schedule new connector to run
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So prepare inbox for the next sync
 
                // round
 
                scheduled.ctx.inbox.prepare_for_next_round();
 
                scheduled.ctx.inbox.clear_read_messages();
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox: Inbox,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 

	
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox: Inbox::new(),
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
            workspace_ports: Vec::new(),
 
            workspace_branches: Vec::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
 
@@ -524,349 +521,391 @@ impl ComponentCtx {
 
        return Ok(());
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return self.inbox.get_read_data_messages(match_port_id);
 
    }
 

	
 
    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if !self.is_in_sync { return None; }
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option<MessageTicket> {
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        return self.inbox.read_message_using_ticket(ticket);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        return self.inbox.take_message_using_ticket(ticket)
 
    }
 

	
 
    /// Puts back a message back into the inbox. The reason being that the
 
    /// message is actually part of the next sync round. This will
 
    pub(crate) fn put_back_message(&mut self, message: Message) {
 
        self.inbox.put_back_message(message);
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [Message],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
}
 

	
 
impl<'a> Iterator for MessagesIter<'a> {
 
    type Item = &'a DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if let Message::Data(message) = &message {
 
                if message.data_header.target_port == self.match_port_id {
 
                    // Found a match
 
                    self.next_index += 1;
 
                    return Some(message);
 
                }
 
            } else {
 
                // Unreachable because:
 
                //  1. We only iterate over messages that were previously retrieved by `read_next_message`.
 
                //  2. Inbox does not contain control/ping messages.
 
                //  3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox.
 
                unreachable!();
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        // No more messages
 
        return None;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Private Inbox
 
// -----------------------------------------------------------------------------
 

	
 
/// A structure that contains inbox messages. Some messages are left inside and
 
/// continuously re-read. Others are taken out, but may potentially be put back
 
/// for later reading. Later reading in this case implies that they are put back
 
/// for reading in the next sync round.
 
struct Inbox {
 
    temp_m: Vec<Message>,
 
    temp_d: Vec<Message>,
 
    messages: RawVec<Message>,
 
    next_delay_idx: u32,
 
    start_read_idx: u32,
 
    next_read_idx: u32,
 
    generation: u32,
 
}
 

	
 
#[derive(Clone, Copy)]
 
pub(crate) struct MessageTicket {
 
    index: u32,
 
    generation: u32,
 
}
 

	
 
impl Inbox {
 
    fn new() -> Self {
 
        return Inbox {
 
            temp_m: Vec::new(), temp_d: Vec::new(),
 
            messages: RawVec::new(),
 
            next_delay_idx: 0,
 
            start_read_idx: 0,
 
            next_read_idx: 0,
 
            generation: 0,
 
        }
 
    }
 

	
 
    fn insert_new(&mut self, message: Message) {
 
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
 
        self.temp_m.push(message);
 
        return;
 
        self.messages.push(message);
 
    }
 

	
 
    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if self.next_read_idx as usize >= self.temp_m.len() { return None };
 
        let idx = self.next_read_idx;
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{ index: idx, generation: self.generation });
 
        let cur_read_idx = self.next_read_idx as usize;
 
        if cur_read_idx >= self.messages.len() {
 
            return None;
 
        }
 

	
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{
 
            index: cur_read_idx as u32,
 
            generation: self.generation
 
        });
 
    }
 

	
 
    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        return &self.temp_m[ticket.index as usize];
 
        return unsafe{ &*self.messages.get(ticket.index as usize) }
 
    }
 

	
 
    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        debug_assert!(ticket.index < self.next_read_idx);
 
        self.next_read_idx -= 1;
 
        return self.temp_m.remove(ticket.index as usize);
 
        unsafe {
 
            let take_idx = ticket.index as usize;
 
            let val = std::ptr::read(self.messages.get(take_idx));
 

	
 
            // Move messages to the right, clearing up space in the
 
            // front.
 
            let num_move_right = take_idx - self.start_read_idx as usize;
 
            self.messages.move_range(
 
                self.start_read_idx as usize,
 
                self.start_read_idx as usize + 1,
 
                num_move_right
 
            );
 

	
 
            self.start_read_idx += 1;
 

	
 
            return val;
 
        }
 
    }
 

	
 
    fn put_back_message(&mut self, message: Message) {
 
        // We have space in front of the array because we've taken out a message
 
        // before.
 
        self.temp_d.push(message);
 
        return;
 
        debug_assert!(self.next_delay_idx < self.start_read_idx);
 
        unsafe {
 
            // Write to front of the array
 
            std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message);
 
            self.next_delay_idx += 1;
 
        }
 
    }
 

	
 
    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter{
 
            messages: self.temp_m.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
        return MessagesIter{
 
            messages: self.messages.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
    }
 

	
 
    fn prepare_for_next_round(&mut self) {
 
    fn clear_read_messages(&mut self) {
 
        self.temp_m.drain(0..self.next_read_idx as usize);
 
        for (idx, v) in self.temp_d.drain(..).enumerate() {
 
            self.temp_m.insert(idx, v);
 
        }
 
        self.next_read_idx = 0;
 
        return;
 
        // Deallocate everything that was read
 
        self.destroy_range(self.start_read_idx, self.next_read_idx);
 
        self.generation += 1;
 

	
 
        // Join up all remaining values with the delayed ones in the front
 
        let num_to_move = self.messages.len() - self.next_read_idx as usize;
 
        self.messages.move_range(
 
            self.next_read_idx as usize,
 
            self.next_delay_idx as usize,
 
            num_to_move
 
        );
 

	
 
        // Set all indices (and the RawVec len) to make sense in this new state
 
        let new_len = self.next_delay_idx as usize + num_to_move;
 
        self.next_delay_idx = 0;
 
        self.start_read_idx = 0;
 
        self.next_read_idx = 0;
 
        self.messages.len = new_len;
 
    }
 

	
 
    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
 
        // Convoluted assert to make sure we're in non-sync mode, as that is
 
        // when this is called, and that makes our lives easier
 
        debug_assert!(self.temp_d.is_empty());
 
        let mut idx = 0;
 
        while idx < self.temp_m.len() {
 
            let msg = &self.temp_m[idx];
 
            if let Some(target) = msg.target_port() {
 
                if target == port {
 
                    new_inbox.temp_m.push(self.temp_m.remove(idx));
 
                    continue;
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
        return;
 

	
 
        let mut idx = 0;
 
        while idx < self.messages.len() {
 
            let message = unsafe{ &*self.messages.get(idx) };
 
            if let Some(target_port) = message.target_port() {
 
                if target_port == port {
 
                    // Transfer port
 
                    unsafe {
 
                        let message = std::ptr::read(message as *const _);
 
                        let remaining = self.messages.len() - idx;
 
                        if remaining > 1 {
 
                            self.messages.move_range(idx + 1, idx, remaining - 1);
 
                        let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition
 
                        if remaining > 0 {
 
                            self.messages.move_range(idx + 1, idx, remaining);
 
                        }
 
                        self.messages.len -= 1;
 
                        new_inbox.insert_new(message);
 
                    }
 
                } else {
 
                    // Do not transfer port
 
                    idx += 1;
 

	
 
                    continue; // do not increment index
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn destroy_range(&mut self, start_idx: u32, end_idx: u32) {
 
        for idx in (start_idx as usize)..(end_idx as usize) {
 
            unsafe {
 
                let msg = self.messages.get_mut(idx);
 
                std::ptr::drop_in_place(msg);
 
            }
 
        }
 
    }
 
}
 

	
 
impl Drop for Inbox {
 
    fn drop(&mut self) {
 
        // Whether in sync or not in sync. We have two ranges of allocated
 
        // messages:
 
        // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
        // - readable messages: from `start_read_idx` to `messages.len`
 
        self.destroy_range(0, self.next_delay_idx);
 
        self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
    }
 
}
 
//
 
// impl Drop for Inbox {
 
//     fn drop(&mut self) {
 
//         // Whether in sync or not in sync. We have two ranges of allocated
 
//         // messages:
 
//         // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
//         // - readable messages: from `start_read_idx` to `messages.len`
 
//         self.destroy_range(0, self.next_delay_idx);
 
//         self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
//     }
 
// }
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
    ReroutePending,
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
    id_of_ack_after_confirmation: u32, // control message ID we need to send to the target upon receiving an ack
 
}
 

	
 
struct ControlClosedChannel {
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
 

	
 
pub(crate) struct ControlMessageHandler {
 
    id_counter: u32,
 
    active: Vec<ControlEntry>,
 
}
 

	
 
impl ControlMessageHandler {
 
    pub fn new() -> Self {
 
        ControlMessageHandler {
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares a message indicating that a channel has closed, we keep a local
 
    /// entry to match against the (hopefully) returned `Ack` message.
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId
 
    ) -> ControlMessage {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
 
                source_port: self_port_id,
 
                target_port: peer_port_id,
 
            }),
 
        });
 

	
 
        return ControlMessage {
 
            id,
 
            sending_component_id: self_connector_id,
 
            content: ControlContent::CloseChannel(peer_port_id),
 
        };
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId, new_owner_ctrl_handler: &mut ControlMessageHandler,
 
    ) -> ControlMessage {
 
        let id = self.take_id();
 

	
 
        let new_owner_id = new_owner_ctrl_handler.take_id();
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ChangedPort(ControlChangedPort{
 
                target_port: port_id,
 
                source_connector: peer_connector_id,
 
                target_connector: new_owner_connector_id,
 
                id_of_ack_after_confirmation: new_owner_id,
 
            }),
 
        });
 

	
 
        new_owner_ctrl_handler.active.push(ControlEntry{
 
            id: new_owner_id,
 
            variant: ControlVariant::ReroutePending,
 
        });
 

	
 
        return ControlMessage {
 
            id,
 
            sending_component_id: self_connector_id,
 
            content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),
 
        };
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation;
 
mod data_transmission;
 
mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 4;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 3;
 
pub(crate) const NUM_THREADS: u32 = 2;
 
pub(crate) const NUM_INSTANCES: u32 = 2;
 
pub(crate) const NUM_LOOPS: u32 = 15;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 
}
 

	
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
0 comments (0 inline, 0 general)