Files @ 32d91577e090
Branch filter:

Location: CSY/reowolf/src/runtime2/connector2.rs - annotation

32d91577e090 17.4 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
initial multithreaded runtime
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
a2b6b8e94778
32d91577e090
a2b6b8e94778
a2b6b8e94778
328f04b6612f
a2b6b8e94778
edb4c4be7e45
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
edb4c4be7e45
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
edb4c4be7e45
32d91577e090
32d91577e090
32d91577e090
32d91577e090
edb4c4be7e45
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
f4d1c8c04de6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
edb4c4be7e45
edb4c4be7e45
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
328f04b6612f
328f04b6612f
ce98be9707a6
328f04b6612f
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
32d91577e090
32d91577e090
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
edb4c4be7e45
32d91577e090
edb4c4be7e45
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
ce98be9707a6
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
bc29d573b2db
bc29d573b2db
bc29d573b2db
bc29d573b2db
328f04b6612f
bc29d573b2db
bc29d573b2db
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
32d91577e090
ce98be9707a6
ce98be9707a6
32d91577e090
32d91577e090
32d91577e090
32d91577e090
ce98be9707a6
32d91577e090
32d91577e090
32d91577e090
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
32d91577e090
ce98be9707a6
ce98be9707a6
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
a2b6b8e94778
32d91577e090
32d91577e090
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
ce98be9707a6
a2b6b8e94778
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
bc29d573b2db
edb4c4be7e45
bc29d573b2db
bc29d573b2db
328f04b6612f
bc29d573b2db
bc29d573b2db
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
328f04b6612f
a2b6b8e94778
328f04b6612f
32d91577e090
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
f4d1c8c04de6
f4d1c8c04de6
f4d1c8c04de6
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
a2b6b8e94778
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
edb4c4be7e45
ce98be9707a6
use std::collections::HashMap;
/// connector.rs
///
/// Represents a component. A component (and the scheduler that is running it)
/// has many properties that are not easy to subdivide into aspects that are
/// conceptually handled by particular data structures. That is to say: the code
/// that we run governs: running PDL code, keeping track of ports, instantiating
/// new components and transports (i.e. interacting with the runtime), running
/// a consensus algorithm, etc. But on the other hand, our data is rather
/// simple: we have a speculative execution tree, a set of ports that we own,
/// and a bit of code that we should run.
///
/// So currently the code is organized as follows:
/// - The scheduler that is running the component is the authoritative source on
///     ports during *non-sync* mode. The consensus algorithm is the
///     authoritative source during *sync* mode. They retrieve each other's
///     state during the transitions. Hence port data is duplicated between
///     these two data structures.
/// - The execution tree is where executed branches reside. But the execution
///     tree is only aware of the tree shape itself (and keeps track of some
///     queues of branches that are in a particular state), and tends to store
///     the PDL program state. The consensus algorithm is also somewhat aware
///     of the execution tree, but only in terms of what is needed to complete
///     a sync round (for now, that means the port mapping in each branch).
///     Hence once more we have properties conceptually associated with branches
///     in two places.
/// - TODO: Write about handling messages, consensus wrapping data
/// - TODO: Write about the way information is exchanged between PDL/component and scheduler through ctx

use std::sync::atomic::AtomicBool;

use crate::PortId;
use crate::common::ComponentState;
use crate::protocol::eval::{Prompt, Value, ValueGroup};
use crate::protocol::{RunContext, RunResult};
use crate::runtime2::consensus::find_ports_in_value_group;
use crate::runtime2::inbox2::DataContent;
use crate::runtime2::port::PortKind;

use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
use super::consensus::{Consensus, Consistency};
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox};
use super::native::Connector;
use super::port::PortIdLocal;
use super::scheduler::{ComponentCtxFancy, SchedulerCtx};

/// Public part of a connector: its inbox together with an atomic flag that
/// records whether the connector is sleeping.
// NOTE(review): presumably this is the state other threads touch while the
// connector itself is being run — confirm against the scheduler.
pub(crate) struct ConnectorPublic {
    /// Inbox belonging to this connector.
    pub inbox: PublicInbox,
    /// Sleeping flag; its initial value is the argument given to `new`.
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    /// Builds the public part of a connector with a newly constructed inbox.
    ///
    /// `initialize_as_sleeping` becomes the initial value of the `sleeping`
    /// flag.
    pub fn new(initialize_as_sleeping: bool) -> Self {
        let inbox = PublicInbox::new();
        let sleeping = AtomicBool::new(initialize_as_sleeping);
        Self { inbox, sleeping }
    }
}

/// Outcome of running a connector, telling the caller whether and when the
/// connector should be run again.
///
/// The type is a plain field-less enum, so besides equality it also derives
/// `Debug` (usable in log output and `assert_eq!`) and `Clone`/`Copy` (a
/// scheduling decision can be inspected without giving it up).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ConnectorScheduling {
    Immediate,      // Run again, immediately
    Later,          // Schedule for running, at some later point in time
    NotNow,         // Do not reschedule for running
    Exit,           // Connector has exited
}

/// A connector that runs PDL code. It pairs the execution tree of (possibly
/// speculative) branches with the consensus state that is kept informed of
/// branch forks, port mappings and messages.
pub(crate) struct ConnectorPDL {
    // Execution tree: the branches of this component and the queues they
    // are placed in (runnable, awaiting a message, finished sync).
    tree: ExecTree,
    // Consensus bookkeeping; consulted and notified while in sync mode.
    consensus: Consensus,
}

/// Context passed to a branch's `code_state.run(..)` call. It implements
/// `RunContext`, through which the running code queries the state of ports
/// for a single branch.
struct ConnectorRunContext<'a> {
    // Branch whose code is being run; used to look up port annotations.
    branch_id: BranchId,
    // Consensus state that holds the per-branch port annotations.
    consensus: &'a Consensus,
    // The branch's inbox: received values keyed by local port id.
    received: &'a HashMap<PortIdLocal, ValueGroup>,
    // Scheduler context; not read by the `RunContext` methods in this file.
    scheduler: SchedulerCtx<'a>,
    // Channel endpoints prepared beforehand; handed out once by `get_channel`.
    prepared_channel: Option<(Value, Value)>,
}

impl<'a> RunContext for ConnectorRunContext<'a> {
    /// Reports whether the annotation of `port` in the current branch has a
    /// registered id.
    fn did_put(&mut self, port: PortId) -> bool {
        let local_id = PortIdLocal::new(port.0.u32_suffix);
        self.consensus
            .get_annotation(self.branch_id, local_id)
            .registered_id
            .is_some()
    }

    /// Returns a copy of the value received on `port`, or `None` when the
    /// branch's inbox holds nothing for that port.
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
        let local_id = PortIdLocal::new(port.0.u32_suffix);
        self.received.get(&local_id).cloned()
    }

    /// Returns the expected firing state of `port` in the current branch as a
    /// boolean value, or `None` when the annotation has no expectation.
    fn fires(&mut self, port: PortId) -> Option<Value> {
        let local_id = PortIdLocal::new(port.0.u32_suffix);
        self.consensus
            .get_annotation(self.branch_id, local_id)
            .expected_firing
            .map(Value::Bool)
    }

    /// Hands out the prepared channel endpoints, leaving `None` behind so a
    /// second call yields nothing.
    fn get_channel(&mut self) -> Option<(Value, Value)> {
        self.prepared_channel.take()
    }
}

impl Connector for ConnectorPDL {
    /// Runs the connector once: first processes all pending messages, then
    /// executes code in sync or deterministic (non-sync) mode depending on the
    /// state of the execution tree.
    ///
    /// In sync mode, if the consensus algorithm reports a solution branch
    /// after the run, the sync round is collapsed onto that branch and the
    /// connector asks to be run again immediately.
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        self.handle_new_messages(comp_ctx);

        // Outside of a sync round there is just the one base branch to run.
        if !self.tree.is_in_sync() {
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
        }

        let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
        match self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
            Some(solution_branch_id) => {
                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
                ConnectorScheduling::Immediate
            },
            None => scheduling,
        }
    }
}

impl ConnectorPDL {
    /// Creates a PDL connector whose execution tree starts from `initial`,
    /// paired with a freshly constructed consensus state.
    pub fn new(initial: ComponentState) -> Self {
        let tree = ExecTree::new(initial);
        let consensus = Consensus::new();
        Self { tree, consensus }
    }

    // --- Handling messages

    /// Drains every message the component context currently offers and
    /// dispatches each to the data or sync handler.
    ///
    /// Control messages are not expected to reach a component; encountering
    /// one panics.
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) {
        loop {
            let next = match ctx.read_next_message() {
                Some(next) => next,
                None => break,
            };

            match next {
                MessageFancy::Data(data) => {
                    self.handle_new_data_message(data, ctx);
                },
                MessageFancy::Sync(sync) => {
                    self.handle_new_sync_message(sync, ctx);
                },
                MessageFancy::Control(_) => unreachable!("control message in component"),
            }
        }
    }

    /// Delivers a data message to every branch that the consensus algorithm
    /// reports as able to receive it.
    ///
    /// Each such branch is forked; the fork gets the message inserted into its
    /// inbox, the consensus state is told about the fork and the reception,
    /// and the fork is queued as runnable.
    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
        debug_assert!(ctx.workspace_branches.is_empty());

        // Collect the ids of the waiting branches that may take this message.
        let mut receivers = Vec::new(); // TODO: @Remove
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut receivers);

        for parent_id in receivers {
            // Fork the waiting branch; the fork is the one that receives.
            let fork_id = self.tree.fork_branch(parent_id);
            self.consensus.notify_of_new_branch(parent_id, fork_id);

            let payload = message.content.as_message().unwrap().clone();
            let fork = &mut self.tree[fork_id];
            fork.insert_message(message.data_header.target_port, payload);
            self.consensus.notify_of_received_message(fork_id, &message.data_header, &message.content);

            // The fork now has its data, so it can continue running.
            self.tree.push_into_queue(QueueKind::Runnable, fork_id);
        }
    }

    /// Forwards a sync message to the consensus algorithm. When that yields a
    /// solution branch, the sync round is collapsed onto it.
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
        let solution = self.consensus.handle_new_sync_message(message, ctx);
        match solution {
            Some(solution_branch_id) => {
                self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
            },
            None => {},
        }
    }

    // --- Running code

    /// Runs a single runnable branch while the component is in sync mode.
    ///
    /// Pops one branch from the `Runnable` queue, runs its code, and acts on
    /// the returned `RunResult`:
    /// - inconsistent branch: marked `Inconsistent`;
    /// - missing port state (`fires()` on an unused port): forks a "firing"
    ///   and a "silent" branch and queues both;
    /// - missing port value (`get()` without a message): parks the branch in
    ///   the `AwaitingMessage` queue and forks it for every already-read
    ///   message it can receive;
    /// - sync end: queues the branch as `FinishedSync` if consistent;
    /// - put: submits the message and re-queues the branch.
    ///
    /// Returns `Immediate` when the handled result queued new runnable work
    /// (fork on `fires()`, a received message, a valid put). Otherwise returns
    /// `Later` if the `Runnable` queue still has entries and `NotNow` if it is
    /// empty (also when there was no branch to run at all).
    ///
    /// Panics on any `RunResult` not listed above.
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        // Check if we have any branch that needs running
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
        if branch_id.is_none() {
            return ConnectorScheduling::NotNow;
        }

        // Retrieve the branch and run it
        let branch_id = branch_id.unwrap();
        let branch = &mut self.tree[branch_id];

        // The context borrows the branch's inbox and takes over its prepared
        // channel (if any) for the duration of this run.
        let mut run_context = ConnectorRunContext{
            branch_id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Handle the returned result. Note that this match statement contains
        // explicit returns in case the run result requires that the component's
        // code is run again immediately
        match run_result {
            RunResult::BranchInconsistent => {
                // Branch became inconsistent
                branch.sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that has not been used yet.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);

                // Create two forks, one that assumes the port will fire, and
                // one that assumes the port remains silent
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;

                let firing_branch_id = self.tree.fork_branch(branch_id);
                let silent_branch_id = self.tree.fork_branch(branch_id);
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
                debug_assert_eq!(_result, Consistency::Valid);
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
                debug_assert_eq!(_result, Consistency::Valid);

                // Somewhat important: we push the firing one first, such that
                // that branch is run again immediately.
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get()` on a port that does not have a
                // received message on that port.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `get()` is valid, so mark the branch as awaiting a message
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.awaiting_port = port_id;
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);

                    // Note: we only know that a branch is waiting on a message when
                    // it reaches the `get` call. But we might have already received
                    // a message that targets this branch, so check now.
                    let mut any_branch_received = false;
                    for message in comp_ctx.get_read_data_messages(port_id) {
                        if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) {
                            // This branch can receive the message, so we do the
                            // fork-and-receive dance
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
                            let branch = &mut self.tree[receiving_branch_id];

                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());

                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);

                            any_branch_received = true;
                        }
                    }

                    // At least one fork became runnable: run again right away.
                    if any_branch_received {
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            }
            RunResult::BranchAtSyncEnd => {
                // Branch reached the end of its sync block; only a consistent
                // branch is queued as finished.
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
                if consistency == Consistency::Valid {
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                } else if consistency == Consistency::Inconsistent {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchPut(port_id, content) => {
                // Branch is attempting to send data
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `put()` is valid.
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
                    comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{
                        sync_header, data_header,
                        content: DataContent::Message(content),
                    }));

                    // The same branch continues after the put, so re-queue it.
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
                    return ConnectorScheduling::Immediate;
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
        }

        // If here then the run result did not require a particular action. We
        // return whether we have more active branches to run or not.
        if self.tree.queue_is_empty(QueueKind::Runnable) {
            return ConnectorScheduling::NotNow;
        } else {
            return ConnectorScheduling::Later;
        }
    }

    /// Runs the base branch while the component is outside of a sync round.
    ///
    /// Runs the base branch's code once and acts on the returned `RunResult`:
    /// - component terminated: marks the branch `Finished`, returns `Exit`;
    /// - sync start: switches the tree and the consensus state into sync mode,
    ///   queues the first sync branch as runnable, returns `Immediate`;
    /// - new component: collects the ports found in the arguments, builds the
    ///   new component and hands both to the component context, returns
    ///   `Later`;
    /// - new channel: asks the runtime for a channel, stores its endpoints on
    ///   the branch so the next run can pick them up through `get_channel`,
    ///   registers both ports with the component context, returns `Immediate`.
    ///
    /// Panics on any other `RunResult`.
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());

        let branch = self.tree.base_branch_mut();
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);

        // The context borrows the branch's inbox and takes over its prepared
        // channel (if any) for the duration of this run.
        let mut run_context = ConnectorRunContext{
            branch_id: branch.id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        match run_result {
            RunResult::ComponentTerminated => {
                branch.sync_state = SpeculativeState::Finished;

                return ConnectorScheduling::Exit;
            },
            RunResult::ComponentAtSyncStart => {
                comp_ctx.notify_sync_start();
                let sync_branch_id = self.tree.start_sync();
                self.consensus.start_sync(comp_ctx);
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                // Note: we're relinquishing ownership of ports. But because
                // we are in non-sync mode the scheduler will handle and check
                // port ownership transfer.
                debug_assert!(comp_ctx.workspace_ports.is_empty());
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);

                let new_state = ComponentState {
                    prompt: Prompt::new(
                        &sched_ctx.runtime.protocol_description.types,
                        &sched_ctx.runtime.protocol_description.heap,
                        definition_id, monomorph_idx, arguments
                    ),
                };
                let new_component = ConnectorPDL::new(new_state);
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
                comp_ctx.workspace_ports.clear();

                return ConnectorScheduling::Later;
            },
            RunResult::NewChannel => {
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);

                // The putter is the end that data is put into, so it is the
                // channel's output port; the getter is its input port. The
                // pair keeps its putter-first ordering.
                branch.prepared_channel = Some((
                    Value::Output(PortId::new(putter.self_id.index)),
                    Value::Input(PortId::new(getter.self_id.index)),
                ));

                comp_ctx.push_port(putter);
                comp_ctx.push_port(getter);

                return ConnectorScheduling::Immediate;
            },
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
        }
    }

    /// Ends the current sync round on `solution_branch_id`: both the execution
    /// tree and the consensus state leave sync mode, after which the component
    /// context is notified that the round has ended.
    ///
    /// In debug builds, every port the consensus state reports back is checked
    /// to be known to the component context.
    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtxFancy) {
        self.tree.end_sync(solution_branch_id);

        // Scratch buffer that the consensus algorithm fills with port ids.
        let mut reported_ports = Vec::new();
        self.consensus.end_sync(solution_branch_id, &mut reported_ports);

        for port in reported_ports {
            // TODO: Handle sent/received ports
            debug_assert!(ctx.get_port_by_id(port).is_some());
        }

        ctx.notify_sync_end(&[]);
    }
}