Files @ 9e771c9cf8d3
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation

9e771c9cf8d3 2.3 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
WIP: Control messaging between components
use std::sync::atomic::Ordering;

use super::component::*;
use super::runtime::*;
use super::communication::*;

/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
    runtime: RuntimeHandle,
    scheduler_id: u32,
}

pub(crate) struct SchedulerCtx<'a> {
    pub runtime: &'a Runtime,
}

impl Scheduler {
    // public interface to thread

    pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self {
        return Scheduler{ runtime, scheduler_id }
    }

    pub fn run(&mut self) {
        let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };

        'run_loop: loop {
            // Wait until we have something to do (or need to quit)
            let comp_key = self.runtime.take_work();
            if comp_key.is_none() {
                break 'run_loop;
            }

            let comp_key = comp_key.unwrap();
            let comp_id = comp_key.downgrade();
            let component = self.runtime.get_component(comp_key);

            // Run the component until it no longer indicates that it needs to
            // be re-executed immediately.
            let mut new_scheduling = CompScheduling::Immediate;
            while let CompScheduling::Immediate = new_scheduling {
                new_scheduling = component.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
            }

            // Handle the new scheduling
            match new_scheduling {
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
                CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); }
            }
        }
    }

    // local utilities

    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping

        component.public.sleeping.store(true, Ordering::Release);
        todo!("check for messages");
    }

    fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) {
        todo!("do something")
    }
}