// connector.rs
//
// Represents a component. A component (and the scheduler that is running it)
// has many properties that are not easy to subdivide into aspects that are
// conceptually handled by particular data structures. That is to say: the code
// that we run governs running PDL code, keeping track of ports, instantiating
// new components and transports (i.e. interacting with the runtime), running
// a consensus algorithm, etc. But on the other hand, our data is rather
// simple: we have a speculative execution tree, a set of ports that we own,
// and a bit of code that we should run.
//
// So currently the code is organized as follows:
// - The scheduler that is running the component is the authoritative source on
//     ports during *non-sync* mode. The consensus algorithm is the
//     authoritative source during *sync* mode. They retrieve each other's
//     state during the transitions. Hence port data is duplicated between
//     these two data structures.
// - The execution tree is where executed branches reside. But the execution
//     tree is only aware of the tree shape itself (and keeps track of some
//     queues of branches that are in a particular state), and it stores the
//     PDL program state of each branch. The consensus algorithm is also
//     somewhat aware of the execution tree, but only in terms of what is
//     needed to complete a sync round (for now, that means the port mapping
//     in each branch). Hence, once more, properties conceptually associated
//     with branches live in two places.
// - TODO: Write about handling messages, consensus wrapping data
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
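//
// As a hedged sketch of the lifecycle implemented below (the method names are
// the ones defined in this file; the surrounding loop is driven by the
// scheduler and not shown here):
//
//     Connector::run(), called repeatedly by the scheduler:
//       run_in_deterministic_mode()   until the PDL code starts a sync block
//       run_in_sync_mode()            forks speculative branches on fires()/get()
//       consensus finds a solution    -> collapse_sync_to_solution_branch()
//       back to deterministic mode, or exit on ComponentTerminated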

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;

use crate::PortId;
use crate::common::ComponentState;
use crate::protocol::eval::{Prompt, Value, ValueGroup};
use crate::protocol::{RunContext, RunResult};

use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
use super::native::Connector;
use super::port::{PortKind, PortIdLocal};
use super::scheduler::{ComponentCtx, SchedulerCtx};

pub(crate) struct ConnectorPublic {
    pub inbox: PublicInbox,
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    pub fn new(initialize_as_sleeping: bool) -> Self {
        ConnectorPublic{
            inbox: PublicInbox::new(),
            sleeping: AtomicBool::new(initialize_as_sleeping),
        }
    }
}

#[derive(Eq, PartialEq)]
pub(crate) enum ConnectorScheduling {
    Immediate,      // Run again, immediately
    Later,          // Schedule for running, at some later point in time
    NotNow,         // Do not reschedule for running
    Exit,           // Connector has exited
}
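
// A minimal sketch of how a scheduler could act on the values above
// (hypothetical; the actual scheduling loop lives in the scheduler, not here):
//
//     match connector.run(sched_ctx, comp_ctx) {
//         ConnectorScheduling::Immediate => { /* run this component again right away */ },
//         ConnectorScheduling::Later     => { /* push it to the back of the work queue */ },
//         ConnectorScheduling::NotNow    => { /* let it sleep until a message wakes it */ },
//         ConnectorScheduling::Exit      => { /* tear the component down */ },
//     }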

pub(crate) struct ConnectorPDL {
    tree: ExecTree,
    consensus: Consensus,
}

struct ConnectorRunContext<'a> {
    branch_id: BranchId,
    consensus: &'a Consensus,
    received: &'a HashMap<PortIdLocal, ValueGroup>,
    scheduler: SchedulerCtx<'a>,
    prepared_channel: Option<(Value, Value)>,
}

impl<'a> RunContext for ConnectorRunContext<'a>{
    fn did_put(&mut self, port: PortId) -> bool {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        return annotation.registered_id.is_some();
    }

    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        return self.received.get(&port_id).cloned();
    }

    fn fires(&mut self, port: PortId) -> Option<Value> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        return annotation.expected_firing.map(|v| Value::Bool(v));
    }

    fn get_channel(&mut self) -> Option<(Value, Value)> {
        return self.prepared_channel.take();
    }
}
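
// A hedged illustration of how PDL code reaches the context above: the PDL
// snippet is hypothetical syntax, but the mapping onto the methods is what
// this impl provides.
//
//     sync { if (fires(p)) put(p, msg); else v = get(q); }
//
//       fires(p)  -> ConnectorRunContext::fires()
//       put(...)  -> checked through ConnectorRunContext::did_put()
//       get(q)    -> ConnectorRunContext::get()
//       channel   -> ConnectorRunContext::get_channel()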

impl Connector for ConnectorPDL {
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        self.handle_new_messages(comp_ctx);
        if self.tree.is_in_sync() {
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
            if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
                return ConnectorScheduling::Immediate;
            } else {
                return scheduling;
            }
        } else {
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
            return scheduling;
        }
    }
}

impl ConnectorPDL {
    pub fn new(initial: ComponentState) -> Self {
        Self{
            tree: ExecTree::new(initial),
            consensus: Consensus::new(),
        }
    }

    // --- Handling messages

    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
        while let Some(message) = ctx.read_next_message() {
            match message {
                Message::Data(message) => self.handle_new_data_message(message, ctx),
                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
                Message::Control(_) => unreachable!("control message in component"),
            }
        }
    }

    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
        // Go through all branches that are awaiting new messages and see if
        // there are any that can receive this message.
        debug_assert!(ctx.workspace_branches.is_empty());
        let mut branches = Vec::new(); // TODO: @Remove
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches);

        for branch_id in branches.drain(..) {
            // This branch can receive, so fork and give it the message
            let receiving_branch_id = self.tree.fork_branch(branch_id);
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
            let receiving_branch = &mut self.tree[receiving_branch_id];

            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);

            // And prepare the branch for running
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
        }
    }

    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
        }
    }

    // --- Running code

    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        // Check if we have any branch that needs running
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
        if branch_id.is_none() {
            return ConnectorScheduling::NotNow;
        }

        // Retrieve the branch and run it
        let branch_id = branch_id.unwrap();
        let branch = &mut self.tree[branch_id];

        let mut run_context = ConnectorRunContext{
            branch_id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Handle the returned result. Note that this match statement contains
        // explicit returns in case the run result requires that the component's
        // code is run again immediately
        match run_result {
            RunResult::BranchInconsistent => {
                // Branch became inconsistent
                branch.sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that has not been used yet.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);

                // Create two forks, one that assumes the port will fire, and
                // one that assumes the port remains silent
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;

                let firing_branch_id = self.tree.fork_branch(branch_id);
                let silent_branch_id = self.tree.fork_branch(branch_id);
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
                debug_assert_eq!(_result, Consistency::Valid);
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
                debug_assert_eq!(_result, Consistency::Valid);

                // Somewhat important: we push the firing branch first, so that
                // it is run again immediately.
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get()` on a port for which it has not
                // yet received a message.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `get()` is valid, so mark the branch as awaiting a message
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.awaiting_port = port_id;
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);

                    // Note: we only know that a branch is waiting on a message when
                    // it reaches the `get` call. But we might have already received
                    // a message that targets this branch, so check now.
                    let mut any_branch_received = false;
                    for message in comp_ctx.get_read_data_messages(port_id) {
                        if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) {
                            // This branch can receive the message, so we do the
                            // fork-and-receive dance
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
                            let branch = &mut self.tree[receiving_branch_id];

                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());

                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);

                            any_branch_received = true;
                        }
                    }

                    if any_branch_received {
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            }
            RunResult::BranchAtSyncEnd => {
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
                if consistency == Consistency::Valid {
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                } else if consistency == Consistency::Inconsistent {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchPut(port_id, content) => {
                // Branch is attempting to send data
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `put()` is valid.
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
                    comp_ctx.submit_message(Message::Data(DataMessage {
                        sync_header, data_header,
                        content: DataContent::Message(content),
                    }));

                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
                    return ConnectorScheduling::Immediate;
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
        }

        // If we get here then the run result did not require running again
        // immediately. We return whether there are more runnable branches.
        if self.tree.queue_is_empty(QueueKind::Runnable) {
            return ConnectorScheduling::NotNow;
        } else {
            return ConnectorScheduling::Later;
        }
    }
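
    // A hedged illustration of the forking performed by the
    // BranchMissingPortState arm above: a branch that hits `fires(p)` without
    // a known mapping for `p` is halted and replaced by two runnable children,
    // one per assumed port state (tree shape only, the branch ids are made up):
    //
    //     branch 3 (HaltedAtBranchPoint)
    //      +- branch 4: speculative mapping p -> firing   (pushed first, runs next)
    //      +- branch 5: speculative mapping p -> silent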

    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());

        let branch = self.tree.base_branch_mut();
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);

        let mut run_context = ConnectorRunContext{
            branch_id: branch.id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        match run_result {
            RunResult::ComponentTerminated => {
                branch.sync_state = SpeculativeState::Finished;

                return ConnectorScheduling::Exit;
            },
            RunResult::ComponentAtSyncStart => {
                comp_ctx.notify_sync_start();
                let sync_branch_id = self.tree.start_sync();
                self.consensus.start_sync(comp_ctx);
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                // Note: we're relinquishing ownership of ports. But because
                // we are in non-sync mode the scheduler will handle and check
                // port ownership transfer.
                debug_assert!(comp_ctx.workspace_ports.is_empty());
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);

                let new_state = ComponentState {
                    prompt: Prompt::new(
                        &sched_ctx.runtime.protocol_description.types,
                        &sched_ctx.runtime.protocol_description.heap,
                        definition_id, monomorph_idx, arguments
                    ),
                };
                let new_component = ConnectorPDL::new(new_state);
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
                comp_ctx.workspace_ports.clear();

                return ConnectorScheduling::Later;
            },
            RunResult::NewChannel => {
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
                branch.prepared_channel = Some((
                    Value::Input(PortId::new(putter.self_id.index)),
                    Value::Output(PortId::new(getter.self_id.index)),
                ));

                comp_ctx.push_port(putter);
                comp_ctx.push_port(getter);

                return ConnectorScheduling::Immediate;
            },
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
        }
    }
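
    // Hedged sketch of the channel handoff performed by the NewChannel arm
    // above (the flow follows the code in this file; the step numbering is
    // just for illustration):
    //
    //     1. the PDL code asks for a new channel -> RunResult::NewChannel
    //     2. the two ports are created and stashed in `branch.prepared_channel`
    //     3. the component is scheduled Immediately; the next
    //        ConnectorRunContext takes `prepared_channel`, and `get_channel()`
    //        hands the two port values back to the resuming PDL code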

    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
        let mut fake_vec = Vec::new();
        self.tree.end_sync(solution_branch_id);
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);

        for port in fake_vec {
            // TODO: Handle sent/received ports
            debug_assert!(ctx.get_port_by_id(port).is_some());
        }

        ctx.notify_sync_end(&[]);
    }
}