diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index cf8769e28a413656d2b1da5dbee0b9424f700e96..bafdf0dc3198bdff60ba18befa21bf461c0d7edc 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -59,6 +59,7 @@ impl Scheduler { // Nothing to do. But we're stil waiting for all our pending // control messages to be answered. self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks())); + self.handle_inbox_while_shutting_down(scheduled); if scheduled.router.num_pending_acks() == 0 { // We're actually done, we can safely destroy the // currently running connector @@ -72,7 +73,7 @@ impl Scheduler { let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx); self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule)); - + // Handle all of the output from the current run: messages to // send and connectors to instantiate. self.handle_changes_in_context(scheduled); @@ -112,17 +113,13 @@ impl Scheduler { } if scheduled.router.num_pending_acks() == 0 { + // All ports (if any) already closed self.runtime.destroy_component(connector_key); continue 'thread_loop; } self.try_go_to_sleep(connector_key, scheduled); }, - ConnectorScheduling::Error(eval_error) => { - // Display error. Then exit - println!("Oh oh!\n{}", eval_error); - panic!("Abort!"); - } } } } @@ -198,6 +195,21 @@ impl Scheduler { } } + /// Handles inbox messages while shutting down. This intends to handle the + /// case where a component cleanly exited outside of a sync region, but a + /// peer, before receiving the `CloseChannel` message, sent a message inside + /// a sync region. This peer should be notified that its message is not + /// received by a component in a sync region. + fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) { + // Note: we're not handling the public inbox, we're dealing with the + // private one! + while let Some(message) = scheduled.ctx.read_next_message() { + if let Some(target_port) = Self::get_message_target_port(&message) { + todo!("handle this, send back 'my thing is closed yo'") + } + } + } + /// Handles changes to the context that were made by the component. This is /// the way (due to Rust's borrowing rules) that we bubble up changes in the /// component's state that the scheduler needs to know about (e.g. a message @@ -453,7 +465,7 @@ impl ComponentCtx { /// for waiting until it is appropriate to shut down (i.e. being outside /// of a sync region) and returning the `Exit` scheduling code. pub(crate) fn push_error(&mut self, error: EvalError) { - + println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.0, error); } #[inline]