Files
@ e7df1d2ae35f
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
e7df1d2ae35f
4.2 KiB
application/rls-services+xml
WIP: Updated port management to be more maintainable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 9e771c9cf8d3 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0781cf1b7abf 0781cf1b7abf 0e1a76667937 0e1a76667937 968e958c3286 0781cf1b7abf 968e958c3286 968e958c3286 0781cf1b7abf 0781cf1b7abf 968e958c3286 968e958c3286 0781cf1b7abf 0781cf1b7abf 0781cf1b7abf 0781cf1b7abf 968e958c3286 968e958c3286 0e1a76667937 0e1a76667937 0e1a76667937 53d3950d9b6b 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0781cf1b7abf 0781cf1b7abf 0781cf1b7abf c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 0e1a76667937 c04f7fea1a62 0e1a76667937 0e1a76667937 0e1a76667937 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 0e1a76667937 0de39654770f e7df1d2ae35f 0de39654770f 0de39654770f e7df1d2ae35f 0de39654770f 6555f56a22a9 0de39654770f 0de39654770f 0de39654770f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f e7df1d2ae35f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0de39654770f 0e1a76667937 0e1a76667937 | use std::sync::Arc;
use std::sync::atomic::Ordering;
use super::component::*;
use super::runtime::*;
use super::communication::*;
/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
    // Shared handle to the runtime state (work queue, component storage).
    runtime: Arc<RuntimeInner>,
    // Numeric identifier of this scheduler thread; used for logging.
    scheduler_id: u32,
}
/// Per-scheduler context handed down into component code while it executes.
/// Borrows the runtime for the duration of the scheduler's run loop.
pub(crate) struct SchedulerCtx<'a> {
    pub runtime: &'a RuntimeInner,
    // Scheduler id (mirrors `Scheduler::scheduler_id`).
    pub id: u32,
    // Id of the component currently being executed; updated by the run loop
    // each time a new component is taken from the work queue.
    pub comp: u32,
}
impl<'a> SchedulerCtx<'a> {
    /// Creates a context for the scheduler identified by `id`. No component
    /// is selected yet: `comp` starts at 0 and is overwritten by the run
    /// loop before any component code executes.
    pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
        // Idiomatic tail expression instead of an explicit `return`.
        Self {
            runtime,
            id,
            comp: 0,
        }
    }

    /// Logs a line to stdout, prefixed with the scheduler id and the id of
    /// the component currently being executed (e.g. `[s:01, c:003] ...`).
    pub(crate) fn log(&self, text: &str) {
        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
    }
}
impl Scheduler {
    // public interface to thread

    /// Creates a scheduler bound to the shared runtime. `scheduler_id` is a
    /// plain numeric identifier (also used as the logging prefix).
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Scheduler{ runtime, scheduler_id }
    }

    /// Scheduler thread main loop: repeatedly pulls a component off the
    /// runtime's work queue and executes it, until `take_work` returns
    /// `None` (which this loop treats as the signal to shut down).
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);

        'run_loop: loop {
            // Wait until we have something to do (or need to quit)
            let comp_key = self.runtime.take_work();
            if comp_key.is_none() {
                break 'run_loop;
            }

            let comp_key = comp_key.unwrap();
            let component = self.runtime.get_component(comp_key);
            // Record which component we are running so log lines are tagged.
            scheduler_ctx.comp = comp_key.0;

            // Run the component until it no longer indicates that it needs to
            // be re-executed immediately.
            let mut new_scheduling = CompScheduling::Immediate;
            while let CompScheduling::Immediate = new_scheduling {
                // Drain the inbox before handing control to the component.
                while let Some(message) = component.inbox.pop() {
                    component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
                }
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
            }

            // Handle the new scheduling
            match new_scheduling {
                CompScheduling::Immediate => unreachable!(), // loop above only exits on a non-Immediate value
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
            }
        }
    }

    // local utilities

    /// Marks the component as sleeping, then re-checks the inbox to avoid a
    /// lost wakeup: a message may have arrived after the run loop drained
    /// the inbox but before the `sleeping` flag was published. If so, this
    /// scheduler tries to win the flag back via compare-exchange and
    /// requeues the component itself; if the CAS fails, some other thread
    /// already flipped the flag (and, presumably, scheduled the component —
    /// TODO confirm against the sender-side wakeup code).
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping

        component.public.sleeping.store(true, Ordering::Release);
        if component.inbox.can_pop() {
            // Raced with an incoming message: try to become the thread that
            // un-sleeps the component. Only the CAS winner may requeue.
            let should_reschedule = component.public.sleeping
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok();

            if should_reschedule {
                self.runtime.enqueue_work(key);
            }
        }
    }

    /// Tears down an exiting component: notifies peers that its ports are
    /// closing, drops all peer handles (destroying peers whose handle count
    /// reaches zero), and finally releases this component's own handle,
    /// destroying it when no handles remain.
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        // Send messages that all ports will be closed
        for port_index in 0..component.ctx.ports.len() {
            let port_info = &component.ctx.ports[port_index];
            // `initiate_port_closing` returning `None` appears to mean no
            // control message is needed for this port — TODO confirm.
            if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) {
                let peer_info = component.ctx.get_peer(peer_id);
                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
            }
        }

        // Remove all references to the peers that we have
        for mut peer in component.ctx.peers.drain(..) {
            let should_remove = peer.handle.decrement_users();
            if should_remove {
                // SAFETY presumed: we held the last user handle, so upgrading
                // the peer id to a key should be valid — verify `upgrade`'s
                // contract in the component/runtime module.
                let key = unsafe{ peer.id.upgrade() };
                sched_ctx.runtime.destroy_component(key);
            }
        }

        // Release our own handle; `fetch_sub` returns the previous count.
        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
        let new_count = old_count - 1;
        if new_count == 0 {
            // Last handle gone: destroy the component itself.
            let comp_key = unsafe{ component.ctx.id.upgrade() };
            sched_ctx.runtime.destroy_component(comp_key);
        }
    }
}
|