Files
@ 64ef56fc060f
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
64ef56fc060f
2.3 KiB
application/rls-services+xml
WIP: Work on MPSC queue, pending bugfixes
Works in single threaded fashion, some issues in multithreaded
stresstest require fixing.
Works in single threaded fashion, some issues in multithreaded
stresstest require fixing.
0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 | use std::sync::atomic::Ordering;
use super::component::*;
use super::runtime::*;
/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
runtime: RuntimeHandle,
scheduler_id: u32,
}
pub(crate) struct SchedulerCtx<'a> {
pub runtime: &'a Runtime,
}
impl Scheduler {
// public interface to thread
pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self {
return Scheduler{ runtime, scheduler_id }
}
pub fn run(&mut self) {
let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
'run_loop: loop {
// Wait until we have something to do (or need to quit)
let comp_key = self.runtime.take_work();
if comp_key.is_none() {
break 'run_loop;
}
let comp_key = comp_key.unwrap();
let comp_id = comp_key.downgrade();
let component = self.runtime.get_component(comp_key);
// Run the component until it no longer indicates that it needs to
// be re-executed immediately.
let mut new_scheduling = CompScheduling::Immediate;
while let CompScheduling::Immediate = new_scheduling {
new_scheduling = component.private.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
}
// Handle the new scheduling
match new_scheduling {
CompScheduling::Immediate => unreachable!(),
CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); }
}
}
}
// local utilities
fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key
debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
component.public.sleeping.store(true, Ordering::Release);
todo!("check for messages");
}
fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) {
todo!("do something")
}
}
|