Files @ 54917d00dfe6
Branch filter:

Location: CSY/reowolf/src/runtime2/connector2.rs - annotation

54917d00dfe6 16.5 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
WIP on matching local solutions to find global solution
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
ce98be9707a6
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
f4d1c8c04de6
f4d1c8c04de6
f4d1c8c04de6
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
use std::collections::HashMap;
/// connector.rs
///
/// Represents a component. A component (and the scheduler that is running it)
/// has many properties that are not easy to subdivide into aspects that are
/// conceptually handled by particular data structures. That is to say: the code
/// that we run governs: running PDL code, keeping track of ports, instantiating
/// new components and transports (i.e. interacting with the runtime), running
/// a consensus algorithm, etc. But on the other hand, our data is rather
/// simple: we have a speculative execution tree, a set of ports that we own,
/// and a bit of code that we should run.
///
/// So currently the code is organized as follows:
/// - The scheduler that is running the component is the authoritative source on
///     ports during *non-sync* mode. The consensus algorithm is the
///     authoritative source during *sync* mode. They retrieve each other's
///     state during the transitions. Hence port data is duplicated between
///     these two data structures.
/// - The execution tree is where executed branches reside. But the execution
///     tree is only aware of the tree shape itself (and keeps track of some
///     queues of branches that are in a particular state), and tends to store
///     the PDL program state. The consensus algorithm is also somewhat aware
///     of the execution tree, but only in terms of what is needed to complete
///     a sync round (for now, that means the port mapping in each branch).
///     Hence once more we have properties conceptually associated with branches
///     in two places.
/// - TODO: Write about handling messages, consensus wrapping data
/// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx

use std::sync::atomic::AtomicBool;

use crate::PortId;
use crate::common::ComponentState;
use crate::protocol::eval::{Prompt, Value, ValueGroup};
use crate::protocol::{RunContext, RunResult};
use crate::runtime2::consensus::find_ports_in_value_group;
use crate::runtime2::port::PortKind;

use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
use super::consensus::{Consensus, Consistency};
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
use super::inbox::PublicInbox;
use super::native::Connector;
use super::port::PortIdLocal;
use super::scheduler::{ComponentCtxFancy, SchedulerCtx};

/// Publicly visible part of a connector: its inbox plus an atomic `sleeping`
/// flag.
pub(crate) struct ConnectorPublic {
    /// Inbox of this connector; created empty by `ConnectorPublic::new`.
    pub inbox: PublicInbox,
    /// Whether the connector is considered asleep. The initial value is chosen
    /// by the caller of `ConnectorPublic::new`.
    // NOTE(review): presumably toggled by the scheduler when (un)scheduling the
    // connector — confirm against the scheduler code.
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    /// Builds the public part of a connector with a freshly created inbox.
    ///
    /// `initialize_as_sleeping` is used as the initial value of the `sleeping`
    /// flag.
    pub fn new(initialize_as_sleeping: bool) -> Self {
        let inbox = PublicInbox::new();
        let sleeping = AtomicBool::new(initialize_as_sleeping);
        Self { inbox, sleeping }
    }
}

/// Scheduling decision returned from running a connector.
///
/// The type is a plain tag without payload, so it is cheap to copy and can be
/// printed in assertions and log output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConnectorScheduling {
    /// Run again, immediately.
    Immediate,
    /// Schedule for running, at some later point in time.
    Later,
    /// Do not reschedule for running.
    NotNow,
    /// Connector has exited.
    Exit,
}

/// A connector that runs PDL code. It pairs the execution tree of the
/// component's branches with the consensus state for the sync round.
pub(crate) struct ConnectorPDL {
    /// Execution tree holding the branches (and their queues) of this component.
    tree: ExecTree,
    /// Consensus state; notified whenever `tree` forks a branch, a branch
    /// sends/receives, or a branch reaches the end of the sync block.
    consensus: Consensus,
}

/// Context handed to the PDL code while one particular branch is being run.
/// Its `RunContext` implementation answers the code's queries about ports,
/// received messages and freshly created channels.
struct ConnectorRunContext<'a> {
    /// Branch that is being run; used to look up port annotations in `consensus`.
    branch_id: BranchId,
    /// Read-only view on the consensus state (source of the port annotations).
    consensus: &'a Consensus,
    /// Messages received by the branch, keyed by the port they arrived on.
    received: &'a HashMap<PortIdLocal, ValueGroup>,
    /// Scheduler context. Not read by the `RunContext` implementation in this file.
    scheduler: SchedulerCtx<'a>,
    /// Channel endpoints prepared for the code; handed out once by `get_channel`.
    prepared_channel: Option<(Value, Value)>,
}

// The struct carries a lifetime parameter, so the impl header has to name it
// (here anonymously); leaving it out is rejected by the compiler.
impl RunContext for ConnectorRunContext<'_> {
    /// Returns whether the running branch already has a registered id in its
    /// annotation for `port`.
    fn did_put(&mut self, port: PortId) -> bool {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        annotation.registered_id.is_some()
    }

    /// Returns a copy of the message received on `port`, or `None` if the
    /// branch has no message stored for that port.
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        self.received.get(&port_id).cloned()
    }

    /// Returns the expected firing state of `port` for the running branch as a
    /// boolean value, or `None` if the annotation holds no expectation yet.
    fn fires(&mut self, port: PortId) -> Option<Value> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        annotation.expected_firing.map(Value::Bool)
    }

    /// Hands out the prepared channel endpoints. The pair is taken out of the
    /// context, so a second call returns `None`.
    fn get_channel(&mut self) -> Option<(Value, Value)> {
        self.prepared_channel.take()
    }
}

impl Connector for ConnectorPDL {
    /// Runs the connector once: first processes all pending messages, then
    /// executes code in sync or non-sync mode depending on the state of the
    /// execution tree. After a sync-mode run the consensus state is asked to
    /// handle newly finished sync branches.
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        self.handle_new_messages(comp_ctx);

        if !self.tree.is_in_sync() {
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
        }

        let outcome = self.run_in_sync_mode(sched_ctx, comp_ctx);
        self.consensus.handle_new_finished_sync_branches();
        outcome
    }
}

impl ConnectorPDL {
    /// Creates a connector whose execution tree starts from the `initial`
    /// component state, together with a fresh consensus state.
    pub fn new(initial: ComponentState) -> Self {
        let tree = ExecTree::new(initial);
        let consensus = Consensus::new();
        Self { tree, consensus }
    }

    // --- Handling messages

    /// Reads messages from `ctx` until none are left and dispatches each one to
    /// the handler for its kind.
    ///
    /// # Panics
    ///
    /// Panics on a control message; those are not expected to reach a
    /// component.
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) {
        while let Some(message) = ctx.read_next_message() {
            match message {
                // The handlers are methods of this connector, so they must be
                // called through `self`.
                MessageFancy::Data(message) => self.handle_new_data_message(message, ctx),
                MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx),
                MessageFancy::Control(_) => unreachable!("control message in component"),
            }
        }
    }

    /// Processes a received data message.
    ///
    /// The consensus state first handles the sync and data headers; the latter
    /// call fills `ctx.workspace_branches` with the branches that can receive
    /// this message. Each of those branches is forked, the fork receives a
    /// copy of the message content and is queued as runnable. The workspace
    /// is drained again before returning.
    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
        // Go through all branches that are awaiting new messages and see if
        // there is one that can receive this message.
        debug_assert!(ctx.workspace_branches.is_empty());
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches);

        for branch_id in ctx.workspace_branches.drain(..) {
            // This branch can receive, so fork and give it the message
            let receiving_branch_id = self.tree.fork_branch(branch_id);
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
            let receiving_branch = &mut self.tree[receiving_branch_id];

            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
            // The message was inserted into the fork, so the fork (not the
            // branch it was forked from) is the one that is registered as
            // having received it. This matches the fork-and-receive sequence
            // used in `run_in_sync_mode`.
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);

            // And prepare the branch for running
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
        }
    }

    /// Processes a received sync message. Only the sync header is passed on to
    /// the consensus state so far; handling of the message content is not
    /// implemented yet, so this function currently always panics via `todo!`.
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
        let header = &message.sync_header;
        self.consensus.handle_received_sync_header(header, ctx);
        todo!("handle content of message?");
    }

    // --- Running code

    /// Runs one runnable branch of the execution tree while the component is in
    /// sync mode and processes the result of that run.
    ///
    /// Returns `Immediate` when the result produced work that should be picked
    /// up right away (two new forks after `fires()`, a valid `put()`, or an
    /// already-read message that the waiting branch can receive). In all other
    /// cases returns `Later` if runnable branches remain in the queue and
    /// `NotNow` if there are none (including when the queue was empty on entry).
    ///
    /// # Panics
    ///
    /// Panics if the code returns a run result that is not expected in sync
    /// mode.
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        // Check if we have any branch that needs running
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
        let branch_id = match self.tree.pop_from_queue(QueueKind::Runnable) {
            Some(branch_id) => branch_id,
            None => return ConnectorScheduling::NotNow,
        };

        // Retrieve the branch and run it
        let branch = &mut self.tree[branch_id];

        // NOTE(review): `sched_ctx` is received by value, so `*sched_ctx` only
        // compiles if `SchedulerCtx` can be dereferenced to itself — confirm
        // against the definition of `SchedulerCtx`.
        let mut run_context = ConnectorRunContext{
            branch_id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: *sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Handle the returned result. Note that this match statement contains
        // explicit returns in case the run result requires that the component's
        // code is ran again immediately
        match run_result {
            RunResult::BranchInconsistent => {
                // Branch became inconsistent
                branch.sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that has not been used yet.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);

                // Create two forks, one that assumes the port will fire, and
                // one that assumes the port remains silent
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;

                let firing_branch_id = self.tree.fork_branch(branch_id);
                let silent_branch_id = self.tree.fork_branch(branch_id);
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
                debug_assert_eq!(_result, Consistency::Valid);
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
                debug_assert_eq!(_result, Consistency::Valid);

                // Somewhat important: we push the firing one first, such that
                // that branch is ran again immediately.
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get()` on a port that does not have a
                // received message on that port.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `get()` is valid, so mark the branch as awaiting a message
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.awaiting_port = port_id;
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);

                    // Note: we only know that a branch is waiting on a message when
                    // it reaches the `get` call. But we might have already received
                    // a message that targets this branch, so check now.
                    let mut any_branch_received = false;
                    for message in comp_ctx.get_read_data_messages(port_id) {
                        if self.consensus.branch_can_receive(branch_id, &message.data_header) {
                            // This branch can receive the message, so we do the
                            // fork-and-receive dance
                            let recv_branch_id = self.tree.fork_branch(branch_id);
                            let recv_branch = &mut self.tree[recv_branch_id];
                            recv_branch.insert_message(port_id, message.content.clone());

                            self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content);
                            self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);

                            any_branch_received = true;
                        }
                    }

                    if any_branch_received {
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            }
            RunResult::BranchAtSyncEnd => {
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
                if consistency == Consistency::Valid {
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                } else if consistency == Consistency::Inconsistent {
                    // Assignment, not comparison: the branch has to be marked
                    // inconsistent, like in the other arms of this match.
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchPut(port_id, content) => {
                // Branch is attempting to send data
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `put()` is valid.
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
                    comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{
                        sync_header, data_header, content
                    }));

                    // The branch continues after the `put()`, so queue it again.
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
                    return ConnectorScheduling::Immediate;
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
        }

        // If here then the run result did not require a particular action. We
        // return whether we have more active branches to run or not.
        if self.tree.queue_is_empty(QueueKind::Runnable) {
            return ConnectorScheduling::NotNow;
        } else {
            return ConnectorScheduling::Later;
        }
    }

    /// Runs the base branch of the component while it is not in sync mode and
    /// processes the result of that run.
    ///
    /// Returns `Exit` when the component terminated, `Immediate` after entering
    /// a sync block or creating a channel, and `Later` after creating a new
    /// component.
    ///
    /// # Panics
    ///
    /// Panics if the code returns a run result that is not expected in
    /// non-sync mode.
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());

        let branch = self.tree.base_branch_mut();
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);

        // No `branch_id` local exists in this function, so the id has to come
        // from the base branch itself.
        // NOTE(review): assumes `Branch` exposes its own id as the field `id`
        // — TODO confirm against `branch.rs`.
        // NOTE(review): `sched_ctx` is received by value, so `*sched_ctx` only
        // compiles if `SchedulerCtx` can be dereferenced to itself — confirm
        // against the definition of `SchedulerCtx`.
        let mut run_context = ConnectorRunContext{
            branch_id: branch.id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: *sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        match run_result {
            RunResult::ComponentTerminated => {
                branch.sync_state = SpeculativeState::Finished;

                return ConnectorScheduling::Exit;
            },
            RunResult::ComponentAtSyncStart => {
                // Switch both the execution tree and the consensus state into
                // sync mode, then queue the first sync branch for running.
                let current_ports = comp_ctx.notify_sync_start();
                let sync_branch_id = self.tree.start_sync();
                self.consensus.start_sync(current_ports);
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                // Note: we're relinquishing ownership of ports. But because
                // we are in non-sync mode the scheduler will handle and check
                // port ownership transfer.
                debug_assert!(comp_ctx.workspace_ports.is_empty());
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);

                let new_state = ComponentState {
                    prompt: Prompt::new(
                        &sched_ctx.runtime.protocol_description.types,
                        &sched_ctx.runtime.protocol_description.heap,
                        definition_id, monomorph_idx, arguments
                    ),
                };
                let new_component = ConnectorPDL::new(new_state);
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
                // Leave the workspace empty for its next user.
                comp_ctx.workspace_ports.clear();

                return ConnectorScheduling::Later;
            },
            RunResult::NewChannel => {
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
                // The putting end is the output port and the getting end is
                // the input port. The tuple keeps the (putter, getter) order.
                // NOTE(review): presumably the code that consumes the prepared
                // channel expects (output, input) in this order — confirm.
                branch.prepared_channel = Some((
                    Value::Output(PortId::new(putter.self_id.index)),
                    Value::Input(PortId::new(getter.self_id.index)),
                ));

                comp_ctx.push_port(putter);
                comp_ctx.push_port(getter);

                return ConnectorScheduling::Immediate;
            },
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
        }
    }
}