Files
@ 53d3950d9b6b
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
53d3950d9b6b
3.5 KiB
application/rls-services+xml
WIP: Initial non-communicating component test
53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 9e771c9cf8d3 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 968e958c3286 53d3950d9b6b 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 968e958c3286 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 0e1a76667937 0de39654770f 0de39654770f 0de39654770f c04f7fea1a62 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 0e1a76667937 | use std::sync::Arc;
use std::sync::atomic::Ordering;
use super::component::*;
use super::runtime::*;
use super::communication::*;
/// Data associated with a scheduler thread.
///
/// Constructed through `Scheduler::new`; `Scheduler::run` then uses the
/// runtime held here to obtain components and execute them.
pub(crate) struct Scheduler {
/// Shared handle to the runtime. `run` uses it to take work, look up the
/// component belonging to a key, and put components back on the queue.
runtime: Arc<RuntimeInner>,
/// Identifier passed in by the caller of `new`. It is only stored; none of
/// the code visible in this file reads it.
scheduler_id: u32,
}
/// Context the scheduler passes along while it works on a component.
///
/// `Scheduler::run` creates one per scheduler and hands it to the component's
/// `run` call and to the exit handling. It only carries a borrow of the
/// runtime.
pub(crate) struct SchedulerCtx<'a> {
/// Borrowed runtime. The exit path uses it to destroy a component once its
/// handle count reaches zero.
pub runtime: &'a RuntimeInner,
}
impl<'a> SchedulerCtx<'a> {
pub fn new(runtime: &'a RuntimeInner) -> Self {
return Self {
runtime,
}
}
}
impl Scheduler {
    // public interface to thread

    /// Creates the state for one scheduler thread.
    ///
    /// `runtime` is the shared runtime the scheduler will pull work from;
    /// `scheduler_id` is stored unchanged.
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        Scheduler { runtime, scheduler_id }
    }

    /// Main loop of the scheduler.
    ///
    /// Takes component keys from the runtime until `take_work` yields `None`.
    /// Each component is executed over and over while it reports
    /// `CompScheduling::Immediate`; the first different result decides
    /// whether it is requeued, put to sleep, or shut down.
    ///
    /// # Panics
    ///
    /// Panics when running the component's code returns an error (proper
    /// error handling is still a TODO).
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime);

        // Wait until we have something to do; `None` means we need to quit.
        while let Some(comp_key) = self.runtime.take_work() {
            let component = self.runtime.get_component(comp_key);

            // Run the component until it no longer indicates that it needs
            // to be re-executed immediately.
            let new_scheduling = loop {
                let outcome = component
                    .code
                    .run(&mut scheduler_ctx, &mut component.ctx)
                    .expect("TODO: Handle error");
                if let CompScheduling::Immediate = outcome {
                    continue;
                }
                break outcome;
            };

            // Handle the new scheduling
            match new_scheduling {
                // The loop above only exits on a non-`Immediate` result.
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => {
                    self.runtime.enqueue_work(comp_key);
                }
                CompScheduling::Sleep => {
                    self.mark_component_as_sleeping(comp_key, component);
                }
                CompScheduling::Exit => {
                    self.mark_component_as_exiting(&scheduler_ctx, component);
                }
            }
        }
    }

    // local utilities

    /// Flags `component` as sleeping, and reschedules it straight away when
    /// its inbox already has something to pop.
    ///
    /// Once the flag is set the inbox is checked. If something can be popped,
    /// the flag is flipped back with a compare-exchange, and only when that
    /// exchange succeeds is `key` handed to `enqueue_work`.
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping

        component.public.sleeping.store(true, Ordering::Release);

        if !component.inbox.can_pop() {
            return;
        }

        // NOTE(review): a failed exchange means the flag was already cleared
        // by someone else; presumably that party takes care of rescheduling -
        // confirm against `wake_up_if_sleeping`.
        let cleared_by_us = component
            .public
            .sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if cleared_by_us {
            self.runtime.enqueue_work(key);
        }
    }

    /// Shuts down `component` after its code reported `CompScheduling::Exit`.
    ///
    /// Every port is passed to `control.mark_port_closed`. Whenever that
    /// returns a peer and a control message, the message is pushed into the
    /// peer's inbox and `wake_up_if_sleeping` is called for that peer.
    /// Afterwards `num_handles` is decremented by one, and if it reaches zero
    /// the component is destroyed through the runtime.
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        // Iterate by index: `component.ctx` is borrowed mutably inside the
        // loop, so a borrowing iterator over `ports` cannot be held.
        for port_index in 0..component.ctx.ports.len() {
            let port_id = component.ctx.ports[port_index].self_id;
            let notification = component.code.control.mark_port_closed(port_id, &mut component.ctx);
            let (peer_id, message) = match notification {
                Some(pair) => pair,
                None => continue,
            };

            let peer_info = component.ctx.get_peer(peer_id);
            peer_info.handle.inbox.push(Message::Control(message));
            wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
        }

        // `fetch_sub` returns the count from before the decrement.
        let remaining = component.public.num_handles.fetch_sub(1, Ordering::AcqRel) - 1;
        if remaining == 0 {
            // NOTE(review): `upgrade` is `unsafe` and its contract is not
            // visible here; presumably it is sound because this scheduler is
            // the one currently executing the component - confirm.
            let comp_key = unsafe { component.ctx.id.upgrade() };
            sched_ctx.runtime.destroy_component(comp_key);
        }
    }
}
|