Files
@ 1f78496722d1
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
1f78496722d1
5.0 KiB
application/rls-services+xml
feat: runtime error handling
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | 53d3950d9b6b 0e1a76667937 113e4349a706 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 113e4349a706 0e1a76667937 1f78496722d1 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 113e4349a706 0781cf1b7abf 0781cf1b7abf 1f78496722d1 0e1a76667937 0e1a76667937 968e958c3286 1f78496722d1 968e958c3286 968e958c3286 113e4349a706 0781cf1b7abf 0781cf1b7abf 1f78496722d1 968e958c3286 968e958c3286 0781cf1b7abf 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 560ed3c4dc1d 560ed3c4dc1d 0781cf1b7abf 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 1f78496722d1 968e958c3286 968e958c3286 0e1a76667937 0e1a76667937 0e1a76667937 1f78496722d1 1f78496722d1 0e1a76667937 0e1a76667937 0e1a76667937 1f78496722d1 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 113e4349a706 0781cf1b7abf 1f78496722d1 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 113e4349a706 113e4349a706 113e4349a706 113e4349a706 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 5415acc02756 5415acc02756 5415acc02756 5415acc02756 0e1a76667937 c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 
0e1a76667937 5415acc02756 5415acc02756 5415acc02756 0de39654770f 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 5415acc02756 0de39654770f 0de39654770f 0de39654770f 5415acc02756 5415acc02756 0e1a76667937 0e1a76667937 | use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::runtime2::poll::PollingClient;
use super::component::*;
use super::runtime::*;
/// Data associated with a scheduler thread.
///
/// A scheduler repeatedly takes component keys from the shared runtime and
/// executes the corresponding components (see `Scheduler::run`).
pub(crate) struct Scheduler {
/// Shared runtime state: the source of work items and of the components
/// that are looked up by key.
runtime: Arc<RuntimeInner>,
/// Polling client owned by this scheduler; it is lent to components by
/// reference through the `SchedulerCtx` built in `run`.
polling: PollingClient,
/// Identifier of this scheduler; ends up as the `s:` part of log prefixes.
scheduler_id: u32,
/// Log threshold forwarded to the `SchedulerCtx` built in `run`.
log_level: LogLevel,
}
/// Context that a scheduler hands to a component while executing it.
///
/// Bundles borrowed access to the runtime and the polling client together
/// with the identifiers and log threshold used by the logging helpers
/// (`debug`, `info`, `error`).
pub(crate) struct SchedulerCtx<'a> {
/// Runtime the owning scheduler works for.
pub runtime: &'a RuntimeInner,
/// Polling client of the owning scheduler.
pub polling: &'a PollingClient,
/// Scheduler identifier; printed as `s:` in every log line.
pub id: u32,
/// Identifier of the component currently being executed; printed as `c:` in
/// every log line. Initialised to 0 by `new` and overwritten by
/// `Scheduler::run` each time it picks up a component.
pub comp: u32,
/// Threshold that `debug` and `info` compare against before printing.
pub log_level: LogLevel,
}
impl<'a> SchedulerCtx<'a> {
    /// Creates a context for scheduler `id` that borrows the given runtime and
    /// polling client.
    ///
    /// The current-component identifier (`comp`) starts out as 0; it is meant
    /// to be overwritten before a component is executed.
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self {
        Self { runtime, polling, id, comp: 0, log_level }
    }

    /// Prints `text` to stdout in cyan, prefixed with the scheduler and
    /// component identifiers, but only if `LogLevel::Debug >= self.log_level`.
    pub(crate) fn debug(&self, text: &str) {
        let enabled = LogLevel::Debug >= self.log_level;
        if !enabled {
            return;
        }

        // TODO: Probably not always use colour
        println!("[s:{:02}, c:{:03}] \x1b[0;36m{}\x1b[0m", self.id, self.comp, text);
    }

    /// Prints `text` to stdout without colour, prefixed with the scheduler and
    /// component identifiers, but only if `LogLevel::Info >= self.log_level`.
    pub(crate) fn info(&self, text: &str) {
        let enabled = LogLevel::Info >= self.log_level;
        if !enabled {
            return;
        }

        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
    }

    /// Prints `text` to stdout in red, prefixed with the scheduler and
    /// component identifiers. Unlike `debug` and `info` this is not filtered
    /// by the log level.
    pub(crate) fn error(&self, text: &str) {
        // TODO: Probably not always use colour
        println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text);
    }
}
impl Scheduler {
    // public interface to thread

    /// Creates a scheduler that takes its work from `runtime`, polls through
    /// `polling`, and logs under the identifier `scheduler_id` using
    /// `log_level` as threshold.
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self {
        Scheduler { runtime, polling, scheduler_id, log_level }
    }

    /// Main loop of the scheduler thread.
    ///
    /// Keeps taking component keys from the runtime and executing the
    /// associated component until `take_work` yields `None`, at which point
    /// the function returns.
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level);

        // Wait until we have something to do; `None` means we need to quit.
        while let Some(comp_key) = self.runtime.take_work() {
            let component = self.runtime.get_component(comp_key);
            // Make log lines emitted during execution carry this component's id.
            scheduler_ctx.comp = comp_key.0;

            // Run the component until it no longer indicates that it needs to
            // be re-executed immediately, then act on the scheduling it asked
            // for. Before every execution all pending messages are delivered.
            'execute: loop {
                while let Some(message) = component.inbox.pop() {
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
                }

                match component.component.run(&mut scheduler_ctx, &mut component.ctx) {
                    CompScheduling::Immediate => continue 'execute,
                    CompScheduling::Requeue => {
                        self.runtime.enqueue_work(comp_key);
                        break 'execute;
                    },
                    CompScheduling::Sleep => {
                        self.mark_component_as_sleeping(comp_key, component);
                        break 'execute;
                    },
                    CompScheduling::Exit => {
                        component.component.on_shutdown(&scheduler_ctx);
                        self.mark_component_as_exiting(&scheduler_ctx, component);
                        break 'execute;
                    },
                }
            }
        }
    }

    // local utilities

    /// Marks component as sleeping, if after marking itself as sleeping the
    /// inbox contains messages then the component will be immediately
    /// rescheduled. After calling this function the component should not be
    /// executed anymore.
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        // Make sure the component matches the key.
        debug_assert_eq!(key.downgrade(), component.ctx.id);
        // We're executing it, so it cannot be sleeping.
        debug_assert!(!component.public.sleeping.load(Ordering::Acquire));

        component.public.sleeping.store(true, Ordering::Release);

        // A message may have been queued after the inbox was last drained. If
        // so, try to take the component out of the sleeping state again; only
        // when our compare-exchange is the one that flips the flag back do we
        // put the component on the work queue.
        // NOTE(review): presumably message senders perform the same
        // compare-exchange on `sleeping`, so exactly one party enqueues the
        // component -- the sending side is not visible here, confirm.
        let woke_ourselves = component.inbox.can_pop()
            && component.public.sleeping
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok();

        if woke_ourselves {
            self.runtime.enqueue_work(key);
        }
    }

    /// Marks the component as exiting by removing the reference it holds to
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
    /// has not yet been destroyed)
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        // SAFETY: NOTE(review) -- the contract of `upgrade` is not visible
        // from this file. Presumably it requires the component behind the id
        // to still be alive, which should hold because the scheduler is
        // operating on `component` right now; confirm against its definition.
        let comp_key = unsafe { component.ctx.id.upgrade() };

        // If we didn't yet decrement our reference count, do so now. The
        // `exiting` flag makes sure this happens only once per component.
        if !component.exiting {
            component.exiting = true;

            // `fetch_sub` returns the value from before the decrement.
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
            let new_count = old_count - 1;
            if new_count == 0 {
                // That was the last handle: destroy the component. It must
                // not be touched (or put to sleep) after this point.
                sched_ctx.runtime.destroy_component(comp_key);
                return;
            }
        }

        // Enter "regular" sleeping mode
        self.mark_component_as_sleeping(comp_key, component);
    }
}
|