diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e7359488f13d940ccc259ceabb1cb744c0b6de23..d3159c82aec89f2d63ea6b11a71917be3a285417 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::runtime2::poll::PollingClient; use super::component::*; use super::runtime::*; @@ -7,21 +8,24 @@ use super::runtime::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { runtime: Arc, + polling: PollingClient, scheduler_id: u32, debug_logging: bool, } pub(crate) struct SchedulerCtx<'a> { pub runtime: &'a RuntimeInner, + pub polling: &'a PollingClient, pub id: u32, pub comp: u32, pub logging_enabled: bool, } impl<'a> SchedulerCtx<'a> { - pub fn new(runtime: &'a RuntimeInner, id: u32, logging_enabled: bool) -> Self { + pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self { return Self { runtime, + polling, id, comp: 0, logging_enabled, @@ -38,12 +42,12 @@ impl<'a> SchedulerCtx<'a> { impl Scheduler { // public interface to thread - pub fn new(runtime: Arc, scheduler_id: u32, debug_logging: bool) -> Self { - return Scheduler{ runtime, scheduler_id, debug_logging } + pub fn new(runtime: Arc, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self { + return Scheduler{ runtime, polling, scheduler_id, debug_logging } } pub fn run(&mut self) { - let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id, self.debug_logging); + let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging); 'run_loop: loop { // Wait until we have something to do (or need to quit) @@ -61,9 +65,9 @@ impl Scheduler { let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { while let Some(message) = component.inbox.pop() { - component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message); + component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message); } - new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); + new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); } // Handle the new scheduling @@ -71,7 +75,10 @@ impl Scheduler { CompScheduling::Immediate => unreachable!(), CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); }, CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, - CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); } + CompScheduling::Exit => { + component.component.on_shutdown(&scheduler_ctx); + self.mark_component_as_exiting(&scheduler_ctx, component); + } } } }