Files @ 58dfabd1be9f
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation

58dfabd1be9f 13.1 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
moving to laptop
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
f450ae18ef58
f450ae18ef58
daf15df0f8ca
f450ae18ef58
58dfabd1be9f
1aef293674a6
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
1aef293674a6
daf15df0f8ca
58dfabd1be9f
1aef293674a6
1aef293674a6
f450ae18ef58
58dfabd1be9f
58dfabd1be9f
f450ae18ef58
f450ae18ef58
f450ae18ef58
f450ae18ef58
f450ae18ef58
cf26538b25dc
f450ae18ef58
daf15df0f8ca
f450ae18ef58
58dfabd1be9f
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
f450ae18ef58
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
f450ae18ef58
f450ae18ef58
f450ae18ef58
f450ae18ef58
58dfabd1be9f
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
f450ae18ef58
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
cf26538b25dc
f450ae18ef58
f450ae18ef58
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
1aef293674a6
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
f450ae18ef58
f450ae18ef58
1aef293674a6
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
1aef293674a6
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::thread;

use crate::ProtocolDescription;
use crate::runtime2::global_store::ConnectorVariant;

use super::RuntimeInner;
use super::port::{PortIdLocal};
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};

/// Worker that takes connectors from the runtime's shared connector queue and
/// runs them (see `run`). Several schedulers can share one runtime through
/// the `Arc` handle.
pub(crate) struct Scheduler {
    // Shared runtime internals; the global store and the protocol description
    // are both reached through this handle.
    runtime: Arc<RuntimeInner>,
}

impl Scheduler {
    /// Creates a scheduler that operates on the given shared runtime.
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
        Self { runtime }
    }

    /// Main loop of a scheduler thread.
    ///
    /// Repeatedly pops a connector key from the global connector queue and,
    /// for that connector:
    /// 1. drains its public inbox (rerouting or storing data messages and
    ///    answering control messages),
    /// 2. runs it, and applies the resulting `RunDeltaState`,
    /// 3. repeats 1-2 for as long as the connector asks for
    ///    `ConnectorScheduling::Immediate`,
    /// 4. finally either requeues it (`Later`) or marks it as sleeping
    ///    (`NotNow`).
    ///
    /// When the queue is empty the thread sleeps for one second and then
    /// returns if the runtime's `should_exit` flag is set.
    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        let mut delta_state = RunDeltaState::new();

        'thread_loop: loop {
            // Retrieve a unit of work
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
            if connector_key.is_none() {
                // TODO: @Performance, needs condition or something, and most
                //  def' not sleeping
                // NOTE(review): the exit flag is only inspected after the
                // sleep, so an idle thread reacts to `should_exit` with up to
                // one second of delay.
                thread::sleep(Duration::new(1, 0));
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
                    // Thread exits!
                    break 'thread_loop;
                }

                continue 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Check all the messages that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    match message {
                        Message::Data(message) => {
                            // Check if we need to reroute, or can just put it
                            // in the private inbox of the connector
                            // NOTE(review): `scheduled.connector` is used as
                            // if it has an `inbox` field here, yet it is
                            // matched as a `ConnectorVariant` further down -
                            // confirm against the type definitions.
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
                            } else {
                                scheduled.connector.inbox.insert_message(message);
                            }
                        },
                        Message::Control(message) => {
                            match message.content {
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                    // Need to change port target
                                    let port = self.runtime.global_store.ports.get(&connector_key, port_id);
                                    port.peer_connector = new_target_connector_id;
                                    // The outbox is expected to have been
                                    // flushed by the previous
                                    // `handle_delta_state` call.
                                    debug_assert!(delta_state.outbox.is_empty());

                                    // And respond with an Ack, echoing the ID
                                    // of the control message so the sender
                                    // can match it up.
                                    self.send_message_and_wake_up_if_sleeping(
                                        message.sender,
                                        Message::Control(ControlMessage{
                                            id: message.id,
                                            sender: connector_key.downcast(),
                                            content: ControlMessageVariant::Ack,
                                        })
                                    );
                                },
                                ControlMessageVariant::Ack => {
                                    // Our own reroute request was confirmed
                                    scheduled.router.handle_ack(message.id);
                                }
                            }
                        },
                        // Nothing to process; presumably a ping only exists
                        // to get the connector scheduled - TODO confirm.
                        Message::Ping => {},
                    }
                }

                // Actually run the connector
                // TODO: Revise
                let new_schedule;
                match &mut scheduled.connector {
                    ConnectorVariant::UserDefined(connector) => {
                        if connector.is_in_sync_mode() {
                            // In synchronous mode, so we can expect messages being sent,
                            // but we never expect the creation of connectors
                            new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state);
                            debug_assert!(delta_state.new_connectors.is_empty());
                        } else {
                            // In regular running mode (not in a sync block) we cannot send
                            // messages but we can create new connectors
                            new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state);
                            debug_assert!(delta_state.outbox.is_empty());
                        }
                    },
                    ConnectorVariant::Native(connector) => {
                        // Native connectors do not receive the delta state
                        new_schedule = connector.run(&self.runtime.protocol_description);
                    },
                }

                // Handle all of the output from the current run: messages to
                // send and connectors to instantiate.
                self.handle_delta_state(&connector_key, &mut delta_state);

                cur_schedule = new_schedule;
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.global_store.connector_queue.push_back(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep, note that we are the only ones which are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
                    scheduled.public.sleeping.store(true, Ordering::Release);

                    // We might have received a message in the meantime from a
                    // thread that did not see the sleeping flag set to `true`,
                    // so re-check the inbox. Whoever wins the `true -> false`
                    // compare-exchange (this thread or the sender) is the one
                    // that requeues the connector, so it is queued only once.
                    // NOTE(review): a `Release` store does not by itself order
                    // the store before the following inbox check; confirm that
                    // the inbox's own synchronization rules out a lost wake-up
                    // (otherwise consider `SeqCst` for the store).
                    if !scheduled.public.inbox.is_empty() {
                        let should_reschedule_self = scheduled.public.sleeping
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                            .is_ok();

                        if should_reschedule_self {
                            self.runtime.global_store.connector_queue.push_back(connector_key);
                        }
                    }
                }
            }
        }
    }

    /// Applies the side effects that a connector run left behind in
    /// `delta_state`, draining both of its buffers:
    ///
    /// - every entry in `outbox` is turned into a `DataMessage` (filling in
    ///   the port/peer information from the global port store) and delivered
    ///   to the peer connector of the sending port;
    /// - every entry in `new_connectors` is registered in the global store,
    ///   takes over ownership of its ports from the connector identified by
    ///   `connector_key`, has the peers of those ports notified through a
    ///   reroute control message, and is queued to run.
    ///
    /// `connector_key` identifies the connector that produced the delta state.
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
        // Handling any messages that were sent
        if !delta_state.outbox.is_empty() {
            for message in delta_state.outbox.drain(..) {
                let (inbox_message, target_connector_id) = {
                    let sending_port = self.runtime.global_store.ports.get(connector_key, message.sending_port);
                    (
                        DataMessage {
                            sending_connector: connector_key.downcast(),
                            sending_port: sending_port.self_id,
                            receiving_port: sending_port.peer_id,
                            sender_prev_branch_id: message.sender_prev_branch_id,
                            sender_cur_branch_id: message.sender_cur_branch_id,
                            message: message.message,
                        },
                        sending_port.peer_connector,
                    )
                };

                self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
            }
        }

        // Handling any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);

            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to global registry to obtain key
                let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector));
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);

                // Each port should be lost by the connector that created the
                // new one. Note that the creator is the current owner.
                for port_id in &new_connector.ports.owned_ports {
                    debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));

                    // Modify ownership, retrieve peer connector
                    let (peer_connector_id, peer_port_id) = {
                        let mut port = self.runtime.global_store.ports.get(connector_key, *port_id);
                        port.owning_connector = new_key.downcast();

                        (port.peer_connector, port.peer_id)
                    };

                    // Send message that port has changed ownership. Iterating
                    // `&owned_ports` yields references, while the router takes
                    // the port ID by value, hence the dereference.
                    let reroute_message = cur_connector.router.prepare_reroute(
                        *port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast()
                    );

                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
                }

                // Schedule new connector to run
                self.runtime.global_store.connector_queue.push_back(new_key);
            }
        }
    }

    /// Puts `message` into the public inbox of the connector identified by
    /// `connector_id`. If that connector was marked as sleeping, and this call
    /// is the one that flips the flag back to `false`, the connector is pushed
    /// onto the global connector queue so that it gets run again.
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
        let global_store = &self.runtime.global_store;
        let target = global_store.connectors.get_shared(connector_id);
        target.inbox.insert_message(message);

        // Only the caller that wins the `true -> false` transition may queue
        // the connector; everyone else is done after delivering the message.
        let won_wake_up = target.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();
        if !won_wake_up {
            return;
        }

        // NOTE(review): `ConnectorKey::from_id` is `unsafe` and its contract
        // is not visible here; presumably winning the wake-up above is what
        // entitles this thread to hold the key - confirm and record it as a
        // `SAFETY:` comment.
        let wake_key = unsafe { ConnectorKey::from_id(connector_id) };
        global_store.connector_queue.push_back(wake_key);
    }
}

/// Represents a rerouting entry due to a moved port. An entry is created by
/// `Router::prepare_reroute`, consulted by `Router::should_reroute` and
/// removed again by `Router::handle_ack` once the control message carrying
/// the same `id` has been acknowledged.
// TODO: Optimize
struct ReroutedTraffic {
    id: u32,                        // ID of control message
    port: PortIdLocal,              // targeted port
    source_connector: ConnectorId,  // connector we expect messages from
    target_connector: ConnectorId,  // connector they should be rerouted to
}

/// Bookkeeping for ports whose ownership was handed to another connector:
/// hands out control message IDs and remembers which reroutes are still
/// waiting for an `Ack`.
pub(crate) struct Router {
    // Source of the IDs put into outgoing control messages
    id_counter: u32,
    // Reroute entries that have not been acknowledged yet
    active: Vec<ReroutedTraffic>,
}

impl Router {
    /// Creates a router without any active reroutes. Control message IDs
    /// start counting at 0.
    pub fn new() -> Self {
        Router{
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
    ///
    /// - `port_id`: the port that changed owner.
    /// - `peer_port_id`: the port at the other end, owned by the peer.
    /// - `self_connector_id`: the connector giving the port away; it is put
    ///   in the message as sender, so the `Ack` comes back to it.
    /// - `peer_connector_id`: the connector owning `peer_port_id`.
    /// - `new_owner_connector_id`: the connector that now owns `port_id`.
    ///
    /// Until the matching `Ack` is passed to `handle_ack`, `should_reroute`
    /// reports messages from the peer to the moved port as needing to be
    /// forwarded to the new owner.
    pub fn prepare_reroute(
        &mut self,
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        let id = self.id_counter;
        // `overflowing_add`/`wrapping_add` return the sum instead of updating
        // in place, so the result has to be stored back. Without this every
        // control message would get the same ID and `handle_ack` could not
        // tell pending reroutes apart.
        self.id_counter = self.id_counter.wrapping_add(1);

        self.active.push(ReroutedTraffic{
            id,
            port: port_id,
            source_connector: peer_connector_id,
            target_connector: new_owner_connector_id,
        });

        Message::Control(ControlMessage{
            id,
            sender: self_connector_id,
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
        })
    }

    /// Checks whether the supplied message should be rerouted. Returns the
    /// connector the message must be forwarded to, or `None` if the message
    /// can be handled locally.
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
        // An entry stores the moved port, which is the port the peer's
        // messages are addressed to. So the comparison has to use the
        // message's receiving port, not the sender's own port.
        self.active.iter()
            .find(|reroute| {
                reroute.source_connector == message.sending_connector &&
                    reroute.port == message.receiving_port
            })
            .map(|reroute| reroute.target_connector)
    }

    /// Handles an Ack as an answer to a previously sent control message: the
    /// reroute entry created together with control message `id` is dropped.
    ///
    /// # Panics
    /// Currently panics (through `todo!`) when no active entry has this `id`.
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }
}