Location: CSY/reowolf/src/runtime2/scheduler.rs @ changeset e7df1d2ae35f
Latest commit (mh): WIP: Updated port management to be more maintainable

use std::sync::Arc;
use std::sync::atomic::Ordering;

use super::component::*;
use super::runtime::*;
use super::communication::*;

/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}
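
// A minimal sketch of how these schedulers might be driven by the runtime; the
// actual spawning code lives elsewhere, and the names used here (`num_threads`,
// the way `RuntimeInner` is constructed) are assumptions, not this crate's API:
//
//     let runtime = Arc::new(/* construct RuntimeInner */);
//     let threads: Vec<_> = (0..num_threads)
//         .map(|id| {
//             let runtime = runtime.clone();
//             std::thread::spawn(move || Scheduler::new(runtime, id).run())
//         })
//         .collect();
//     for thread in threads { thread.join().unwrap(); }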

/// Context passed to a component while a scheduler thread is executing it: a
/// reference to the shared runtime, the scheduler's own ID, and the ID of the
/// component that is currently being executed (used to tag log output).
pub(crate) struct SchedulerCtx<'a> {
    pub runtime: &'a RuntimeInner,
    pub id: u32,   // ID of the scheduler thread
    pub comp: u32, // ID of the component currently being executed
}

impl<'a> SchedulerCtx<'a> {
    pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
        return Self {
            runtime,
            id,
            comp: 0,
        }
    }

    pub(crate) fn log(&self, text: &str) {
        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
    }
}

impl Scheduler {
    // public interface to the scheduler thread

    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Scheduler{ runtime, scheduler_id }
    }

    /// The scheduler's main loop: take a component from the runtime's work
    /// queue, drain its inbox and run it until it no longer requests immediate
    /// re-execution, then apply the scheduling decision it returned. Returns
    /// once `take_work` yields `None`, i.e. when the runtime is shutting down.
    pub fn run(&mut self) {
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);

        'run_loop: loop {
            // Wait until we have something to do (or need to quit)
            let comp_key = self.runtime.take_work();
            if comp_key.is_none() {
                break 'run_loop;
            }

            let comp_key = comp_key.unwrap();
            let component = self.runtime.get_component(comp_key);
            scheduler_ctx.comp = comp_key.0;

            // Run the component until it no longer indicates that it needs to
            // be re-executed immediately.
            let mut new_scheduling = CompScheduling::Immediate;
            while let CompScheduling::Immediate = new_scheduling {
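                // Handle all queued messages before (re)running the component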
                while let Some(message) = component.inbox.pop() {
                    component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
                }
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
            }

            // Handle the scheduling decision returned by the component
            match new_scheduling {
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
            }
        }
    }

    // local utilities

    /// Marks the component as sleeping after it reported that it has no more
    /// work to do. Handles the race where a message arrives while the sleeping
    /// flag is being raised, in which case the component is rescheduled.
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping

        component.public.sleeping.store(true, Ordering::Release);
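        // A message may have been pushed into the inbox just before the flag
        // was raised; its sender then saw a non-sleeping component and will
        // not reschedule us. Re-check the inbox and, if it is non-empty, try
        // to lower the flag again: whoever wins that exchange is responsible
        // for rescheduling the component.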
        if component.inbox.can_pop() {
            let should_reschedule = component.public.sleeping
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok();

            if should_reschedule {
                self.runtime.enqueue_work(key);
            }
        }
    }

    /// Tears down an exiting component: peers are notified that its ports are
    /// closing, the references it holds to those peers are released, and
    /// finally the component gives up the handle to itself (destroying it
    /// immediately if that was the last handle).
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        // Send messages that all ports will be closed
        for port_index in 0..component.ctx.ports.len() {
            let port_info = &component.ctx.ports[port_index];
            if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) {
                let peer_info = component.ctx.get_peer(peer_id);
                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
            }
        }

        // Remove all references to the peers that we have
        for mut peer in component.ctx.peers.drain(..) {
            let should_remove = peer.handle.decrement_users();
            if should_remove {
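                // we dropped the last handle to this peer, so we are the ones
                // responsible for destroying it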
                let key = unsafe{ peer.id.upgrade() };
                sched_ctx.runtime.destroy_component(key);
            }
        }

        // Give up this component's own handle to itself. If that was the last
        // outstanding handle, destroy the component now; otherwise the last
        // peer to release its handle will do so.
        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
        let new_count = old_count - 1;
        if new_count == 0 {
            let comp_key = unsafe{ component.ctx.id.upgrade() };
            sched_ctx.runtime.destroy_component(comp_key);
        }
    }
}