Changeset - 83d0cd8db5fb
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-25 20:24:24
contact@maxhenger.nl
WIP on more bugfixing
4 files changed with 88 insertions and 46 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -209,25 +209,25 @@ impl ConnectorPDL {
 

	
 
            if let Some(result) = immediate_result {
 
                return Some(result);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if self.consensus.handle_new_data_message(ticket, ctx) {
 
        if !self.consensus.handle_new_data_message(ticket, ctx) {
 
            // Message should not be handled now
 
            return;
 
        }
 

	
 
        let message = ctx.read_message_using_ticket(ticket).as_data();
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 
@@ -239,24 +239,25 @@ impl ConnectorPDL {
 

	
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        println!("DEBUG: Actually really handling {:?}", message);
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
src/runtime2/consensus.rs
Show inline comments
 
@@ -79,25 +79,25 @@ pub(crate) struct Consensus {
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum MessageOrigin {
 
    Past,
 
    Present,
 
    Future
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            branch_markers: Vec::new(),
 
@@ -298,24 +298,25 @@ impl Consensus {
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        println!("DEBUG: ***** Incrementing sync round stuff");
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
@@ -584,24 +585,25 @@ impl Consensus {
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        let origin = self.handle_peer(&sync_header);
 
        println!(" ********************** GOT {:?}", origin);
 
        if origin != MessageOrigin::Present {
 
            // We do not have to handle it now
 
            return origin;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
src/runtime2/scheduler.rs
Show inline comments
 
@@ -56,25 +56,24 @@ impl Scheduler {
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while let ConnectorScheduling::Immediate = cur_schedule {
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    self.handle_inbox_while_shutting_down(scheduled);
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
@@ -109,24 +108,31 @@ impl Scheduler {
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    // Any messages still in the public inbox should be handled
 
                    scheduled.ctx.inbox.clear_read_messages();
 
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
 
                        self.handle_message_while_shutting_down(message, scheduled);
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
            }
 
        }
 
    }
 

	
 
@@ -140,27 +146,25 @@ impl Scheduler {
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = message.target_port() {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                Message::Control(message) => {
 
            if let Message::Control(message) = &message {
 
                match message.content {
 
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                        // Need to change port target
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.peer_connector = new_target_connector_id;
 

	
 
                        // Note: for simplicity we program the scheduler to always finish
 
                        // running a connector with an empty outbox. If this ever changes
 
                        // then accepting the "port peer changed" message implies we need
 
                        // to change the recipient of the message in the outbox.
 
                        debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
@@ -186,69 +190,62 @@ impl Scheduler {
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
 
                            self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message));
 
                            self.runtime.send_message(target_component, new_control_message);
 
                        };
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles inbox messages while shutting down. This intends to handle the
 
    /// case where a component cleanly exited outside of a sync region, but a
 
    /// peer, before receiving the `CloseChannel` message, sent a message inside
 
    /// a sync region. This peer should be notified that its message is not
 
    /// received by a component in a sync region.
 
    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
 
        // Note: we're not handling the public inbox, we're dealing with the
 
        // private one!
 
        debug_assert!(scheduled.shutting_down);
 

	
 
        while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
            let message = scheduled.ctx.read_message_using_ticket(ticket);
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
            Message::SyncControl(_) => None,
 
            Message::Control(_) => None,
 
        };
 

	
 
        if let Some((target_port, sync_round)) = target_port_and_round_number {
 
            // This message is aimed at a port, but we're shutting down, so
 
            // notify the peer that its was not received properly.
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            let message = SyncControlMessage{
 
                in_response_to_sync_round: sync_round,
 
                target_component_id: port.peer_connector,
 
                content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
            };
 
            self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
 
            self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
        }
 
    }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
 

	
 
@@ -342,25 +339,25 @@ impl Scheduler {
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So prepare inbox for the next sync
 
                // round
 
                scheduled.ctx.inbox.prepare_for_next_round();
 
                scheduled.ctx.inbox.clear_read_messages();
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
@@ -608,181 +605,223 @@ impl<'a> Iterator for MessagesIter<'a> {
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Private Inbox
 
// -----------------------------------------------------------------------------
 

	
 
/// A structure that contains inbox messages. Some messages are left inside and
 
/// continuously re-read. Others are taken out, but may potentially be put back
 
/// for later reading. Later reading in this case implies that they are put back
 
/// for reading in the next sync round.
 
struct Inbox {
 
    temp_m: Vec<Message>,
 
    temp_d: Vec<Message>,
 
    messages: RawVec<Message>,
 
    next_delay_idx: u32,
 
    start_read_idx: u32,
 
    next_read_idx: u32,
 
    generation: u32,
 
}
 

	
 
#[derive(Clone, Copy)]
 
pub(crate) struct MessageTicket {
 
    index: u32,
 
    generation: u32,
 
}
 

	
 
impl Inbox {
 
    fn new() -> Self {
 
        return Inbox {
 
            temp_m: Vec::new(), temp_d: Vec::new(),
 
            messages: RawVec::new(),
 
            next_delay_idx: 0,
 
            start_read_idx: 0,
 
            next_read_idx: 0,
 
            generation: 0,
 
        }
 
    }
 

	
 
    fn insert_new(&mut self, message: Message) {
 
        assert!(self.messages.len() < u32::MAX as usize); // TODO: @Size
 
        self.temp_m.push(message);
 
        return;
 
        self.messages.push(message);
 
    }
 

	
 
    fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if self.next_read_idx as usize >= self.temp_m.len() { return None };
 
        let idx = self.next_read_idx;
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{ index: idx, generation: self.generation });
 
        let cur_read_idx = self.next_read_idx as usize;
 
        if cur_read_idx >= self.messages.len() {
 
            return None;
 
        }
 

	
 
        self.generation += 1;
 
        self.next_read_idx += 1;
 
        return Some(MessageTicket{
 
            index: cur_read_idx as u32,
 
            generation: self.generation
 
        });
 
    }
 

	
 
    fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        return &self.temp_m[ticket.index as usize];
 
        return unsafe{ &*self.messages.get(ticket.index as usize) }
 
    }
 

	
 
    fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        debug_assert_eq!(self.generation, ticket.generation);
 
        debug_assert!(ticket.index < self.next_read_idx);
 
        self.next_read_idx -= 1;
 
        return self.temp_m.remove(ticket.index as usize);
 
        unsafe {
 
            let take_idx = ticket.index as usize;
 
            let val = std::ptr::read(self.messages.get(take_idx));
 

	
 
            // Move messages to the right, clearing up space in the
 
            // front.
 
            let num_move_right = take_idx - self.start_read_idx as usize;
 
            self.messages.move_range(
 
                self.start_read_idx as usize,
 
                self.start_read_idx as usize + 1,
 
                num_move_right
 
            );
 

	
 
            self.start_read_idx += 1;
 

	
 
            return val;
 
        }
 
    }
 

	
 
    fn put_back_message(&mut self, message: Message) {
 
        // We have space in front of the array because we've taken out a message
 
        // before.
 
        self.temp_d.push(message);
 
        return;
 
        debug_assert!(self.next_delay_idx < self.start_read_idx);
 
        unsafe {
 
            // Write to front of the array
 
            std::ptr::write(self.messages.get_mut(self.next_delay_idx as usize), message);
 
            self.next_delay_idx += 1;
 
        }
 
    }
 

	
 
    fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter{
 
            messages: self.temp_m.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
        return MessagesIter{
 
            messages: self.messages.as_slice(),
 
            next_index: self.start_read_idx as usize,
 
            max_index: self.next_read_idx as usize,
 
            match_port_id
 
        };
 
    }
 

	
 
    fn prepare_for_next_round(&mut self) {
 
    fn clear_read_messages(&mut self) {
 
        self.temp_m.drain(0..self.next_read_idx as usize);
 
        for (idx, v) in self.temp_d.drain(..).enumerate() {
 
            self.temp_m.insert(idx, v);
 
        }
 
        self.next_read_idx = 0;
 
        return;
 
        // Deallocate everything that was read
 
        self.destroy_range(self.start_read_idx, self.next_read_idx);
 
        self.generation += 1;
 

	
 
        // Join up all remaining values with the delayed ones in the front
 
        let num_to_move = self.messages.len() - self.next_read_idx as usize;
 
        self.messages.move_range(
 
            self.next_read_idx as usize,
 
            self.next_delay_idx as usize,
 
            num_to_move
 
        );
 

	
 
        // Set all indices (and the RawVec len) to make sense in this new state
 
        let new_len = self.next_delay_idx as usize + num_to_move;
 
        self.next_delay_idx = 0;
 
        self.start_read_idx = 0;
 
        self.next_read_idx = 0;
 
        self.messages.len = new_len;
 
    }
 

	
 
    fn transfer_messages_for_port(&mut self, port: PortIdLocal, new_inbox: &mut Inbox) {
 
        // Convoluted assert to make sure we're in non-sync mode, as that is
 
        // when this is called, and that makes our lives easier
 
        debug_assert!(self.temp_d.is_empty());
 
        let mut idx = 0;
 
        while idx < self.temp_m.len() {
 
            let msg = &self.temp_m[idx];
 
            if let Some(target) = msg.target_port() {
 
                if target == port {
 
                    new_inbox.temp_m.push(self.temp_m.remove(idx));
 
                    continue;
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
        return;
 

	
 
        let mut idx = 0;
 
        while idx < self.messages.len() {
 
            let message = unsafe{ &*self.messages.get(idx) };
 
            if let Some(target_port) = message.target_port() {
 
                if target_port == port {
 
                    // Transfer port
 
                    unsafe {
 
                        let message = std::ptr::read(message as *const _);
 
                        let remaining = self.messages.len() - idx;
 
                        if remaining > 1 {
 
                            self.messages.move_range(idx + 1, idx, remaining - 1);
 
                        let remaining = self.messages.len() - idx - 1; // idx < len, due to loop condition
 
                        if remaining > 0 {
 
                            self.messages.move_range(idx + 1, idx, remaining);
 
                        }
 
                        self.messages.len -= 1;
 
                        new_inbox.insert_new(message);
 
                    }
 
                } else {
 
                    // Do not transfer port
 
                    idx += 1;
 

	
 
                    continue; // do not increment index
 
                }
 
            }
 

	
 
            idx += 1;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn destroy_range(&mut self, start_idx: u32, end_idx: u32) {
 
        for idx in (start_idx as usize)..(end_idx as usize) {
 
            unsafe {
 
                let msg = self.messages.get_mut(idx);
 
                std::ptr::drop_in_place(msg);
 
            }
 
        }
 
    }
 
}
 

	
 
impl Drop for Inbox {
 
    fn drop(&mut self) {
 
        // Whether in sync or not in sync. We have two ranges of allocated
 
        // messages:
 
        // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
        // - readable messages: from `start_read_idx` to `messages.len`
 
        self.destroy_range(0, self.next_delay_idx);
 
        self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
    }
 
}
 
//
 
// impl Drop for Inbox {
 
//     fn drop(&mut self) {
 
//         // Whether in sync or not in sync. We have two ranges of allocated
 
//         // messages:
 
//         // - delayed messages: from 0 to `next_delay_idx` (which is 0 if in non-sync)
 
//         // - readable messages: from `start_read_idx` to `messages.len`
 
//         self.destroy_range(0, self.next_delay_idx);
 
//         self.destroy_range(self.start_read_idx, self.messages.len as u32);
 
//     }
 
// }
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -6,27 +6,27 @@ mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 4;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 3;
 
pub(crate) const NUM_THREADS: u32 = 2;
 
pub(crate) const NUM_INSTANCES: u32 = 2;
 
pub(crate) const NUM_LOOPS: u32 = 15;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
0 comments (0 inline, 0 general)