Files
@ 60057e2acf9e
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs
60057e2acf9e
4.6 KiB
application/rls-services+xml
WIP on error-handling implementation
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::runtime2::poll::PollingClient;
use super::component::*;
use super::runtime::*;
/// Data associated with a scheduler thread
///
/// The fields are handed over to `Scheduler::new` and are read by
/// `Scheduler::run` to build the per-thread `SchedulerCtx`.
pub(crate) struct Scheduler {
/// Shared handle to the runtime; `run` uses it to take work, look up
/// components and enqueue work.
runtime: Arc<RuntimeInner>,
/// Polling client owned by this scheduler; lent by reference to the
/// `SchedulerCtx` created in `run`.
polling: PollingClient,
/// Identifier of this scheduler; becomes `SchedulerCtx::id`.
scheduler_id: u32,
/// Forwarded as `SchedulerCtx::logging_enabled`.
debug_logging: bool,
}
/// Borrowed view of a scheduler's state that is passed to components while
/// they are being executed by `Scheduler::run`.
pub(crate) struct SchedulerCtx<'a> {
/// The runtime the scheduler belongs to.
pub runtime: &'a RuntimeInner,
/// The scheduler's polling client.
pub polling: &'a PollingClient,
/// Scheduler identifier; printed as the `s:` part of the log prefix.
pub id: u32,
/// Identifier of the component currently being handled; printed as the
/// `c:` part of the log prefix. Initialized to `0` by `new` and updated by
/// `Scheduler::run` for every component it picks up.
pub comp: u32,
/// When `false`, `log` does nothing.
pub logging_enabled: bool,
}
impl<'a> SchedulerCtx<'a> {
    /// Builds a context for the scheduler identified by `id`.
    ///
    /// The `comp` field always starts out as `0`; every other field is taken
    /// directly from the arguments.
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self {
        Self {
            comp: 0,
            runtime,
            polling,
            id,
            logging_enabled,
        }
    }

    /// Writes `text` to stdout, prefixed with the scheduler id and the
    /// current component id. Does nothing when logging is disabled.
    pub(crate) fn log(&self, text: &str) {
        if !self.logging_enabled {
            return;
        }

        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
    }
}
impl Scheduler {
    // public interface to thread

    /// Bundles the given values into a `Scheduler`. Nothing is executed
    /// until `run` is called.
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
        Self { runtime, polling, scheduler_id, debug_logging }
    }

    /// Main loop of the scheduler.
    ///
    /// Repeatedly takes a component key from the runtime, delivers the
    /// component's pending inbox messages, runs the component, and then acts
    /// on the scheduling decision the component reports. Returns as soon as
    /// `take_work` yields `None`.
    ///
    /// # Panics
    ///
    /// Panics with "TODO: Handle error" when the component's `run` call does
    /// not produce a scheduling decision.
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging);

        loop {
            // Blocks until there is work; `None` is the signal to quit.
            let comp_key = match self.runtime.take_work() {
                Some(key) => key,
                None => break,
            };

            let component = self.runtime.get_component(comp_key);
            scheduler_ctx.comp = comp_key.0;

            // Keep executing the component for as long as it asks to be
            // re-run immediately. Before every execution the inbox is
            // drained so the component sees all messages received so far.
            let outcome = loop {
                while let Some(message) = component.inbox.pop() {
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
                }

                let scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
                match scheduling {
                    CompScheduling::Immediate => continue,
                    other => break other,
                }
            };

            // Act on the final (non-immediate) scheduling decision.
            match outcome {
                // The loop above never breaks with `Immediate`.
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => {
                    self.runtime.enqueue_work(comp_key);
                },
                CompScheduling::Sleep => {
                    self.mark_component_as_sleeping(comp_key, component);
                },
                CompScheduling::Exit => {
                    component.component.on_shutdown(&scheduler_ctx);
                    self.mark_component_as_exiting(&scheduler_ctx, component);
                },
            }
        }
    }

    // local utilities

    /// Flags the component as sleeping. If the inbox turns out to contain
    /// messages after the flag has been set, this function tries to clear
    /// the flag again and, when it is the one that cleared it, puts the
    /// component back on the work queue. The component should not be
    /// executed by the caller after this function returns.
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        // The key must belong to the component we were handed.
        debug_assert_eq!(key.downgrade(), component.ctx.id);
        // We are the ones executing it, so it cannot be flagged as sleeping.
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false);

        // NOTE(review): this Release store is followed by a read of the
        // inbox. Whether that pair needs a stronger ordering depends on the
        // inbox implementation and on how senders inspect `sleeping`, neither
        // of which is visible here - confirm.
        component.public.sleeping.store(true, Ordering::Release);

        // Only the party that wins the `true -> false` exchange may enqueue
        // the component. The exchange is attempted only when the inbox is
        // non-empty (short-circuit evaluation keeps that order).
        let must_requeue = component.inbox.can_pop()
            && component.public.sleeping
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok();

        if must_requeue {
            self.runtime.enqueue_work(key);
        }
    }

    /// Marks the component as exiting by dropping the handle it holds to
    /// itself (done at most once, guarded by `component.exiting`). If that
    /// was the last handle the component is destroyed; otherwise it enters
    /// "normal" sleeping mode.
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        // SAFETY (assumed, TODO confirm against `upgrade`'s contract): the
        // caller is the scheduler currently executing this component.
        let comp_key = unsafe{ component.ctx.id.upgrade() };

        // Decrement the handle count the first time we get here and remember
        // whether that decrement released the final handle.
        let released_last_handle = if component.exiting {
            false
        } else {
            component.exiting = true;
            let previous_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
            let remaining = previous_count - 1;
            remaining == 0
        };

        if released_last_handle {
            sched_ctx.runtime.destroy_component(comp_key);
        } else {
            // Enter "regular" sleeping mode
            self.mark_component_as_sleeping(comp_key, component);
        }
    }
}
|