Files @ 1bc57ab68e0e
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation

1bc57ab68e0e 4.6 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Max Henger
Merge branch 'feat-builtin-ip' into 'master'

feat: Builtin internet component

See merge request nl-cwi-csy/reowolf!6
53d3950d9b6b
0e1a76667937
113e4349a706
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
53d3950d9b6b
113e4349a706
0e1a76667937
560ed3c4dc1d
0e1a76667937
0e1a76667937
0e1a76667937
53d3950d9b6b
113e4349a706
0781cf1b7abf
0781cf1b7abf
560ed3c4dc1d
0e1a76667937
0e1a76667937
968e958c3286
113e4349a706
968e958c3286
968e958c3286
113e4349a706
0781cf1b7abf
0781cf1b7abf
560ed3c4dc1d
968e958c3286
968e958c3286
0781cf1b7abf
0781cf1b7abf
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
0781cf1b7abf
968e958c3286
968e958c3286
0e1a76667937
0e1a76667937
0e1a76667937
113e4349a706
113e4349a706
0e1a76667937
0e1a76667937
0e1a76667937
113e4349a706
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0781cf1b7abf
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0781cf1b7abf
113e4349a706
0781cf1b7abf
113e4349a706
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
113e4349a706
113e4349a706
113e4349a706
113e4349a706
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
5415acc02756
5415acc02756
5415acc02756
5415acc02756
0e1a76667937
c04f7fea1a62
0e1a76667937
0e1a76667937
0e1a76667937
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0de39654770f
0e1a76667937
0e1a76667937
5415acc02756
5415acc02756
5415acc02756
0de39654770f
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
0de39654770f
0de39654770f
0de39654770f
5415acc02756
5415acc02756
0e1a76667937
0e1a76667937
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::runtime2::poll::PollingClient;

use super::component::*;
use super::runtime::*;

/// Data associated with a scheduler thread
///
/// An instance is driven by calling `run`, which keeps taking components from
/// the shared runtime and executing them until no more work is handed out.
pub(crate) struct Scheduler {
    /// Shared runtime state: the source of work (`take_work`) and the place
    /// where components are looked up, re-queued and destroyed.
    runtime: Arc<RuntimeInner>,
    /// Polling client owned by this scheduler; lent out to executing
    /// components through `SchedulerCtx::polling`.
    polling: PollingClient,
    /// Identifier of this scheduler; shows up as `s:` in the log output.
    scheduler_id: u32,
    /// Whether `SchedulerCtx::log` prints anything for this scheduler.
    debug_logging: bool,
}

/// Per-scheduler context that is handed to a component while it is being
/// executed (see the `handle_message`, `run` and `on_shutdown` calls made by
/// `Scheduler::run`).
pub(crate) struct SchedulerCtx<'a> {
    /// Runtime the scheduler belongs to.
    pub runtime: &'a RuntimeInner,
    /// Polling client of the scheduler.
    pub polling: &'a PollingClient,
    /// Identifier of the scheduler; printed as `s:` by `log`.
    pub id: u32,
    /// Identifier of the component currently being executed. Starts at 0 and
    /// is overwritten by `Scheduler::run` every time it picks up a component;
    /// printed as `c:` by `log`.
    pub comp: u32,
    /// When `false`, `log` does nothing.
    pub logging_enabled: bool,
}

impl<'a> SchedulerCtx<'a> {
    /// Builds the context for the scheduler identified by `id`.
    ///
    /// The `comp` field starts out as 0; the owner of the context is expected
    /// to overwrite it whenever it starts executing a component.
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self {
        Self { runtime, polling, id, comp: 0, logging_enabled }
    }

    /// Writes `text` to stdout, prefixed with the scheduler id and the id of
    /// the current component. Does nothing when logging is disabled.
    pub(crate) fn log(&self, text: &str) {
        if !self.logging_enabled {
            return;
        }

        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
    }
}

impl Scheduler {
    // public interface to thread

    /// Bundles the data a scheduler thread needs. Nothing is executed until
    /// `run` is called.
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
        Scheduler { runtime, polling, scheduler_id, debug_logging }
    }

    /// Main loop of the scheduler thread: keeps taking component keys from the
    /// runtime and executes the associated components. Returns once
    /// `take_work` yields `None`.
    ///
    /// # Panics
    ///
    /// Panics when a component's `run` returns an error (proper error handling
    /// is still a TODO).
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging);

        // Wait until we have something to do; `None` means we need to quit.
        while let Some(comp_key) = self.runtime.take_work() {
            let component = self.runtime.get_component(comp_key);
            scheduler_ctx.comp = comp_key.0;

            // Run the component until it no longer indicates that it needs to
            // be re-executed immediately. The inbox is drained before every
            // execution.
            let new_scheduling = loop {
                while let Some(message) = component.inbox.pop() {
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
                }

                match component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error") {
                    CompScheduling::Immediate => continue,
                    scheduling => break scheduling,
                }
            };

            // Handle the new scheduling
            match new_scheduling {
                // The loop above only breaks on a non-`Immediate` value.
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => {
                    self.runtime.enqueue_work(comp_key);
                },
                CompScheduling::Sleep => {
                    self.mark_component_as_sleeping(comp_key, component);
                },
                CompScheduling::Exit => {
                    component.component.on_shutdown(&scheduler_ctx);
                    self.mark_component_as_exiting(&scheduler_ctx, component);
                },
            }
        }
    }

    // local utilities

    /// Marks component as sleeping, if after marking itself as sleeping the
    /// inbox contains messages then the component will be immediately
    /// rescheduled. After calling this function the component should not be
    /// executed anymore.
    ///
    /// `key` must be the key of `component`; this is checked in debug builds.
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
        debug_assert!(!component.public.sleeping.load(Ordering::Acquire)); // we're executing it, so it cannot be sleeping

        // NOTE(review): the flag is published first and the inbox is checked
        // afterwards. Whether a concurrently pushed message can be missed with
        // these orderings depends on how the inbox and its senders
        // synchronise, which is not visible here - confirm.
        component.public.sleeping.store(true, Ordering::Release);
        if !component.inbox.can_pop() {
            return;
        }

        // A message is waiting, so try to take the sleeping flag back. Only
        // re-queue when our exchange succeeds; if it fails the flag was
        // already cleared by someone else, who is presumably responsible for
        // scheduling the component (verify against the code that sends
        // messages).
        let should_reschedule = component.public.sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();

        if should_reschedule {
            self.runtime.enqueue_work(key);
        }
    }

    /// Marks the component as exiting by removing the reference it holds to
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
    /// has not yet been destroyed)
    ///
    /// The handle count is decremented only on the first call for a component
    /// (tracked through its `exiting` flag). When that decrement brings the
    /// count to zero the component is destroyed through the runtime instead of
    /// being put to sleep.
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        // SAFETY: `component` is the component this scheduler is executing
        // right now, so its id presumably still refers to a live component.
        // NOTE(review): confirm this matches the contract of `upgrade`.
        let comp_key = unsafe { component.ctx.id.upgrade() };

        // If we didn't yet decrement our reference count, do so now
        if !component.exiting {
            component.exiting = true;

            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
            let new_count = old_count - 1;
            if new_count == 0 {
                // Nobody holds a handle anymore. `component` must not be
                // touched after this call, hence the early return.
                sched_ctx.runtime.destroy_component(comp_key);
                return;
            }
        }

        // Enter "regular" sleeping mode
        self.mark_component_as_sleeping(comp_key, component);
    }
}