Changeset - ceaa946df1eb
[Not reviewed]
0 4 0
MH - 4 years ago 2021-10-25 19:44:46
contact@maxhenger.nl
WIP on fixing reroute bug
4 files changed with 5 insertions and 7 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -188,193 +188,192 @@ impl ConnectorPorts {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            let parent_port = parent_port.clone();
 
            self.port_mapping.push(parent_port);
 
        }
 
    }
 

	
 
    /// Adds a new port. Caller must make sure that the connector is not in the
 
    /// sync phase.
 
    fn add_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len());
 
        debug_assert!(!self.owned_ports.contains(&port_id));
 
        self.owned_ports.push(port_id);
 
        self.port_mapping.push(PortAssignment::new_unassigned());
 
    }
 

	
 
    /// Commits to a particular branch. Essentially just removes the port
 
    /// mapping information generated during the sync phase.
 
    fn commit_to_sync(&mut self) {
 
        self.port_mapping.truncate(self.owned_ports.len());
 
        debug_assert!(self.port_mapping.iter().all(|v| {
 
            !v.is_assigned && !v.last_registered_branch_id.is_valid()
 
        }));
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        self.owned_ports.remove(port_index);
 
        self.port_mapping.remove(port_index);
 
    }
 

	
 
    /// Retrieves the index associated with a port id. Note that the port might
 
    /// not exist (yet) if a speculative branch has just received the port.
 
    /// TODO: But then again, one cannot use that port, right?
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
            if *port == port_id {
 
                return Some(idx)
 
            }
 
        }
 

	
 
        return None
 
    }
 

	
 
    /// Retrieves the ID associated with the port at the provided index
 
    #[inline]
 
    fn get_port_id(&self, port_index: usize) -> PortIdLocal {
 
        return self.owned_ports[port_index];
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        println!("port_idx = {}, branch_idx = {}, num_ports = {}, port_mapping.len() = {}", port_idx, branch_idx, num_ports, self.port_mapping.len());
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
 

	
 
struct BranchQueue {
 
    first: u32,
 
    last: u32,
 
}
 

	
 
impl BranchQueue {
 
    #[inline]
 
    fn new() -> Self {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    #[inline]
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 

	
 
    #[inline]
 
    fn clear(&mut self) {
 
        self.first = 0;
 
        self.last = 0;
 
    }
 
}
 

	
 
/// Public fields of the connector that can be freely shared between multiple
 
/// threads.
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32, // TODO: Change to BranchId?
 
    cur_round: u32,
 
    // Port/message management
 
    pub committed_to: Option<(ConnectorId, u64)>,
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
}
 

	
 
// TODO: Remove this monstrosity
 
struct ConnectorRunContext<'a> {
 
    branch_index: u32,
 
    ports: &'a ConnectorPorts,
 
    ports_delta: &'a Vec<PortOwnershipDelta>,
 
    received: &'a HashMap<PortIdLocal, DataMessage>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
 
            // Either acquired or released, must be silent
 
            return false;
 
        }
 

	
 
        let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap();
 
        let mapping = self.ports.get_port(self.branch_index, port_index);
 
        return mapping.is_assigned;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(message) => Some(message.message.clone()),
 
            None => None,
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
src/runtime2/native.rs
Show inline comments
 
@@ -40,182 +40,182 @@ pub struct ConnectorApplication {
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
 
        use MessageContents as MC;
 

	
 
        match message.contents {
 
            MC::Data(_) => unreachable!("data message in API connector"),
 
            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
 
                // Handling sync in API
 
            },
 
            MC::Control(_) => {},
 
            MC::Ping => {},
 
        }
 
    }
 

	
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    println!("DEBUG: API adopting ports");
 
                    delta_state.new_ports.reserve(2);
 
                    delta_state.new_ports.push(endpoint_a);
 
                    delta_state.new_ports.push(endpoint_b);
 
                }
 
                ApplicationJob::NewConnector(connector) => {
 
                    println!("DEBUG: API creating connector");
 
                    delta_state.new_connectors.push(connector);
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push(putter_id);
 
        self.owned_ports.push(getter_id);
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    // TODO: Yank out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove);
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push_back(ApplicationJob::NewConnector(connector));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Check if the next sync-round is finished.
 
    pub fn try_wait(&self) -> bool {
 
        let (is_done, _) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        return *lock;
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) {
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 

	
 
    fn wake_up_connector_with_ping(&self) {
 
        let connector = self.runtime.get_component_public(self.connector_id);
 
        connector.inbox.insert_message(Message{
 
            sending_connector: ConnectorId::new_invalid(),
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Ping,
 
        });
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            println!("DEBUG: Waking up connector");
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.push_work(key);
 
        } else {
 
            println!("DEBUG: NOT waking up connector");
 
        }
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::Shutdown);
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        self.runtime.decrement_active_interfaces();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::ScheduledConnector;
 

	
 
use super::{RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new() -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
 
    }
 

	
 
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
 
        for (idx, port) in self.ports.iter().enumerate() {
 
            if port.self_id == id {
 
                return idx;
 
            }
 
        }
 

	
 
        panic!("port {:?}, not owned by connector", id);
 
    }
 
}
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        let scheduler_id = self.scheduler_id;
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            self.debug("Waiting for work");
 
            let connector_key = self.runtime.wait_for_work();
 
            if connector_key.is_none() {
 
                // We should exit
 
                self.debug(" ... No more work, quitting");
 
                break 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    // Check for rerouting
 
                    self.debug_conn(connector_id, &format!("Handling message from {}:{}\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
                    self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                        self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
 
                        self.runtime.send_message(other_connector_id, message);
 
                        continue;
 
                    }
 

	
 
                    self.debug_conn(connector_id, " ... Handling message myself");
 
                    // Check for messages that requires special action from the
 
                    // scheduler.
 
                    if let MessageContents::Control(content) = message.contents {
 
                        match content.content {
 
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                // Need to change port target
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.peer_connector = new_target_connector_id;
 

	
 
                                // Note: for simplicity we program the scheduler to always finish
 
                                // running a connector with an empty outbox. If this ever changes
 
                                // then accepting the "port peer changed" message implies we need
 
                                // to change the recipient of the message in the outbox.
 
                                debug_assert!(delta_state.outbox.is_empty());
 

	
 
                                // And respond with an Ack
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::CloseChannel(port_id) => {
 
                                // Mark the port as being closed
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.state = PortState::Closed;
 

	
 
                                // Send an Ack
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 

	
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
                        }
 
                    } else {
 
                        // Let connector handle message
 
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
 
                    }
 
                }
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(
 
                        scheduler_ctx, &scheduled.context, &mut delta_state
 
                    );
 
                    self.debug_conn(connector_id, "Finished running");
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.push_work(connector_key);
 
                },
src/runtime2/tests/mod.rs
Show inline comments
 
use std::sync::Arc;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(num_threads, protocol);
 

	
 
    return runtime;
 
}
 

	
 
#[test]
 
fn test_put_and_get() {
 
    let rt = runtime_for(4, "
 
primitive putter(out<bool> sender, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"putting!\");
 
            put(sender, true);
 
        }
 
        index += 1;
 
    }
 
}
 

	
 
primitive getter(in<bool> receiver, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
 
        synchronous {
 
            print(\"getting!\");
 
            auto result = get(receiver);
 
            assert(result);
 

	
 
        }
 
        index += 1;
 
    }
 
}
 
    ");
 

	
 
    let mut api = rt.create_interface();
 
    let channel = api.create_channel();
 
    let num_loops = 100;
 
    let num_loops = 1;
 

	
 
    api.create_connector("", "putter", ValueGroup::new_stack(vec![
 
        Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create putter");
 

	
 
    api.create_connector("", "getter", ValueGroup::new_stack(vec![
 
        Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })),
 
        Value::UInt32(num_loops)
 
    ])).expect("create getter");
 

	
 
    println!("Am I running?");
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)