Files @ f450ae18ef58
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs

f450ae18ef58 2.5 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
merge with rewrite of connector/scheduler
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use crate::ProtocolDescription;

use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
use super::global_store::GlobalStore;

/// Worker that repeatedly takes connector keys from the shared global store
/// and runs the corresponding connectors against the protocol description.
struct Scheduler {
    /// Shared store from which connector keys are popped and connectors are
    /// looked up.
    global: Arc<GlobalStore>,
    /// Protocol description handed to a connector every time it is run.
    code: Arc<ProtocolDescription>,
}

impl Scheduler {
    /// Creates a scheduler that takes its work from `global` and runs
    /// connectors against the protocol description `code`.
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
        Self { global, code }
    }

    /// Runs the scheduling loop.
    ///
    /// Each iteration pops a connector key from the global store. When no key
    /// is available the thread sleeps for one second and tries again.
    /// Otherwise the connector is run repeatedly for as long as it reports
    /// `ConnectorScheduling::Immediate`.
    ///
    /// There is currently no exit condition, so this function does not return.
    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        // TODO: @Memory, scheme for reducing allocations if excessive.
        let mut delta_state = RunDeltaState::new();

        loop {
            // TODO: Check if we're supposed to exit

            // Retrieve a unit of work
            let connector_key = match self.global.pop_key() {
                Some(key) => key,
                None => {
                    // TODO: @Performance, needs condition variable for waking up
                    thread::sleep(Duration::from_secs(1));
                    continue;
                }
            };

            // We have something to do
            let connector = self.global.get_connector(&connector_key);

            let mut cur_schedule = ConnectorScheduling::Immediate;

            // Keep running this connector for as long as it asks to be
            // rescheduled immediately.
            while cur_schedule == ConnectorScheduling::Immediate {
                cur_schedule = if connector.is_in_sync_mode() {
                    // In synchronous mode, so we can expect messages being sent,
                    // but we never expect the creation of connectors
                    let new_schedule =
                        connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.new_connectors.is_empty());

                    if !delta_state.outbox.is_empty() {
                        // TODO: the contents of the outbox are not handled yet
                    }

                    new_schedule
                } else {
                    // In regular running mode (not in a sync block) we cannot send
                    // messages but we can create new connectors
                    let new_schedule =
                        connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.outbox.is_empty());

                    if !delta_state.new_connectors.is_empty() {
                        // TODO: Push all connectors into the global state and
                        // queue them for execution
                    }

                    new_schedule
                };
            }
        }
    }
}