use std::sync::Arc;
use std::sync::atomic::Ordering;

use super::{RuntimeInner, ConnectorId, ConnectorKey};
use super::port::{Port, PortIdLocal};
use super::native::Connector;
use super::connector::{ConnectorScheduling, RunDeltaState};
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};

/// Contains fields that are mostly managed by the scheduler, but may be
/// accessed by the connector
pub(crate) struct ConnectorCtx {
    pub(crate) id: ConnectorId,
    pub(crate) ports: Vec<Port>,
}

impl ConnectorCtx {
    pub(crate) fn new() -> ConnectorCtx {
        Self{
            id: ConnectorId::new_invalid(),
            ports: Vec::new(),
        }
    }

    pub(crate) fn add_port(&mut self, port: Port) {
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
        self.ports.push(port);
    }

    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
        let index = self.port_id_to_index(id);
        return self.ports.remove(index);
    }

    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
        let index = self.port_id_to_index(id);
        return &self.ports[index];
    }

    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
        let index = self.port_id_to_index(id);
        return &mut self.ports[index];
    }

    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
        for (idx, port) in self.ports.iter().enumerate() {
            if port.self_id == id {
                return idx;
            }
        }

        panic!("port {:?}, not owned by connector", id);
    }
}

// Because it only contains a shared reference, this type is cheap to copy by value
#[derive(Clone, Copy)]
pub(crate) struct SchedulerCtx<'a> {
    pub(crate) runtime: &'a RuntimeInner
}

pub(crate) struct Scheduler {
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}

impl Scheduler {
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Self{ runtime, scheduler_id };
    }

    pub fn run(&mut self) {
        // Setup global storage and workspaces that are reused for every
        // connector that we run
        let scheduler_id = self.scheduler_id;
        let mut delta_state = RunDeltaState::new();

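        // Each iteration: take a connector off the work queue, drain its
        // shared inbox (rerouted and control messages are handled by the
        // scheduler itself), run the connector, apply the side effects it
        // produced, and decide whether to requeue it, put it to sleep, or
        // shut it down.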
        'thread_loop: loop {
            // Retrieve a unit of work
            println!("DEBUG [{}]: Waiting for work", scheduler_id);
            let connector_key = self.runtime.wait_for_work();
            if connector_key.is_none() {
                // We should exit
                println!("DEBUG [{}]: ... No more work, quitting", scheduler_id);
                break 'thread_loop;
            }

            // We have something to do
            let connector_key = connector_key.unwrap();
            println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index);

            let scheduled = self.runtime.get_component_private(&connector_key);

            // Keep running until we should no longer immediately schedule the
            // connector.
            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // Handle all messages that are in the shared inbox
                while let Some(message) = scheduled.public.inbox.take_message() {
                    // Check for rerouting
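                    // (the message may target a port that was just transferred
                    // to another connector; if so, forward it to the new owner)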
                    println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message);
                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
                        self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
                        continue;
                    }

                    // Check for messages that require special action from the
                    // scheduler.
                    if let MessageContents::Control(content) = message.contents {
                        match content.content {
                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
                                // Need to change port target
                                let port = scheduled.context.get_port_mut(port_id);
                                port.peer_connector = new_target_connector_id;
                                debug_assert!(delta_state.outbox.is_empty());

                                // And respond with an Ack
                                // Note: once this point is reached we may no longer have any
                                // messages in the outbox addressed to the port whose owning
                                // connector we just changed, because this `Ack` will clear
                                // the rerouting entry at its receiver.
                                self.send_message_and_wake_up_if_sleeping(
                                    message.sending_connector,
                                    Message{
                                        sending_connector: connector_key.downcast(),
                                        receiving_port: PortIdLocal::new_invalid(),
                                        contents: MessageContents::Control(ControlMessage{
                                            id: content.id,
                                            content: ControlMessageVariant::Ack,
                                        }),
                                    }
                                );
                            },
                            ControlMessageVariant::Ack => {
                                scheduled.router.handle_ack(content.id);
                            }
                        }
                    } else {
                        // Let connector handle message
                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
                    }
                }

                // Actually run the connector
                println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
                let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                let new_schedule = scheduled.connector.run(
                    scheduler_ctx, &scheduled.context, &mut delta_state
                );
                println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);

                // Handle all of the output from the current run: messages to
                // send and connectors to instantiate.
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);

                cur_schedule = new_schedule;
            }

            // If we get here then the connector does not require immediate
            // execution: enqueue it again if requested, otherwise put it in a
            // sleeping state.
            match cur_schedule {
                ConnectorScheduling::Immediate => unreachable!(),
                ConnectorScheduling::Later => {
                    // Simply queue it again later
                    self.runtime.push_work(connector_key);
                },
                ConnectorScheduling::NotNow => {
                    // Need to sleep. Note that we are the only ones allowed
                    // to set the sleeping state to `true`, and since we're
                    // currently running the connector it must be `false`.
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
                    scheduled.public.sleeping.store(true, Ordering::Release);

                    // We might have received a message in the meantime from a
                    // thread that did not yet see the sleeping flag set to
                    // `true`, so check the inbox once more:
                    if !scheduled.public.inbox.is_empty() {
                        let should_reschedule_self = scheduled.public.sleeping
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                            .is_ok();
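                        // If the exchange failed, another thread already
                        // cleared the sleeping flag and has thereby taken
                        // responsibility for rescheduling this connector.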

                        if should_reschedule_self {
                            self.runtime.push_work(connector_key);
                        }
                    }
                },
                ConnectorScheduling::Exit => {
                    // Prepare for exit. Set the shutdown flag and broadcast
                    // messages to notify peers of closing channels
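                    // Note: the `Ack` with id 0 sent below looks like a
                    // stand-in; the unused `ControlClosedChannel` type at the
                    // bottom of this file suggests a dedicated close-channel
                    // control message is still to be implemented.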
                    scheduled.shutting_down = true;
                    for port in &scheduled.context.ports {
                        self.runtime.send_message(port.peer_connector, Message{
                            sending_connector: connector_key.downcast(),
                            receiving_port: port.peer_id,
                            contents: MessageContents::Control(ControlMessage{
                                id: 0,
                                content: ControlMessageVariant::Ack
                            })
                        });
                    }
                }
            }
        }
    }

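    /// Applies the side effects of a connector's run: sends the messages
    /// queued up in the outbox, adopts any newly created ports, and registers
    /// and schedules any newly created connectors (setting up rerouting for
    /// the ports that were moved to them).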
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
        // Handling any messages that were sent
        let connector_id = connector_key.downcast();

        if !delta_state.outbox.is_empty() {
            for mut message in delta_state.outbox.drain(..) {
                // Based on the message contents, decide where the message
                // should be sent to. This might end up modifying the message.
                let (peer_connector, peer_port) = match &mut message {
                    MessageContents::Data(contents) => {
                        let port = context.get_port(contents.sending_port);
                        (port.peer_connector, port.peer_id)
                    },
                    MessageContents::Sync(contents) => {
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid())
                    },
                    MessageContents::RequestCommit(contents) => {
                        let connector = contents.to_visit.pop().unwrap();
                        (connector, PortIdLocal::new_invalid())
                    },
                    MessageContents::ConfirmCommit(contents) => {
                        for to_visit in &contents.to_visit {
                            let message = Message{
                                sending_connector: connector_id,
                                receiving_port: PortIdLocal::new_invalid(),
                                contents: MessageContents::ConfirmCommit(contents.clone()),
                            };
                            self.runtime.send_message(*to_visit, message);
                        }
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
                    },
                    MessageContents::Control(_) | MessageContents::Ping => {
                        // Never generated by the user's code
                        unreachable!();
                    }
                };

                // TODO: Maybe clean this up, perhaps special case for
                //  ConfirmCommit can be handled differently.
                if peer_connector.is_valid() {
                    let message = Message {
                        sending_connector: connector_id,
                        receiving_port: peer_port,
                        contents: message,
                    };
                    self.runtime.send_message(peer_connector, message);
                }
            }
        }

        if !delta_state.new_ports.is_empty() {
            for port in delta_state.new_ports.drain(..) {
                context.ports.push(port);
            }
        }

        // Handling any new connectors that were scheduled
        // TODO: Pool outgoing messages to reduce atomic access
        if !delta_state.new_connectors.is_empty() {
            let cur_connector = self.runtime.get_component_private(connector_key);

            for new_connector in delta_state.new_connectors.drain(..) {
                // Add to global registry to obtain key
                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
                let new_connector = self.runtime.get_component_private(&new_key);

                // The call above changed ownership of the ports, but we still
                // have to let the other end of each channel know that its
                // peer port has changed location.
                for port in &new_connector.context.ports {
                    cur_connector.pending_acks += 1;
                    let reroute_message = cur_connector.router.prepare_reroute(
                        port.self_id, port.peer_id, cur_connector.context.id,
                        port.peer_connector, new_connector.context.id
                    );

                    self.runtime.send_message(port.peer_connector, reroute_message);
                }

                // Schedule new connector to run
                self.runtime.push_work(new_key);
            }
        }

        debug_assert!(delta_state.outbox.is_empty());
        debug_assert!(delta_state.new_ports.is_empty());
        debug_assert!(delta_state.new_connectors.is_empty());
    }
}

// -----------------------------------------------------------------------------
// Control messages
// -----------------------------------------------------------------------------

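/// A control action sent to a peer connector, tracked until the matching
/// `Ack` arrives.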
struct ControlEntry {
    id: u32,
    variant: ControlVariant,
}

enum ControlVariant {
    ChangedPort(ControlChangedPort),
    ClosedChannel(ControlClosedChannel),
}

struct ControlChangedPort {
    target_port: PortIdLocal,       // if sent to this port, then reroute
    source_connector: ConnectorId,  // connector we expect messages from
    target_connector: ConnectorId,  // connector we need to reroute to
}

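// Not constructed anywhere yet; appears to be a placeholder for the
// in-progress work on closing channels when a connector shuts down.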
struct ControlClosedChannel {}

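/// Tracks outstanding control messages awaiting an `Ack`, among them the
/// rerouting entries that are active while a port transfer is in flight.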
pub(crate) struct ControlMessageHandler {
    id_counter: u32,
    active: Vec<ControlEntry>,
}

impl ControlMessageHandler {
    pub fn new() -> Self {
        ControlMessageHandler {
            id_counter: 0,
            active: Vec::new(),
        }
    }

    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
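    ///
    /// The full handshake, as implemented in `Scheduler::run`, is:
    /// 1. The old owner sends a `ChangePortPeer` message to the transferred
    ///    port's peer connector.
    /// 2. Until the `Ack` arrives, `should_reroute` redirects messages coming
    ///    from that peer towards the new owner.
    /// 3. The peer updates its port and replies with an `Ack`, at which point
    ///    `handle_ack` removes the rerouting entry.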
    pub fn prepare_reroute(
        &mut self,
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
        let id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1);

        self.active.push(ControlEntry{
            id,
            variant: ControlVariant::ChangedPort(ControlChangedPort{
                target_port: port_id,
                source_connector: peer_connector_id,
                target_connector: new_owner_connector_id,
            }),
        });

        return Message{
            sending_connector: self_connector_id,
            receiving_port: peer_port_id,
            contents: MessageContents::Control(ControlMessage{
                id,
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
            })
        };
    }

    /// Checks if the supplied message should be rerouted. If so, this
    /// function returns the connector that should receive the message.
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
        for entry in &self.active {
            if let ControlVariant::ChangedPort(reroute) = &entry.variant {
                if reroute.source_connector == sending_connector &&
                    reroute.target_port == target_port {
                    // Need to reroute this message
                    return Some(reroute.target_connector);
                }
            }
        }

        return None;
    }

    /// Handles an Ack as an answer to a previously sent control message
    pub fn handle_ack(&mut self, id: u32) {
        let index = self.active.iter()
            .position(|v| v.id == id);

        match index {
            Some(index) => { self.active.remove(index); },
            None => { todo!("handling of nefarious ACKs"); },
        }
    }
}