Changeset - 1b179e5f4579
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-12 13:07:38
contact@maxhenger.nl
Bugfixes in initial API component
4 files changed with 75 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
@@ -422,7 +422,7 @@ impl FakeTree {
 
    }
 

	
 
    fn is_in_sync(&self) -> bool {
 
        return !self.branches.is_empty();
 
        return self.branches.len() > 1;
 
    }
 

	
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
@@ -460,7 +460,7 @@ impl FakeTree {
 
        debug_assert!(!self.is_in_sync());
 

	
 
        // Create the first branch
 
        let sync_branch = FakeBranch::new_root(0);
 
        let sync_branch = FakeBranch::new_root(1);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
src/runtime2/native.rs
Show inline comments
 
@@ -97,7 +97,7 @@ impl ConnectorApplication {
 
            tree: FakeTree::new(),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
            branch_extra: Vec::new(),
 
            branch_extra: vec![0],
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
@@ -107,9 +107,9 @@ impl ConnectorApplication {
 
    fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                Message::Data(_) => todo!("data message in API connector"),
 
                Message::Sync(_)  => todo!("sync message in API connector"),
 
                Message::Control(_) => todo!("impossible control message"),
 
                Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx),
 
                Message::Control(_) => unreachable!("control message in native API component"),
 
            }
 
        }
 
    }
 
@@ -132,6 +132,8 @@ impl ConnectorApplication {
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
            self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
@@ -247,37 +249,49 @@ impl ConnectorApplication {
 
        // In non-sync mode the application component doesn't really do anything
 
        // except performing jobs submitted from the API. This is the only
 
        // case where we expect to be woken up.
 
        // Note that we have to communicate to the scheduler when we've received
 
        // ports or created components (hence: given away ports) *before* we
 
        // enter a sync round.
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    comp_ctx.push_port(endpoint_a);
 
                    comp_ctx.push_port(endpoint_b);
 

	
 
                    return ConnectorScheduling::Immediate;
 
                }
 
                ApplicationJob::NewConnector(connector, initial_ports) => {
 
                    comp_ctx.push_component(connector, initial_ports);
 

	
 
                    return ConnectorScheduling::Later;
 
                },
 
                ApplicationJob::SyncRound(mut description) => {
 
                    // Entering sync mode
 
                    comp_ctx.notify_sync_start();
 
                    self.sync_desc = description;
 
                    self.is_in_sync = true;
 
                    debug_assert!(self.last_finished_handled.is_none());
 
                    debug_assert!(self.branch_extra.is_empty());
 
                    debug_assert!(self.branch_extra.len() == 1);
 

	
 
                    let first_branch_id = self.tree.start_sync();
 
                    self.tree.push_into_queue(QueueKind::Runnable, first_branch_id);
 
                    debug_assert!(first_branch_id.index == 1);
 
                    self.consensus.start_sync(comp_ctx);
 
                    self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id);
 
                    self.branch_extra.push(0); // set first branch to first instruction
 

	
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 

	
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        // Queue was empty
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
@@ -309,7 +323,7 @@ impl ConnectorApplication {
 
        // Notifying interface of ending sync
 
        self.is_in_sync = false;
 
        self.sync_desc.clear();
 
        self.branch_extra.clear();
 
        self.branch_extra.truncate(1);
 
        self.last_finished_handled = None;
 

	
 
        let (results, notification) = &*self.sync_done;
 
@@ -479,7 +493,7 @@ impl ApplicationInterface {
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
    pub fn wait(&mut self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
        if !self.is_in_sync {
 
            return Err(ApplicationEndSyncError::NotInSync);
 
        }
 
@@ -488,6 +502,7 @@ impl ApplicationInterface {
 
        let mut lock = is_done.lock().unwrap();
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        return Ok(lock.take().unwrap().inbox);
 
    }
 

	
src/runtime2/tests/api_component.rs
Show inline comments
 
// Testing the api component.
 
//
 
// These tests explicitly do not use the "NUM_INSTANCES" constant because we're
 
// doing some communication with the native component. Hence only expect one
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive handler(in<u32> request, out<u32> response, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto value = get(request);
 
                put(response, value * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let req_chan = api.create_channel().unwrap();
 
    let resp_chan = api.create_channel().unwrap();
 

	
 
    api.create_connector("", "handler", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(req_chan.getter_id.index)),
 
        Value::Output(PortId::new(resp_chan.putter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])),
 
            ApplicationSyncAction::Get(resp_chan.getter_id)
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 
        assert!(result.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, loop_idx * 2);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -5,6 +5,7 @@ use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
//
 

	
 
@@ -29,8 +30,6 @@ fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor:
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
pub(crate) struct TestTimer {
0 comments (0 inline, 0 general)