Files @ ecc47971d535
Branch filter:

Location: CSY/reowolf/src/runtime2/connector2.rs

ecc47971d535 16.5 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
WIP on handling sync solution messages
use std::collections::HashMap;
/// connector.rs
///
/// Represents a component. A component (and the scheduler that is running it)
/// has many properties that are not easy to subdivide into aspects that are
/// conceptually handled by particular data structures. That is to say: the code
/// that we run governs: running PDL code, keeping track of ports, instantiating
/// new components and transports (i.e. interacting with the runtime), running
/// a consensus algorithm, etc. But on the other hand, our data is rather
/// simple: we have a speculative execution tree, a set of ports that we own,
/// and a bit of code that we should run.
///
/// So currently the code is organized as follows:
/// - The scheduler that is running the component is the authoritative source on
///     ports during *non-sync* mode. The consensus algorithm is the
///     authoritative source during *sync* mode. They retrieve each other's
///     state during the transitions. Hence port data is duplicated between
///     these two data structures.
/// - The execution tree is where executed branches reside. But the execution
///     tree is only aware of the tree shape itself (and keeps track of some
///     queues of branches that are in a particular state), and tends to store
///     the PDL program state. The consensus algorithm is also somewhat aware
///     of the execution tree, but only in terms of what is needed to complete
///     a sync round (for now, that means the port mapping in each branch).
///     Hence once more we have properties conceptually associated with branches
///     in two places.
/// - TODO: Write about handling messages, consensus wrapping data
/// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx

use std::sync::atomic::AtomicBool;

use crate::PortId;
use crate::common::ComponentState;
use crate::protocol::eval::{Prompt, Value, ValueGroup};
use crate::protocol::{RunContext, RunResult};
use crate::runtime2::consensus::find_ports_in_value_group;
use crate::runtime2::port::PortKind;

use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
use super::consensus::{Consensus, Consistency};
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
use super::inbox::PublicInbox;
use super::native::Connector;
use super::port::PortIdLocal;
use super::scheduler::{ComponentCtxFancy, SchedulerCtx};

/// Publicly visible part of a connector: an inbox plus a sleeping flag.
pub(crate) struct ConnectorPublic {
    /// Inbox belonging to this connector.
    pub inbox: PublicInbox,
    /// Atomic flag; its initial value is chosen by the caller of `new`.
    /// Presumably tracks whether the connector is currently asleep (not
    /// scheduled) - confirm against the scheduler.
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    /// Builds a `ConnectorPublic` with a freshly constructed inbox and the
    /// `sleeping` flag set to `initialize_as_sleeping`.
    pub fn new(initialize_as_sleeping: bool) -> Self {
        let inbox = PublicInbox::new();
        let sleeping = AtomicBool::new(initialize_as_sleeping);

        Self { inbox, sleeping }
    }
}

/// Result of running a connector once: tells the caller whether (and how
/// soon) the connector should be run again.
#[derive(Eq, PartialEq)]
pub(crate) enum ConnectorScheduling {
    Immediate,      // Run again, immediately
    Later,          // Schedule for running, at some later point in time
    NotNow,         // Do not reschedule for running
    Exit,           // Connector has exited
}

/// A component running PDL code. Combines the speculative execution tree
/// with the consensus state used during sync rounds.
pub(crate) struct ConnectorPDL {
    // Execution tree holding the branches (and their queues) of this component
    tree: ExecTree,
    // Consensus algorithm state, consulted and updated while in sync mode
    consensus: Consensus,
}

/// Context handed to the PDL code of a single branch while it runs. It
/// implements `RunContext` so the running code can query port state and pick
/// up a previously prepared channel.
struct ConnectorRunContext<'a> {
    // Branch whose code is being run; used to look up port annotations
    branch_id: BranchId,
    // Consensus state from which the port annotations are read
    consensus: &'a Consensus,
    // Messages available to the branch, keyed by the port they arrived on
    received: &'a HashMap<PortIdLocal, ValueGroup>,
    // Scheduler context passed along with the run context
    scheduler: SchedulerCtx<'a>,
    // Channel endpoints prepared for the code; handed out once by `get_channel`
    prepared_channel: Option<(Value, Value)>,
}

// `ConnectorRunContext` carries a lifetime parameter, which has to be named
// (or explicitly elided) in the impl header for the impl to compile.
impl<'a> RunContext for ConnectorRunContext<'a> {
    /// Returns `true` if the consensus annotation of `port` for the running
    /// branch has a registered id.
    fn did_put(&mut self, port: PortId) -> bool {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        annotation.registered_id.is_some()
    }

    /// Returns a copy of the message stored for `port` in the branch's
    /// received messages, or `None` if nothing is stored for that port.
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        self.received.get(&port_id).cloned()
    }

    /// Returns the expected firing state of `port` for the running branch as
    /// a `Value::Bool`, or `None` if the annotation holds no expectation.
    fn fires(&mut self, port: PortId) -> Option<Value> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        annotation.expected_firing.map(Value::Bool)
    }

    /// Hands out the prepared channel, if any. The stored value is taken, so
    /// a second call returns `None` unless a new channel was prepared.
    fn get_channel(&mut self) -> Option<(Value, Value)> {
        self.prepared_channel.take()
    }
}

impl Connector for ConnectorPDL {
    /// Runs the component once: first processes all pending messages, then
    /// runs either in sync mode or in deterministic (non-sync) mode,
    /// depending on the state of the execution tree. After a sync-mode run
    /// the consensus is asked to handle newly finished sync branches.
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        self.handle_new_messages(comp_ctx);

        if !self.tree.is_in_sync() {
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
        }

        let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
        self.consensus.handle_new_finished_sync_branches();
        scheduling
    }
}

impl ConnectorPDL {
    pub fn new(initial: ComponentState) -> Self {
        Self{
            tree: ExecTree::new(initial),
            consensus: Consensus::new(),
        }
    }

    // --- Handling messages

    /// Reads messages from `ctx` until none are left and dispatches each to
    /// the matching handler.
    ///
    /// # Panics
    /// Panics when a control message is read: those are not expected to
    /// reach the component.
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) {
        while let Some(message) = ctx.read_next_message() {
            match message {
                // The handlers are methods, so they must be called on `self`
                MessageFancy::Data(message) => self.handle_new_data_message(message, ctx),
                MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx),
                MessageFancy::Control(_) => unreachable!("control message in component"),
            }
        }
    }

    /// Handles a newly received data message. Every branch that the
    /// consensus algorithm reports as able to receive the message is forked;
    /// the fork gets the message inserted and is queued as runnable.
    ///
    /// Expects `ctx.workspace_branches` to be empty on entry; it is used as
    /// scratch space and drained again before returning.
    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
        // Go through all branches that are awaiting new messages and see if
        // there is one that can receive this message.
        debug_assert!(ctx.workspace_branches.is_empty());
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches);

        for branch_id in ctx.workspace_branches.drain(..) {
            // This branch can receive, so fork and give it the message
            let receiving_branch_id = self.tree.fork_branch(branch_id);
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
            let receiving_branch = &mut self.tree[receiving_branch_id];

            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
            // The fork is the branch that received the message, so the
            // consensus must be notified about the fork and not its parent
            // (same as the fork-and-receive path in `run_in_sync_mode`).
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);

            // And prepare the branch for running
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
        }
    }

    /// Handles a newly received sync message by forwarding it to the
    /// consensus algorithm: first its sync header, then the message itself
    /// (which is consumed by the second call).
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
        self.consensus.handle_received_sync_message(message, ctx);
    }

    // --- Running code

    /// Runs a single runnable branch while the component is in sync mode and
    /// handles the result of that run.
    ///
    /// Returns:
    /// - `NotNow` if there was no runnable branch, or if the handled result
    ///   left the runnable queue empty,
    /// - `Immediate` if the result queued work that should run right away
    ///   (a fork on `fires()`, an already-received message for a `get()`, or
    ///   a successful `put()`),
    /// - `Later` if other runnable branches remain.
    ///
    /// # Panics
    /// Panics on a run result that is not expected in sync mode.
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        // Check if we have any branch that needs running
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
        let branch_id = match self.tree.pop_from_queue(QueueKind::Runnable) {
            Some(branch_id) => branch_id,
            None => return ConnectorScheduling::NotNow,
        };

        // Retrieve the branch and run it
        let branch = &mut self.tree[branch_id];

        // NOTE(review): `sched_ctx` is received by value, so `*sched_ctx`
        // relies on `SchedulerCtx` being dereferenceable - confirm against
        // its definition.
        let mut run_context = ConnectorRunContext{
            branch_id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: *sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Handle the returned result. Note that this match statement contains
        // explicit returns in case the run result requires that the component's
        // code is ran again immediately
        match run_result {
            RunResult::BranchInconsistent => {
                // Branch became inconsistent
                branch.sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that has not been used yet.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);

                // Create two forks, one that assumes the port will fire, and
                // one that assumes the port remains silent
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;

                let firing_branch_id = self.tree.fork_branch(branch_id);
                let silent_branch_id = self.tree.fork_branch(branch_id);
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
                debug_assert_eq!(_result, Consistency::Valid);
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
                debug_assert_eq!(_result, Consistency::Valid);

                // Somewhat important: we push the firing one first, such that
                // that branch is ran again immediately.
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get()` on a port that does not have a
                // received message on that port.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `get()` is valid, so mark the branch as awaiting a message
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.awaiting_port = port_id;
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);

                    // Note: we only know that a branch is waiting on a message when
                    // it reaches the `get` call. But we might have already received
                    // a message that targets this branch, so check now.
                    let mut any_branch_received = false;
                    for message in comp_ctx.get_read_data_messages(port_id) {
                        if self.consensus.branch_can_receive(branch_id, &message.data_header) {
                            // This branch can receive the message, so we do the
                            // fork-and-receive dance
                            let recv_branch_id = self.tree.fork_branch(branch_id);
                            let branch = &mut self.tree[recv_branch_id];
                            branch.insert_message(port_id, message.content.clone());

                            self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content);
                            self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);

                            any_branch_received = true;
                        }
                    }

                    if any_branch_received {
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            }
            RunResult::BranchAtSyncEnd => {
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
                if consistency == Consistency::Valid {
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                } else if consistency == Consistency::Inconsistent {
                    // Assignment, not comparison: the branch has to actually
                    // be marked as inconsistent.
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchPut(port_id, content) => {
                // Branch is attempting to send data
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `put()` is valid.
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
                    comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{
                        sync_header, data_header, content
                    }));

                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
                    return ConnectorScheduling::Immediate;
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
        }

        // If here then the run result did not require a particular action. We
        // return whether we have more active branches to run or not.
        if self.tree.queue_is_empty(QueueKind::Runnable) {
            return ConnectorScheduling::NotNow;
        } else {
            return ConnectorScheduling::Later;
        }
    }

    /// Runs the base branch of the component while it is *not* in sync mode
    /// and handles the result of that run.
    ///
    /// Returns:
    /// - `Exit` when the component terminated,
    /// - `Immediate` when a sync round was started or a channel was created
    ///   (the code has to continue running right away),
    /// - `Later` after a new component was created.
    ///
    /// # Panics
    /// Panics on a run result that is not expected in non-sync mode.
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());

        let branch = self.tree.base_branch_mut();
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);

        // There is no popped branch id in this mode, so take the id from the
        // base branch itself (assumes `Branch` exposes its id as the `id`
        // field - TODO confirm).
        // NOTE(review): `sched_ctx` is received by value, so `*sched_ctx`
        // relies on `SchedulerCtx` being dereferenceable - confirm against
        // its definition.
        let mut run_context = ConnectorRunContext{
            branch_id: branch.id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: *sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        match run_result {
            RunResult::ComponentTerminated => {
                branch.sync_state = SpeculativeState::Finished;

                return ConnectorScheduling::Exit;
            },
            RunResult::ComponentAtSyncStart => {
                // Hand the current ports to the consensus algorithm and queue
                // the first sync branch for running
                let current_ports = comp_ctx.notify_sync_start();
                let sync_branch_id = self.tree.start_sync();
                self.consensus.start_sync(current_ports);
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                // Note: we're relinquishing ownership of ports. But because
                // we are in non-sync mode the scheduler will handle and check
                // port ownership transfer.
                debug_assert!(comp_ctx.workspace_ports.is_empty());
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);

                let new_state = ComponentState {
                    prompt: Prompt::new(
                        &sched_ctx.runtime.protocol_description.types,
                        &sched_ctx.runtime.protocol_description.heap,
                        definition_id, monomorph_idx, arguments
                    ),
                };
                let new_component = ConnectorPDL::new(new_state);
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
                comp_ctx.workspace_ports.clear();

                return ConnectorScheduling::Later;
            },
            RunResult::NewChannel => {
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
                // The putter is the end one sends through (an output port),
                // the getter is the end one receives from (an input port).
                // The tuple order (putter first, getter second) is unchanged.
                branch.prepared_channel = Some((
                    Value::Output(PortId::new(putter.self_id.index)),
                    Value::Input(PortId::new(getter.self_id.index)),
                ));

                comp_ctx.push_port(putter);
                comp_ctx.push_port(getter);

                return ConnectorScheduling::Immediate;
            },
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
        }
    }
}