Files
@ d06da4e9296c
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
d06da4e9296c
3.9 KiB
application/rls-services+xml
WIP: Reimplementing messaging and consensus
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 9e771c9cf8d3 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0781cf1b7abf 0781cf1b7abf 0e1a76667937 0e1a76667937 968e958c3286 0781cf1b7abf 968e958c3286 968e958c3286 0781cf1b7abf 0781cf1b7abf 968e958c3286 968e958c3286 0781cf1b7abf 0781cf1b7abf 0781cf1b7abf 0781cf1b7abf 968e958c3286 968e958c3286 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0781cf1b7abf 0781cf1b7abf c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 0e1a76667937 0de39654770f 0de39654770f 0de39654770f c04f7fea1a62 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 0e1a76667937 | use std::sync::Arc;
use std::sync::atomic::Ordering;
use super::component::*;
use super::runtime::*;
use super::communication::*;
/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
runtime: Arc<RuntimeInner>,
scheduler_id: u32,
}
pub(crate) struct SchedulerCtx<'a> {
pub runtime: &'a RuntimeInner,
pub id: u32,
pub comp: u32,
}
impl<'a> SchedulerCtx<'a> {
pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
return Self {
runtime,
id,
comp: 0,
}
}
pub(crate) fn log(&self, text: &str) {
println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
}
}
impl Scheduler {
// public interface to thread
pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
return Scheduler{ runtime, scheduler_id }
}
pub fn run(&mut self) {
let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);
'run_loop: loop {
// Wait until we have something to do (or need to quit)
let comp_key = self.runtime.take_work();
if comp_key.is_none() {
break 'run_loop;
}
let comp_key = comp_key.unwrap();
let component = self.runtime.get_component(comp_key);
scheduler_ctx.comp = comp_key.0;
// Run the component until it no longer indicates that it needs to
// be re-executed immediately.
let mut new_scheduling = CompScheduling::Immediate;
while let CompScheduling::Immediate = new_scheduling {
while let Some(message) = component.inbox.pop() {
component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
}
new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
}
// Handle the new scheduling
match new_scheduling {
CompScheduling::Immediate => unreachable!(),
CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
}
}
}
// local utilities
fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
component.public.sleeping.store(true, Ordering::Release);
if component.inbox.can_pop() {
let should_reschedule = component.public.sleeping
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok();
if should_reschedule {
self.runtime.enqueue_work(key);
}
}
}
fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
for port_index in 0..component.ctx.ports.len() {
let port_info = &component.ctx.ports[port_index];
if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
let peer_info = component.ctx.get_peer(peer_id);
peer_info.handle.inbox.push(Message::Control(message));
wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
}
}
let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
let new_count = old_count - 1;
if new_count == 0 {
let comp_key = unsafe{ component.ctx.id.upgrade() };
sched_ctx.runtime.destroy_component(comp_key);
}
}
}
|