Files @ 58dfabd1be9f
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs

58dfabd1be9f 13.1 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
moving to laptop
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::thread;

use crate::ProtocolDescription;
use crate::runtime2::global_store::ConnectorVariant;

use super::RuntimeInner;
use super::port::{PortIdLocal};
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
}

impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
        return Self{ runtime };
    }

    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        let mut delta_state = RunDeltaState::new();

        'thread_loop: loop {
            // Retrieve a unit of work
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
            if connector_key.is_none() {
                // TODO: @Performance, needs condition or something, and most
                //  def' not sleeping
                thread::sleep(Duration::new(1, 0));
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
                    // Thread exits!
                    break 'thread_loop;
                }

                continue 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Check all the message that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    match message {
                        Message::Data(message) => {
                            // Check if we need to reroute, or can just put it
                            // in the private inbox of the connector
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
                            } else {
                                scheduled.connector.inbox.insert_message(message);
                            }
                        },
                        Message::Control(message) => {
                            match message.content {
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                    // Need to change port target
                                    let port = self.runtime.global_store.ports.get(&connector_key, port_id);
                                    port.peer_connector = new_target_connector_id;
                                    debug_assert!(delta_state.outbox.is_empty());

                                    // And respond with an Ack
                                    self.send_message_and_wake_up_if_sleeping(
                                        message.sender,
                                        Message::Control(ControlMessage{
                                            id: message.id,
                                            sender: connector_key.downcast(),
                                            content: ControlMessageVariant::Ack,
                                        })
                                    );
                                },
                                ControlMessageVariant::Ack => {
                                    scheduled.router.handle_ack(message.id);
                                }
                            }
                        },
                        Message::Ping => {},
                    }
                }

                // Actually run the connector
                // TODO: Revise
                let new_schedule;
                match &mut scheduled.connector {
                    ConnectorVariant::UserDefined(connector) => {
                        if connector.is_in_sync_mode() {
                            // In synchronous mode, so we can expect messages being sent,
                            // but we never expect the creation of connectors
                            new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state);
                            debug_assert!(delta_state.new_connectors.is_empty());
                        } else {
                            // In regular running mode (not in a sync block) we cannot send
                            // messages but we can create new connectors
                            new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state);
                            debug_assert!(delta_state.outbox.is_empty());
                        }
                    },
                    ConnectorVariant::Native(connector) => {
                        new_schedule = connector.run(&self.runtime.protocol_description);
                    },
                }

                // Handle all of the output from the current run: messages to
                // send and connectors to instantiate.
                self.handle_delta_state(&connector_key, &mut delta_state);

                cur_schedule = new_schedule;
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.global_store.connector_queue.push_back(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep, note that we are the only ones which are
                    // allows to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
                    scheduled.public.sleeping.store(true, Ordering::Release);

                    // We might have received a message in the meantime from a
                    // thread that did not see the sleeping flag set to `true`,
                    // so:
                    if !scheduled.public.inbox.is_empty() {
                        let should_reschedule_self = scheduled.public.sleeping
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                            .is_ok();

                        if should_reschedule_self {
                            self.runtime.global_store.connector_queue.push_back(connector_key);
                        }
                    }
                }
            }
        }
    }

    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
        // Handling any messages that were sent
        if !delta_state.outbox.is_empty() {
            for message in delta_state.outbox.drain(..) {
                let (inbox_message, target_connector_id) = {
                    let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port);
                    (
                        DataMessage {
                            sending_connector: connector_key.downcast(),
                            sending_port: sending_port.self_id,
                            receiving_port: sending_port.peer_id,
                            sender_prev_branch_id: message.sender_prev_branch_id,
                            sender_cur_branch_id: message.sender_cur_branch_id,
                            message: message.message,
                        },
                        sending_port.peer_connector,
                    )
                };

                self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
            }
        }

        // Handling any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);

            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to global registry to obtain key
                let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector));
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);

                // Each port should be lost by the connector that created the
                // new one. Note that the creator is the current owner.
                for port_id in &new_connector.ports.owned_ports {
                    debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));

                    // Modify ownership, retrieve peer connector
                    let (peer_connector_id, peer_port_id) = {
                        let mut port = self.runtime.global_store.ports.get(connector_key, *port_id);
                        port.owning_connector = new_key.downcast();

                        (port.peer_connector, port.peer_id)
                    };

                    // Send message that port has changed ownership
                    let reroute_message = cur_connector.router.prepare_reroute(
                        port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast()
                    );

                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
                }

                // Schedule new connector to run
                self.runtime.global_store.connector_queue.push_back(new_key);
            }
        }
    }

    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
        let connector = self.runtime.global_store.connectors.get_shared(connector_id);

        connector.inbox.insert_message(message);
        let should_wake_up = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();

        if should_wake_up {
            let key = unsafe { ConnectorKey::from_id(connector_id) };
            self.runtime.global_store.connector_queue.push_back(key);
        }
    }
}

/// Represents a rerouting entry due to a moved port
// TODO: Optimize
struct ReroutedTraffic {
    id: u32,                        // ID of control message
    port: PortIdLocal,              // targeted port
    source_connector: ConnectorId,  // connector we expect messages from
    target_connector: ConnectorId,  // connector they should be rerouted to
}

pub(crate) struct Router {
    id_counter: u32,
    active: Vec<ReroutedTraffic>,
}

impl Router {
    pub fn new() -> Self {
        Router{
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
    pub fn prepare_reroute(
        &mut self,
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        let id = self.id_counter;
        self.id_counter.overflowing_add(1);

        self.active.push(ReroutedTraffic{
            id,
            port: port_id,
            source_connector: peer_connector_id,
            target_connector: new_owner_connector_id,
        });

        return Message::Control(ControlMessage{
            id,
            sender: self_connector_id,
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
        });
    }

    /// Returns true if the supplied message should be rerouted. If so then this
    /// function returns the connector that should retrieve this message.
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
        for reroute in &self.active {
            if reroute.source_connector == message.sending_connector &&
                reroute.port == message.sending_port {
                // Need to reroute this message
                return Some(reroute.target_connector);
            }
        }

        return None;
    }

    /// Handles an Ack as an answer to a previously sent control message
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }
}