use std::sync::Arc; use std::sync::atomic::Ordering; use crate::runtime2::poll::PollingClient; use super::component::*; use super::runtime::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { runtime: Arc, polling: PollingClient, scheduler_id: u32, log_level: LogLevel, } pub(crate) struct SchedulerCtx<'a> { pub runtime: &'a RuntimeInner, pub polling: &'a PollingClient, pub id: u32, pub comp: u32, pub log_level: LogLevel, } impl<'a> SchedulerCtx<'a> { pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self { return Self { runtime, polling, id, comp: 0, log_level, } } pub(crate) fn debug(&self, text: &str) { // TODO: Probably not always use colour if LogLevel::Debug >= self.log_level { println!("[s:{:02}, c:{:03}] \x1b[0;36m{}\x1b[0m", self.id, self.comp, text); } } pub(crate) fn info(&self, text: &str) { if LogLevel::Info >= self.log_level { println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text); } } pub(crate) fn error(&self, text: &str) { // TODO: Probably not always use colour println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text); } } impl Scheduler { // public interface to thread pub fn new(runtime: Arc, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self { return Scheduler{ runtime, polling, scheduler_id, log_level } } pub fn run(&mut self) { let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level); 'run_loop: loop { // Wait until we have something to do (or need to quit) let comp_key = self.runtime.take_work(); if comp_key.is_none() { break 'run_loop; } let comp_key = comp_key.unwrap(); let component = self.runtime.get_component(comp_key); scheduler_ctx.comp = comp_key.0; // Run the component until it no longer indicates that it needs to // be re-executed immediately. let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { while let Some(message) = component.inbox.pop() { component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message); } new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx); } // Handle the new scheduling match new_scheduling { CompScheduling::Immediate => unreachable!(), CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); }, CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, CompScheduling::Exit => { component.component.on_shutdown(&scheduler_ctx); self.mark_component_as_exiting(&scheduler_ctx, component); } } } } // local utilities /// Marks component as sleeping, if after marking itself as sleeping the /// inbox contains messages then the component will be immediately /// rescheduled. After calling this function the component should not be /// executed anymore. fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) { debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping component.public.sleeping.store(true, Ordering::Release); if component.inbox.can_pop() { let should_reschedule = component.public.sleeping .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed) .is_ok(); if should_reschedule { self.runtime.enqueue_work(key); } } } /// Marks the component as exiting by removing the reference it holds to /// itself. Afterward the component will enter "normal" sleeping mode (if it /// has not yet been destroyed) fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { // If we didn't yet decrement our reference count, do so now let comp_key = unsafe{ component.ctx.id.upgrade() }; if !component.exiting { component.exiting = true; let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); let new_count = old_count - 1; if new_count == 0 { sched_ctx.runtime.destroy_component(comp_key); return; } } // Enter "regular" sleeping mode self.mark_component_as_sleeping(comp_key, component); } }