diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index a2a9221a466b21ce5b0dba5fcc3ddd14149c17b2..f3fa84c5ee325768f51d0ca69ce14f834f95e02e 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -422,7 +422,7 @@ impl FakeTree { } fn is_in_sync(&self) -> bool { - return !self.branches.is_empty(); + return self.branches.len() > 1; } pub fn queue_is_empty(&self, kind: QueueKind) -> bool { @@ -460,7 +460,7 @@ impl FakeTree { debug_assert!(!self.is_in_sync()); // Create the first branch - let sync_branch = FakeBranch::new_root(0); + let sync_branch = FakeBranch::new_root(1); let sync_branch_id = sync_branch.id; self.branches.push(sync_branch); diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 7b70bab641d8ae67019d6bfeaf8abd8c1a65dec8..48e2428f48d7dd91f2efd6535ee7e4d07a5f0f63 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -97,7 +97,7 @@ impl ConnectorApplication { tree: FakeTree::new(), consensus: Consensus::new(), last_finished_handled: None, - branch_extra: Vec::new(), + branch_extra: vec![0], }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); @@ -107,9 +107,9 @@ impl ConnectorApplication { fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { while let Some(message) = comp_ctx.read_next_message() { match message { - Message::Data(_) => todo!("data message in API connector"), - Message::Sync(_) => todo!("sync message in API connector"), - Message::Control(_) => todo!("impossible control message"), + Message::Data(message) => self.handle_new_data_message(message, comp_ctx), + Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx), + Message::Control(_) => unreachable!("control message in native API component"), } } } @@ -132,6 +132,8 @@ impl ConnectorApplication { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; @@ -247,37 +249,49 @@ impl ConnectorApplication { // In non-sync mode the application component doesn't really do anything // except performing jobs submitted from the API. This is the only // case where we expect to be woken up. + // Note that we have to communicate to the scheduler when we've received + // ports or created components (hence: given away ports) *before* we + // enter a sync round. let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { comp_ctx.push_port(endpoint_a); comp_ctx.push_port(endpoint_b); + + return ConnectorScheduling::Immediate; } ApplicationJob::NewConnector(connector, initial_ports) => { comp_ctx.push_component(connector, initial_ports); + + return ConnectorScheduling::Later; }, ApplicationJob::SyncRound(mut description) => { // Entering sync mode + comp_ctx.notify_sync_start(); self.sync_desc = description; self.is_in_sync = true; debug_assert!(self.last_finished_handled.is_none()); - debug_assert!(self.branch_extra.is_empty()); + debug_assert!(self.branch_extra.len() == 1); let first_branch_id = self.tree.start_sync(); self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); + debug_assert!(first_branch_id.index == 1); self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id); self.branch_extra.push(0); // set first branch to first instruction return ConnectorScheduling::Immediate; }, ApplicationJob::Shutdown => { debug_assert!(queue.is_empty()); + return ConnectorScheduling::Exit; } } } + // Queue was empty return ConnectorScheduling::NotNow; } @@ -309,7 +323,7 @@ impl ConnectorApplication { // Notifying interface of ending sync self.is_in_sync = false; self.sync_desc.clear(); - self.branch_extra.clear(); + self.branch_extra.truncate(1); self.last_finished_handled = None; let (results, notification) = &*self.sync_done; @@ -479,7 +493,7 @@ impl ApplicationInterface { } /// Wait until the next sync-round is finished - pub fn wait(&self) -> Result, ApplicationEndSyncError> { + pub fn wait(&mut self) -> Result, ApplicationEndSyncError> { if !self.is_in_sync { return Err(ApplicationEndSyncError::NotInSync); } @@ -488,6 +502,7 @@ impl ApplicationInterface { let mut lock = is_done.lock().unwrap(); lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done + self.is_in_sync = false; return Ok(lock.take().unwrap().inbox); } diff --git a/src/runtime2/tests/api_component.rs b/src/runtime2/tests/api_component.rs index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..f4271190e131654ae8379ae6fed02e85a782ecfa 100644 --- a/src/runtime2/tests/api_component.rs +++ b/src/runtime2/tests/api_component.rs @@ -0,0 +1,50 @@ +// Testing the api component. +// +// These tests explicitly do not use the "NUM_INSTANCES" constant because we're +// doing some communication with the native component. Hence only expect one + +use super::*; + +#[test] +fn test_put_and_get() { + const CODE: &'static str = " + primitive handler(in request, out response, u32 loops) { + u32 index = 0; + while (index < loops) { + synchronous { + auto value = get(request); + put(response, value * 2); + } + index += 1; + } + } + "; + + let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap(); + let rt = Runtime::new(NUM_THREADS, pd); + let mut api = rt.create_interface(); + + let req_chan = api.create_channel().unwrap(); + let resp_chan = api.create_channel().unwrap(); + + api.create_connector("", "handler", ValueGroup::new_stack(vec![ + Value::Input(PortId::new(req_chan.getter_id.index)), + Value::Output(PortId::new(resp_chan.putter_id.index)), + Value::UInt32(NUM_LOOPS), + ])).unwrap(); + + for loop_idx in 0..NUM_LOOPS { + api.perform_sync_round(vec![ + ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])), + ApplicationSyncAction::Get(resp_chan.getter_id) + ]).expect("start sync round"); + + let result = api.wait().expect("finish sync round"); + assert!(result.len() == 1); + if let Value::UInt32(gotten) = result[0].values[0] { + assert_eq!(gotten, loop_idx * 2); + } else { + assert!(false); + } + } +} \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 9d285806e41099cc5308b6ec053a54ae420c86d7..ccf488b87273ce44e6efc2e59e87dfbc47f26e4c 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -5,6 +5,7 @@ use super::*; use crate::{PortId, ProtocolDescription}; use crate::common::Id; use crate::protocol::eval::*; +use crate::runtime2::native::{ApplicationSyncAction}; // @@ -29,8 +30,6 @@ fn run_test_in_runtime(pdl: &str, constructor: for _ in 0..NUM_INSTANCES { constructor(&mut api); } - - // Wait until done :) } pub(crate) struct TestTimer {