diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index 38250412efb892714887391c708e79c51977df4c..53ed270193deb762acee1587609efe37beb04349 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -69,7 +69,6 @@ pub(crate) enum ConnectorScheduling {
     Later,  // Schedule for running, at some later point in time
     NotNow, // Do not reschedule for running
     Exit,   // Connector has exited
-    Error(EvalError), // Connector has experienced a fatal error
 }
 
 pub(crate) struct ConnectorPDL {
@@ -267,7 +266,16 @@ impl ConnectorPDL {
             let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
             if let Err(eval_error) = run_result {
-                return ConnectorScheduling::Error(eval_error);
+                self.eval_error = Some(eval_error);
+                self.mode = Mode::SyncError;
+                if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
+                    // We can exit immediately
+                    return self.enter_non_sync_mode(conclusion, comp_ctx);
+                } else {
+                    // The current branch failed, but other branches may still
+                    // be running.
+                    return ConnectorScheduling::Immediate;
+                }
             }
 
             let run_result = run_result.unwrap();
@@ -408,7 +416,8 @@ impl ConnectorPDL {
         };
         let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
         if let Err(eval_error) = run_result {
-            return ConnectorScheduling::Error(eval_error);
+            comp_ctx.push_error(eval_error);
+            return ConnectorScheduling::Exit;
         }
 
         let run_result = run_result.unwrap();
@@ -473,14 +482,7 @@ impl ConnectorPDL {
         // Depending on the local state, decide what to do
         let final_branch_id = match conclusion {
             RoundConclusion::Success(branch_id) => Some(branch_id),
-            RoundConclusion::Failure => if self.mode == Mode::SyncError {
-                // We experienced an error, so exit now
-                None
-            } else {
-                // We didn't experience an error, so retry
-                // TODO: Decide what to do with sync errors
-                Some(BranchId::new_invalid())
-            }
+            RoundConclusion::Failure => None,
         };
 
         if let Some(solution_branch_id) = final_branch_id {
@@ -499,6 +501,9 @@ impl ConnectorPDL {
             // No final branch, because we're supposed to exit!
             self.last_finished_handled = None;
             self.mode = Mode::Error;
+            if let Some(eval_error) = self.eval_error.take() {
+                ctx.push_error(eval_error);
+            }
 
             return ConnectorScheduling::Exit;
         }
     }
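Reviewer note on the pattern above: the dedicated `ConnectorScheduling::Error` variant is removed. A failing branch now parks its `EvalError` on the component, switches to `Mode::SyncError`, and lets `notify_of_fatal_branch` decide whether the sync round can conclude immediately or the component must keep running for its peers. Below is a minimal sketch of that flow under simplified assumptions: `Mode`, `Consensus`, `Component`, and the `String` error are stand-ins rather than the real runtime2 types, and the later `enter_non_sync_mode`/`push_error` steps are collapsed into one place.

```rust
// Sketch only: all types here are simplified stand-ins for runtime2's.
#[derive(Debug)]
enum Mode { NonSync, SyncError, Error }

#[derive(Debug)]
enum Scheduling { Immediate, Exit }

struct Consensus { round_can_conclude: bool }

impl Consensus {
    // Stand-in for `notify_of_fatal_branch`: `true` means the sync round can
    // be concluded right away, `false` means other branches or peers are
    // still busy and the component has to keep running.
    fn notify_of_fatal_branch(&mut self) -> bool {
        self.round_can_conclude
    }
}

struct Component {
    mode: Mode,
    eval_error: Option<String>, // stands in for Option<EvalError>
    consensus: Consensus,
}

impl Component {
    fn on_fatal_branch(&mut self, error: String) -> Scheduling {
        // Park the error instead of bubbling it up through the scheduling
        // enum; it is only reported once the component really exits.
        self.eval_error = Some(error);
        self.mode = Mode::SyncError;
        if self.consensus.notify_of_fatal_branch() {
            // Round concluded: report the stored error and exit.
            self.mode = Mode::Error;
            if let Some(error) = self.eval_error.take() {
                println!("ERROR: component failed:\n{}", error);
            }
            Scheduling::Exit
        } else {
            // Other branches may still produce a valid local solution,
            // so stay scheduled.
            Scheduling::Immediate
        }
    }
}

fn main() {
    let mut component = Component {
        mode: Mode::NonSync,
        eval_error: None,
        consensus: Consensus { round_can_conclude: true },
    };
    let scheduling = component.on_fatal_branch("index 1 is out of bounds".to_string());
    println!("mode = {:?}, scheduling = {:?}", component.mode, scheduling);
}
```

The design point is that a locally failed branch may not tear the component down on its own: peers can still be mid-round, so the error is parked until the consensus round concludes.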
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index cf8769e28a413656d2b1da5dbee0b9424f700e96..bafdf0dc3198bdff60ba18befa21bf461c0d7edc 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -59,6 +59,7 @@ impl Scheduler {
                     // Nothing to do. But we're still waiting for all our pending
                     // control messages to be answered.
                     self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
+                    self.handle_inbox_while_shutting_down(scheduled);
                     if scheduled.router.num_pending_acks() == 0 {
                         // We're actually done, we can safely destroy the
                         // currently running connector
@@ -72,7 +73,7 @@ impl Scheduler {
                     let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                     let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
                     self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
-
+
                     // Handle all of the output from the current run: messages to
                     // send and connectors to instantiate.
                     self.handle_changes_in_context(scheduled);
@@ -112,17 +113,13 @@ impl Scheduler {
                     }
 
                     if scheduled.router.num_pending_acks() == 0 {
+                        // All ports (if any) are already closed
                         self.runtime.destroy_component(connector_key);
                         continue 'thread_loop;
                     }
 
                     self.try_go_to_sleep(connector_key, scheduled);
                 },
-                ConnectorScheduling::Error(eval_error) => {
-                    // Display error. Then exit
-                    println!("Oh oh!\n{}", eval_error);
-                    panic!("Abort!");
-                }
             }
         }
     }
@@ -198,6 +195,21 @@ impl Scheduler {
         }
     }
 
+    /// Drains inbox messages while shutting down. This covers the case where
+    /// a component cleanly exited outside of a sync region, but a peer, before
+    /// receiving the `CloseChannel` message, sent it a message inside a sync
+    /// region. That peer should be notified that its message will not be
+    /// received by a component inside a sync region.
+    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
+        // Note: we're not handling the public inbox here, we're dealing with
+        // the private one!
+        while let Some(message) = scheduled.ctx.read_next_message() {
+            if let Some(target_port) = Self::get_message_target_port(&message) {
+                todo!("handle this, send back 'my thing is closed yo'")
+            }
+        }
+    }
+
     /// Handles changes to the context that were made by the component. This is
     /// the way (due to Rust's borrowing rules) that we bubble up changes in the
     /// component's state that the scheduler needs to know about (e.g. a message
@@ -453,7 +465,7 @@ impl ComponentCtx {
     /// for waiting until it is appropriate to shut down (i.e. being outside
     /// of a sync region) and returning the `Exit` scheduling code.
     pub(crate) fn push_error(&mut self, error: EvalError) {
-
+        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.0, error);
     }
 
     #[inline]
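The `todo!` in `handle_inbox_while_shutting_down` leaves the reply unimplemented. One hypothetical shape for it, assuming the intent is to bounce a port-closed notification back to the sender; `Message`, `PortClosed`, `Inbox`, and the callback are made up for illustration and differ from the real runtime2 types:

```rust
// Hypothetical sketch only: these types are illustrative stand-ins.
struct Message {
    sending_port: u32,
    target_port: u32,
}

struct PortClosed { port: u32 }

struct Inbox { messages: Vec<Message> }

impl Inbox {
    fn read_next_message(&mut self) -> Option<Message> {
        self.messages.pop()
    }
}

// Drain the private inbox of a component that is shutting down: every
// message that still arrives is answered with a "port closed" notification,
// so the peer can abort its sync round instead of waiting forever.
fn drain_inbox_while_shutting_down(
    inbox: &mut Inbox,
    mut send_to_peer: impl FnMut(u32, PortClosed),
) {
    while let Some(message) = inbox.read_next_message() {
        // Reply on the channel the peer used to reach us.
        send_to_peer(message.sending_port, PortClosed { port: message.target_port });
    }
}

fn main() {
    let mut inbox = Inbox {
        messages: vec![Message { sending_port: 0, target_port: 1 }],
    };
    drain_inbox_while_shutting_down(&mut inbox, |peer_port, notification| {
        println!("notify port {}: port {} is closed", peer_port, notification.port);
    });
}
```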
diff --git a/src/runtime2/tests/basics.rs b/src/runtime2/tests/basics.rs
index 471394c34e84602b9d7197b13a54b41c0f5b4a79..0a3a06db525162b88b488d57b0e6ce94bd204458 100644
--- a/src/runtime2/tests/basics.rs
+++ b/src/runtime2/tests/basics.rs
@@ -1,6 +1,49 @@
 use super::*;
 
+#[test]
+fn test_doing_nothing() {
+    // If this component does not get into an infinite loop (hence: the
+    // runtime exits), then the test works
+    const CODE: &'static str = "
+    primitive silent_willy(u32 loops) {
+        u32 index = 0;
+        while (index < loops) {
+            sync { index += 1; }
+        }
+    }
+    ";
+
+    let _timer = TestTimer::new("doing_nothing");
+    run_test_in_runtime(CODE, |api| {
+        api.create_connector("", "silent_willy", ValueGroup::new_stack(vec![
+            Value::UInt32(NUM_LOOPS),
+        ])).expect("create component");
+    });
+}
+
+#[test]
+fn test_local_sync_failure() {
+    // If the component exits cleanly, then the runtime exits cleanly, and the
+    // test will finish
+    const CODE: &'static str = "
+    primitive immediate_failure() {
+        u32[] only_allows_index_0 = { 1 };
+        while (true) sync { // note the infinite loop
+            auto value = only_allows_index_0[1];
+        }
+    }
+    ";
+
+    let _timer = TestTimer::new("immediate_local_failure");
+    run_test_in_runtime(CODE, |api| {
+        api.create_connector("", "immediate_failure", ValueGroup::new_stack(Vec::new()))
+            .expect("create component");
+    });
+}
+
 #[test]
 fn test_single_put_and_get() {
     const CODE: &'static str = "
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index d25324b1f3299f70dc7c80998af5ae4aff815450..46cf663a10725b3578b54782bfb256bb79b8cc93 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -10,13 +10,13 @@ use crate::protocol::eval::*;
 use crate::runtime2::native::{ApplicationSyncAction};
 
 // Generic testing constants; use when appropriate to simplify stress-testing
-pub(crate) const NUM_THREADS: u32 = 3;   // number of threads in runtime
-pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed
-pub(crate) const NUM_LOOPS: u32 = 8;     // number of loops within a single test (not used by all tests)
+// pub(crate) const NUM_THREADS: u32 = 3;   // number of threads in runtime
+// pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed
+// pub(crate) const NUM_LOOPS: u32 = 8;     // number of loops within a single test (not used by all tests)
 
-// pub(crate) const NUM_THREADS: u32 = 1;
-// pub(crate) const NUM_INSTANCES: u32 = 1;
-// pub(crate) const NUM_LOOPS: u32 = 1;
+pub(crate) const NUM_THREADS: u32 = 1;
+pub(crate) const NUM_INSTANCES: u32 = 1;
+pub(crate) const NUM_LOOPS: u32 = 1;
 
 fn create_runtime(pdl: &str) -> Runtime {
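A side note on the new tests in basics.rs: `TestTimer` is bound to a local but never used directly, which suggests an RAII guard that reports the test's duration when dropped. Its definition is not part of this diff, so the following is only an assumed stand-in:

```rust
use std::time::Instant;

// Assumed shape of `TestTimer`: an RAII guard that measures how long a test
// takes and reports when it goes out of scope. The real type is not shown
// in this diff.
struct TestTimer {
    name: &'static str,
    started: Instant,
}

impl TestTimer {
    fn new(name: &'static str) -> Self {
        Self { name, started: Instant::now() }
    }
}

impl Drop for TestTimer {
    fn drop(&mut self) {
        println!("[{}] finished in {:?}", self.name, self.started.elapsed());
    }
}

fn main() {
    // Bound to a local so it lives until the end of the scope, mirroring the
    // `let _timer = TestTimer::new(...)` bindings in the tests above.
    let _timer = TestTimer::new("doing_nothing");
    // ... test body would run here ...
}
```

Binding the guard as `let _timer` (rather than `let _`, which would drop it immediately) keeps it alive until the end of the scope while still silencing the unused-variable lint.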