Files @ 38c129959044
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs

38c129959044 5.0 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
Max Henger
feat: transmitting ports
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::runtime2::poll::PollingClient;

use super::component::*;
use super::runtime::*;

/// Data associated with a scheduler thread.
///
/// Built by `Scheduler::new`; all work happens inside `Scheduler::run`.
pub(crate) struct Scheduler {
    /// Shared handle to the runtime. `run` uses it to take work, look up the
    /// component belonging to a key and to put components back in the queue.
    runtime: Arc<RuntimeInner>,
    /// Polling client of this scheduler; lent to the `SchedulerCtx` that
    /// `run` creates.
    polling: PollingClient,
    /// Identifier of this scheduler; becomes `SchedulerCtx::id`.
    scheduler_id: u32,
    /// Log level handed to the `SchedulerCtx` that `run` creates.
    log_level: LogLevel,
}

/// Context created by a scheduler and passed along to the components it
/// executes (see `Scheduler::run`). Also carries the logging helpers
/// (`debug`, `info`, `error`).
pub(crate) struct SchedulerCtx<'a> {
    /// Runtime the owning scheduler works for.
    pub runtime: &'a RuntimeInner,
    /// Polling client of the owning scheduler.
    pub polling: &'a PollingClient,
    /// Scheduler identifier; printed as `s:` in every log line.
    pub id: u32,
    /// Inner value of the key of the component currently being executed;
    /// printed as `c:` in every log line. Starts at 0 and is overwritten by
    /// `Scheduler::run` each time it picks up a component.
    pub comp: u32,
    /// Threshold that `debug` and `info` compare against before printing.
    pub log_level: LogLevel,
}

impl<'a> SchedulerCtx<'a> {
    /// Creates the context for scheduler `id`.
    ///
    /// `comp` starts out as 0; the scheduler overwrites it whenever it starts
    /// executing a component.
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self {
        Self { runtime, polling, id, comp: 0, log_level }
    }

    /// Prints `text` in cyan to stdout, prefixed with the scheduler and
    /// component identifiers, provided `LogLevel::Debug >= self.log_level`.
    pub(crate) fn debug(&self, text: &str) {
        if !self.is_enabled(LogLevel::Debug) {
            return;
        }

        // TODO: Probably not always use colour
        println!("[s:{:02}, c:{:03}] \x1b[0;36m{}\x1b[0m", self.id, self.comp, text);
    }

    /// Prints `text` uncoloured to stdout, prefixed with the scheduler and
    /// component identifiers, provided `LogLevel::Info >= self.log_level`.
    pub(crate) fn info(&self, text: &str) {
        if !self.is_enabled(LogLevel::Info) {
            return;
        }

        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
    }

    /// Prints `text` in red to stdout, prefixed with the scheduler and
    /// component identifiers. Not filtered by the log level.
    pub(crate) fn error(&self, text: &str) {
        // TODO: Probably not always use colour
        println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text);
    }

    /// Whether a message of the given `level` passes the configured
    /// threshold, i.e. `level >= self.log_level`.
    fn is_enabled(&self, level: LogLevel) -> bool {
        level >= self.log_level
    }
}

impl Scheduler {
    // public interface to thread

    /// Bundles everything a scheduler thread needs. Nothing is executed until
    /// `run` is called.
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self {
        Scheduler { runtime, polling, scheduler_id, log_level }
    }

    /// Main loop of the scheduler thread.
    ///
    /// Repeatedly takes a component key from the runtime, delivers the
    /// messages waiting in that component's inbox, runs the component, and
    /// finally acts on the scheduling decision the component returned.
    /// Returns once the runtime hands out no more work.
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level);

        // Wait until we have something to do; `None` means we need to quit.
        while let Some(comp_key) = self.runtime.take_work() {
            let component = self.runtime.get_component(comp_key);
            scheduler_ctx.comp = comp_key.0;

            // Keep executing the component for as long as it asks to be
            // re-executed immediately. The inbox is emptied before every run.
            let new_scheduling = loop {
                while let Some(message) = component.inbox.pop() {
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
                }

                match component.component.run(&mut scheduler_ctx, &mut component.ctx) {
                    CompScheduling::Immediate => continue,
                    other => break other,
                }
            };

            // Act on the final scheduling decision
            match new_scheduling {
                // The loop above never breaks with `Immediate`
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => {
                    self.runtime.enqueue_work(comp_key);
                },
                CompScheduling::Sleep => {
                    self.mark_component_as_sleeping(comp_key, component);
                },
                CompScheduling::Exit => {
                    component.component.on_shutdown(&scheduler_ctx);
                    self.mark_component_as_exiting(&scheduler_ctx, comp_key, component);
                },
            }
        }
    }

    // local utilities

    /// Puts the component to sleep. If messages are waiting in the inbox
    /// right after the sleeping flag was raised, the flag is lowered again
    /// and, if this thread was the one to lower it, the component is put back
    /// into the work queue. The caller must not execute the component after
    /// this call.
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        // The key has to belong to the component we were given
        debug_assert_eq!(key.downgrade(), component.ctx.id);
        // We are the ones executing it, hence it cannot be asleep already
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false);

        // NOTE(review): a flag store followed by a check of different state
        // (the inbox). Release/Acquire by themselves do not order a store
        // before a later load; whether a wake-up can be missed depends on the
        // inbox implementation and on the message-sending side, neither of
        // which is visible here - TODO confirm.
        component.public.sleeping.store(true, Ordering::Release);

        if !component.inbox.can_pop() {
            // Nothing is waiting, stay asleep
            return;
        }

        // Something arrived in the meantime. Only the party that manages to
        // flip the flag back to `false` requeues the component, so that it
        // is enqueued just once.
        let woke_ourselves = component.public.sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();

        if woke_ourselves {
            self.runtime.enqueue_work(key);
        }
    }

    /// Handles a component that asked to exit. The first time around the
    /// component gives up the reference it holds to itself; if that yields a
    /// key the component is destroyed on the spot. In every other case the
    /// component goes into "regular" sleeping mode.
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, comp_key: CompKey, component: &mut RuntimeComp) {
        let first_exit = !component.exiting;

        if first_exit {
            // Drop the self-reference only once
            component.exiting = true;

            if let Some(own_key) = component.ctx.remove_self_reference() {
                debug_assert_eq!(own_key.0, comp_key.0);
                sched_ctx.runtime.destroy_component(comp_key);
                return;
            }
        }

        // Not destroyed (yet): enter "regular" sleeping mode
        self.mark_component_as_sleeping(comp_key, component);
    }
}