Files
@ ac804a4a3d70
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs
ac804a4a3d70
5.0 KiB
application/rls-services+xml
More granularity in debug logging
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::runtime2::poll::PollingClient;
use super::component::*;
use super::runtime::*;
/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
runtime: Arc<RuntimeInner>,
polling: PollingClient,
scheduler_id: u32,
log_level: LogLevel,
}
pub(crate) struct SchedulerCtx<'a> {
pub runtime: &'a RuntimeInner,
pub polling: &'a PollingClient,
pub id: u32,
pub comp: u32,
pub log_level: LogLevel,
}
impl<'a> SchedulerCtx<'a> {
pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self {
return Self {
runtime,
polling,
id,
comp: 0,
log_level,
}
}
pub(crate) fn debug(&self, text: &str) {
// TODO: Probably not always use colour
if self.log_level >= LogLevel::Debug {
println!("[s:{:02}, c:{:03}] \x1b[0;34m{}\x1b[0m", self.id, self.comp, text);
}
}
pub(crate) fn info(&self, text: &str) {
if self.log_level >= LogLevel::Info {
println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
}
}
pub(crate) fn error(&self, text: &str) {
// TODO: Probably not always use colour
println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text);
}
}
impl Scheduler {
// public interface to thread
pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self {
return Scheduler{ runtime, polling, scheduler_id, log_level }
}
pub fn run(&mut self) {
let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level);
'run_loop: loop {
// Wait until we have something to do (or need to quit)
let comp_key = self.runtime.take_work();
if comp_key.is_none() {
break 'run_loop;
}
let comp_key = comp_key.unwrap();
let component = self.runtime.get_component(comp_key);
scheduler_ctx.comp = comp_key.0;
// Run the component until it no longer indicates that it needs to
// be re-executed immediately.
let mut new_scheduling = CompScheduling::Immediate;
while let CompScheduling::Immediate = new_scheduling {
while let Some(message) = component.inbox.pop() {
component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
}
new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx);
}
// Handle the new scheduling
match new_scheduling {
CompScheduling::Immediate => unreachable!(),
CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
CompScheduling::Exit => {
component.component.on_shutdown(&scheduler_ctx);
self.mark_component_as_exiting(&scheduler_ctx, component);
}
}
}
}
// local utilities
/// Marks component as sleeping, if after marking itself as sleeping the
/// inbox contains messages then the component will be immediately
/// rescheduled. After calling this function the component should not be
/// executed anymore.
fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
component.public.sleeping.store(true, Ordering::Release);
if component.inbox.can_pop() {
let should_reschedule = component.public.sleeping
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok();
if should_reschedule {
self.runtime.enqueue_work(key);
}
}
}
/// Marks the component as exiting by removing the reference it holds to
/// itself. Afterward the component will enter "normal" sleeping mode (if it
/// has not yet been destroyed)
fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
// If we didn't yet decrement our reference count, do so now
let comp_key = unsafe{ component.ctx.id.upgrade() };
if !component.exiting {
component.exiting = true;
let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
let new_count = old_count - 1;
if new_count == 0 {
sched_ctx.runtime.destroy_component(comp_key);
return;
}
}
// Enter "regular" sleeping mode
self.mark_component_as_sleeping(comp_key, component);
}
}
|