Files @ 26d47db4f922
Branch filter:

Location: CSY/reowolf/src/runtime2/scheduler.rs

26d47db4f922 16.5 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
WIP on second rewrite of port management
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::thread;

use crate::ProtocolDescription;
use crate::runtime2::global_store::ConnectorVariant;
use crate::runtime2::inbox::MessageContents;
use crate::runtime2::native::Connector;
use crate::runtime2::port::{Channel, PortKind, PortOwnership};

use super::RuntimeInner;
use super::port::{Port, PortIdLocal};
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};

/// Contains fields that are mostly managed by the scheduler, but may be
/// accessed by the connector.
///
/// Bundles the connector's own ID, the counter used to hand out new port IDs
/// and the list of ports the connector currently owns.
pub(crate) struct ConnectorCtx {
    // ID of the connector this context belongs to. `ConnectorCtx::new`
    // initializes it to the invalid ID.
    pub(crate) id: ConnectorId,
    // Counter from which `create_channel` draws fresh port IDs.
    // NOTE(review): held in an `Arc`, so presumably shared between several
    // contexts - confirm against the code that constructs it.
    port_counter: Arc<AtomicU32>,
    // Ports currently owned by this connector.
    pub(crate) ports: Vec<Port>,
}

impl ConnectorCtx {
    /// Creates a context that owns no ports yet and whose `id` is still the
    /// invalid connector ID.
    ///
    /// `port_counter` is the counter from which `create_channel` draws new
    /// port IDs.
    pub(crate) fn new(port_counter: Arc<AtomicU32>) -> ConnectorCtx {
        Self{
            id: ConnectorId::new_invalid(),
            port_counter,
            // A fresh context starts without ports; they are added through
            // `create_channel` and `add_port`.
            ports: Vec::new(),
        }
    }

    /// Creates a (putter, getter) port pair belonging to the same channel.
    /// Both ports are pushed onto this context's port list, so they are
    /// implicitly owned by the connector, and each port's peer connector is
    /// set to this connector's own ID.
    pub(crate) fn create_channel(&mut self) -> Channel {
        // Reserve two consecutive IDs in a single atomic step: the first one
        // is used for the getter, the one after it for the putter.
        let first_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
        let getter_id = PortIdLocal::new(first_id);
        let putter_id = PortIdLocal::new(first_id + 1);

        self.ports.push(Port{
            self_id: getter_id,
            peer_id: putter_id,
            kind: PortKind::Getter,
            peer_connector: self.id,
        });

        self.ports.push(Port{
            self_id: putter_id,
            peer_id: getter_id,
            kind: PortKind::Putter,
            peer_connector: self.id,
        });

        return Channel{ getter_id, putter_id };
    }

    /// Adds an existing port to this context. In debug builds this asserts
    /// that no port with the same `self_id` is already present.
    pub(crate) fn add_port(&mut self, port: Port) {
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
        self.ports.push(port);
    }

    /// Removes the port with the given ID from this context and returns it.
    ///
    /// # Panics
    ///
    /// Panics if this context holds no port with that ID.
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
        let index = self.port_id_to_index(id);
        return self.ports.remove(index);
    }

    /// Returns a shared reference to the port with the given ID.
    ///
    /// # Panics
    ///
    /// Panics if this context holds no port with that ID.
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
        let index = self.port_id_to_index(id);
        return &self.ports[index];
    }

    /// Returns a mutable reference to the port with the given ID.
    ///
    /// # Panics
    ///
    /// Panics if this context holds no port with that ID.
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
        let index = self.port_id_to_index(id);
        return &mut self.ports[index];
    }

    /// Linear search for the position of the port with the given ID inside
    /// `self.ports`. Panics if there is no such port.
    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
        self.ports.iter()
            .position(|port| port.self_id == id)
            .unwrap_or_else(|| panic!("port {:?}, not owned by connector", id))
    }
}

/// Worker that repeatedly takes connectors from the runtime's connector queue
/// and runs them (see `Scheduler::run`).
pub(crate) struct Scheduler {
    // Shared runtime state: gives access to the global store (connectors,
    // connector queue, exit flag) and the protocol description.
    runtime: Arc<RuntimeInner>,
}

// Thinking aloud: actual ports should be accessible by connector, but managed
// by the scheduler (to handle rerouting messages). We could just give a read-
// only context, instead of an extra call on the "Connector" trait.

impl Scheduler {
    /// Builds a scheduler operating on the given shared runtime state.
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
        Self { runtime }
    }

    /// Main loop of a scheduler thread.
    ///
    /// Repeatedly pops a connector key from the global connector queue. When
    /// the queue is empty the thread sleeps for one second and then exits if
    /// the global `should_exit` flag is set. Otherwise, for the popped
    /// connector:
    ///
    /// 1. all messages in its public inbox are handled (data messages are
    ///    rerouted or delivered, sync messages are delivered, control
    ///    messages are processed here),
    /// 2. the connector is run, and
    /// 3. the resulting delta state (outgoing messages, new connectors) is
    ///    processed,
    ///
    /// and this repeats for as long as the connector asks to be scheduled
    /// immediately. Afterwards the connector is either queued again or marked
    /// as sleeping.
    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        let mut delta_state = RunDeltaState::new();

        'thread_loop: loop {
            // Retrieve a unit of work
            let popped_key = self.runtime.global_store.connector_queue.pop_front();
            let connector_key = match popped_key {
                Some(connector_key) => connector_key,
                None => {
                    // TODO: @Performance, needs condition or something, and most
                    //  def' not sleeping
                    thread::sleep(Duration::new(1, 0));
                    if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
                        // Thread exits!
                        break 'thread_loop;
                    }

                    continue 'thread_loop;
                }
            };

            // We have something to do
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Check all the message that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    match message.contents {
                        MessageContents::Data(content) => {
                            // Check if we need to reroute, or can just put it
                            // in the private inbox of the connector
                            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) {
                                // NOTE(review): `Message` is built as a struct
                                // elsewhere in this file; this enum-style
                                // constructor looks like a leftover of the
                                // ongoing rewrite - confirm.
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content));
                            } else {
                                scheduled.connector.insert_data_message(content);
                            }
                        }
                        MessageContents::Sync(content) => {
                            scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state);
                        }
                        MessageContents::Solution(_content) => {
                            // TODO: Handle solution message
                        },
                        MessageContents::Control(content) => {
                            match content.content {
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                    // Need to change port target
                                    let port = scheduled.context.get_port_mut(port_id);
                                    port.peer_connector = new_target_connector_id;
                                    debug_assert!(delta_state.outbox.is_empty());

                                    // And respond with an Ack
                                    // Note: after this code has been reached, we may not have any
                                    // messages in the outbox that send to the port whose owning
                                    // connector we just changed. This is because the `ack` will
                                    // clear the rerouting entry of the `ack`-receiver.
                                    self.send_message_and_wake_up_if_sleeping(
                                        content.sender,
                                        Message{
                                            sending_connector: connector_key.downcast(),
                                            receiving_port: PortIdLocal::new_invalid(),
                                            contents: MessageContents::Control(ControlMessage{
                                                id: content.id,
                                                content: ControlMessageVariant::Ack,
                                            }),
                                        }
                                    );
                                },
                                ControlMessageVariant::Ack => {
                                    scheduled.router.handle_ack(content.id);
                                }
                            }
                        }
                        // The scrutinee is `message.contents`, so the ping
                        // variant has to be matched on `MessageContents`.
                        // Nothing needs to be done for it here.
                        MessageContents::Ping => {},
                    }
                }

                // Actually run the connector
                let new_schedule = scheduled.connector.run(
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
                );

                // Handle all of the output from the current run: messages to
                // send and connectors to instantiate.
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);

                cur_schedule = new_schedule;
            }

            // If here then the connector does not require immediate execution.
            // So enqueue it if requested, and otherwise put it in a sleeping
            // state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.global_store.connector_queue.push_back(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep, note that we are the only ones which are
                    // allowed to set the sleeping state to `true`, and since
                    // we're running it must currently be `false`.
                    // NOTE(review): the flag is stored with `Release` and the
                    // inbox is inspected right after; whether this can never
                    // miss a concurrent sender depends on how the inbox
                    // synchronizes - confirm.
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
                    scheduled.public.sleeping.store(true, Ordering::Release);

                    // We might have received a message in the meantime from a
                    // thread that did not see the sleeping flag set to `true`,
                    // so:
                    if !scheduled.public.inbox.is_empty() {
                        // Only requeue if we are the ones flipping the flag
                        // back; if the exchange fails a sender already woke
                        // the connector up and queued it.
                        let should_reschedule_self = scheduled.public.sleeping
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                            .is_ok();

                        if should_reschedule_self {
                            self.runtime.global_store.connector_queue.push_back(connector_key);
                        }
                    }
                }
            }
        }
    }

    /// Processes everything a connector produced during its last run.
    ///
    /// First every message in `delta_state.outbox` is drained and sent to its
    /// destination. Then every connector in `delta_state.new_connectors` is
    /// registered in the global store, the peers of its ports are told about
    /// the new owner through a reroute control message, and the new connector
    /// is queued for execution.
    ///
    /// * `connector_key` - key of the connector that just ran.
    /// * `context` - context of that connector, used to look up the peer of
    ///   the sending port for data messages.
    /// * `delta_state` - output of the run; its `outbox` and `new_connectors`
    ///   are emptied by this call.
    ///
    /// # Panics
    ///
    /// Panics if a sync or request-commit message has an empty `to_visit`
    /// list, or if the outbox contains a control or ping message.
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
        // Handling any messages that were sent
        let connector_id = connector_key.downcast();

        for mut message in delta_state.outbox.drain(..) {
            // Based on the message contents, decide where the message
            // should be sent to. This might end up modifying the message.
            let (peer_connector, peer_port) = match &mut message {
                MessageContents::Data(contents) => {
                    let port = context.get_port(contents.sending_port);
                    (port.peer_connector, port.peer_id)
                },
                MessageContents::Sync(contents) => {
                    let connector = contents.to_visit.pop().unwrap();
                    (connector, PortIdLocal::new_invalid())
                },
                MessageContents::RequestCommit(contents)=> {
                    let connector = contents.to_visit.pop().unwrap();
                    (connector, PortIdLocal::new_invalid())
                },
                MessageContents::ConfirmCommit(contents) => {
                    // Sent to every connector in the list right here. The
                    // invalid connector ID returned below makes the generic
                    // send after the match a no-op for this variant.
                    for to_visit in &contents.to_visit {
                        let confirm_message = Message{
                            sending_connector: connector_id,
                            receiving_port: PortIdLocal::new_invalid(),
                            // `Message::contents` holds a `MessageContents`,
                            // so the cloned payload is wrapped in its variant.
                            contents: MessageContents::ConfirmCommit(contents.clone()),
                        };
                        self.send_message_and_wake_up_if_sleeping(*to_visit, confirm_message);
                    }
                    (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
                },
                MessageContents::Control(_) | MessageContents::Ping => {
                    // Never generated by the user's code
                    unreachable!();
                }
            };

            // TODO: Maybe clean this up, perhaps special case for
            //  ConfirmCommit can be handled differently.
            if peer_connector.is_valid() {
                let message = Message {
                    sending_connector: connector_id,
                    receiving_port: peer_port,
                    contents: message,
                };
                self.send_message_and_wake_up_if_sleeping(peer_connector, message);
            }
        }

        // Handling any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            // NOTE(review): this fetches the same connector whose context is
            // also passed in as `context` - confirm that both references may
            // be used side by side.
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);

            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to global registry to obtain key
                let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector));
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);

                // Call above changed ownership of ports, but we still have to
                // let the other end of the channel know that the port has
                // changed location.
                for port in &new_connector.context.ports {
                    let reroute_message = cur_connector.router.prepare_reroute(
                        port.self_id, port.peer_id, cur_connector.context.id,
                        port.peer_connector, new_connector.context.id
                    );

                    // The reroute message goes to the connector that owns the
                    // other end of the channel.
                    self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message);
                }

                // Schedule new connector to run
                self.runtime.global_store.connector_queue.push_back(new_key);
            }
        }
    }

    /// Puts `message` into the public inbox of the connector identified by
    /// `connector_id`. If that connector was marked as sleeping, the flag is
    /// cleared and the connector is pushed onto the global connector queue.
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
        let target = self.runtime.global_store.connectors.get_shared(connector_id);

        // Deliver first, then try to wake: the message is in the inbox before
        // the sleeping flag is touched.
        target.inbox.insert_message(message);

        // Succeeds only for the caller that flips `sleeping` from `true` to
        // `false`.
        let woke_up_target = target.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();
        if !woke_up_target {
            return;
        }

        // NOTE(review): presumably sound because winning the exchange above
        // makes this the only place that requeues the connector - confirm
        // against the contract of `ConnectorKey::from_id`.
        let key = unsafe { ConnectorKey::from_id(connector_id) };
        self.runtime.global_store.connector_queue.push_back(key);
    }
}

/// Represents a rerouting entry due to a moved port.
///
/// An entry is created by `Router::prepare_reroute`, consulted by
/// `Router::should_reroute` and removed again by `Router::handle_ack` once the
/// control message with the matching `id` has been acknowledged.
// TODO: Optimize
struct ReroutedTraffic {
    id: u32,                        // ID of control message
    port: PortIdLocal,              // targeted port
    source_connector: ConnectorId,  // connector we expect messages from
    target_connector: ConnectorId,  // connector they should be rerouted to
}

/// Keeps track of the rerouting entries that are currently active for a
/// connector and hands out the IDs of the control messages belonging to them.
pub(crate) struct Router {
    // Source of control message IDs; `Router::new` starts it at 0.
    id_counter: u32,
    // Rerouting entries that have not been acknowledged yet.
    active: Vec<ReroutedTraffic>,
}

impl Router {
    /// Creates a router without active rerouting entries, whose control
    /// message IDs start at 0.
    pub fn new() -> Self {
        Router{
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
    ///
    /// Registers a rerouting entry for `port_id` that matches messages coming
    /// from `peer_connector_id` and redirects them to
    /// `new_owner_connector_id`. The returned control message carries a fresh
    /// ID and asks the peer to change the peer of `peer_port_id` to the new
    /// owner.
    ///
    /// `self_connector_id` is currently not used by this function.
    pub fn prepare_reroute(
        &mut self,
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        // Take the current ID and advance the counter. `wrapping_add` returns
        // the new value instead of modifying in place, so it has to be
        // assigned back; otherwise every control message would get the same
        // ID.
        let id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1);

        self.active.push(ReroutedTraffic{
            id,
            port: port_id,
            source_connector: peer_connector_id,
            target_connector: new_owner_connector_id,
        });

        // NOTE(review): `Message` is built as a struct with
        // `sending_connector`/`receiving_port`/`contents` fields elsewhere in
        // this file; this enum-style constructor looks like a leftover of the
        // ongoing rewrite - confirm.
        return Message::Control(ControlMessage{
            id,
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
        });
    }

    /// Checks whether a message sent by `sending_connector` through
    /// `sending_port` has to be rerouted. Returns `Some` with the connector
    /// that should receive the message if an active entry matches, and `None`
    /// otherwise. If several entries match, the oldest one wins.
    pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> {
        self.active.iter()
            .find(|reroute| {
                reroute.source_connector == sending_connector &&
                    reroute.port == sending_port
            })
            .map(|reroute| reroute.target_connector)
    }

    /// Handles an Ack as an answer to a previously sent control message by
    /// removing the rerouting entry with the given control message ID.
    ///
    /// # Panics
    ///
    /// Panics (through `todo!`) if no active entry has that ID.
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }
}