Files @ cf26538b25dc
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs

cf26538b25dc 4.4 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
architecture for send/recv ports in place
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use crate::ProtocolDescription;

use super::inbox::InboxMessage;
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
use super::global_store::GlobalStore;

/// Worker that pulls queued connectors from the shared global store and runs
/// them against the protocol description.
struct Scheduler {
    /// Shared global store. The scheduler pops connector keys from its
    /// `connector_queue` and looks up entries in its `connectors` and `ports`.
    global: Arc<GlobalStore>,
    /// Shared protocol description, passed by reference to a connector each
    /// time it is run.
    code: Arc<ProtocolDescription>,
}

impl Scheduler {
    /// Creates a scheduler operating on the shared `global` store and running
    /// connectors against the protocol description `code`.
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
        Self { global, code }
    }

    /// Runs the scheduling loop on the calling thread.
    ///
    /// Each iteration pops one connector key from the global connector queue
    /// and keeps running that connector for as long as it reports
    /// `ConnectorScheduling::Immediate`. After every run the side effects
    /// collected in the `RunDeltaState` are applied: outgoing messages are
    /// inserted into the inbox of the peer connector, and newly created
    /// connectors are registered in the global store and queued for execution.
    ///
    /// When the queue is empty the thread sleeps for one second and retries.
    /// There is no exit condition yet (see the TODO below), so this function
    /// does not return.
    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        // TODO: @Memory, scheme for reducing allocations if excessive.
        let mut delta_state = RunDeltaState::new();

        loop {
            // TODO: Check if we're supposed to exit

            // Retrieve a unit of work
            let connector_key = match self.global.connector_queue.pop_front() {
                Some(key) => key,
                None => {
                    // TODO: @Performance, needs condition variable for waking up
                    thread::sleep(Duration::from_secs(1));
                    continue;
                }
            };

            // We have something to do
            let connector = self.global.connectors.get_mut(&connector_key);

            let mut cur_schedule = ConnectorScheduling::Immediate;

            while cur_schedule == ConnectorScheduling::Immediate {
                // TODO: Check inbox for new message

                cur_schedule = if connector.is_in_sync_mode() {
                    // In synchronous mode, so we can expect messages being sent,
                    // but we never expect the creation of connectors
                    let new_schedule =
                        connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.new_connectors.is_empty());

                    // Deliver every pending message; the loop body simply does
                    // not execute when the outbox is empty.
                    for message in delta_state.outbox.drain(..) {
                        // The inner block limits the lifetime of `sending_port`
                        // so that it is dropped before the target connector is
                        // touched.
                        let (inbox_message, target_connector_id) = {
                            // Note: retrieving a port incurs a read lock
                            let sending_port =
                                self.global.ports.get(&connector_key, message.sending_port);
                            (
                                InboxMessage {
                                    sending_port: sending_port.self_id,
                                    receiving_port: sending_port.peer_id,
                                    sender_prev_branch_id: message.sender_prev_branch_id,
                                    sender_cur_branch_id: message.sender_cur_branch_id,
                                    message: message.message,
                                },
                                sending_port.peer_connector,
                            )
                        };

                        let target_connector =
                            self.global.connectors.get_shared(target_connector_id);
                        target_connector.inbox.insert_message(inbox_message);

                        // TODO: Check silent state. Queue connector if it was silent
                    }

                    new_schedule
                } else {
                    // In regular running mode (not in a sync block) we cannot send
                    // messages but we can create new connectors
                    let new_schedule =
                        connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.outbox.is_empty());

                    // Push all connectors into the global state and queue them
                    // for execution. Distinct names avoid shadowing the
                    // connector that is currently being run.
                    for new_connector in delta_state.new_connectors.drain(..) {
                        // Create connector, modify all of the ports that
                        // it now owns, then queue it for execution
                        let new_key = self.global.connectors.create(new_connector);
                        self.global.connector_queue.push_back(new_key);
                    }

                    new_schedule
                };
            }
        }
    }
}