Files @ 5d69ddcae67e
Branch filter:

Location: CSY/reowolf/src/runtime2/connector.rs - annotation

5d69ddcae67e 17.2 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
WIP on fixing bug in test
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
daf15df0f8ca
f4f12a71e2e2
154e5e08b93a
68411f4b8014
8c5d438b0fa3
68411f4b8014
1755ca411ca7
68411f4b8014
68411f4b8014
68411f4b8014
1755ca411ca7
154e5e08b93a
68411f4b8014
f4f12a71e2e2
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
7d01f1245b7c
cf26538b25dc
daf15df0f8ca
7d01f1245b7c
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
58dfabd1be9f
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
ce39b1540ff5
68411f4b8014
68411f4b8014
68411f4b8014
ce39b1540ff5
98aadfccbafd
ce39b1540ff5
ce39b1540ff5
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
ce39b1540ff5
98aadfccbafd
68411f4b8014
ce39b1540ff5
ce39b1540ff5
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
ce39b1540ff5
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
98aadfccbafd
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
b4ac681e0e7f
68411f4b8014
154e5e08b93a
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
b4ac681e0e7f
b4ac681e0e7f
154e5e08b93a
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
58dfabd1be9f
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
b4ac681e0e7f
68411f4b8014
68411f4b8014
154e5e08b93a
68411f4b8014
68411f4b8014
68411f4b8014
154e5e08b93a
154e5e08b93a
44f84629849b
44f84629849b
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
8a530d2dc72f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
8a530d2dc72f
68411f4b8014
68411f4b8014
8a530d2dc72f
68411f4b8014
68411f4b8014
8a530d2dc72f
58dfabd1be9f
58dfabd1be9f
68411f4b8014
68411f4b8014
68411f4b8014
b4ac681e0e7f
b4ac681e0e7f
418aa1170154
68411f4b8014
b4ac681e0e7f
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
98aadfccbafd
98aadfccbafd
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
98aadfccbafd
68411f4b8014
98aadfccbafd
98aadfccbafd
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
7d01f1245b7c
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
58dfabd1be9f
58dfabd1be9f
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
ce39b1540ff5
68411f4b8014
68411f4b8014
68411f4b8014
98aadfccbafd
98aadfccbafd
ce39b1540ff5
98aadfccbafd
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
1755ca411ca7
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
154e5e08b93a
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
68411f4b8014
f450ae18ef58
68411f4b8014
68411f4b8014
68411f4b8014
f450ae18ef58
f450ae18ef58
f450ae18ef58
f450ae18ef58
154e5e08b93a
68411f4b8014
98aadfccbafd
5d69ddcae67e
5d69ddcae67e
98aadfccbafd
f450ae18ef58
154e5e08b93a
154e5e08b93a
98aadfccbafd
98aadfccbafd
f450ae18ef58
f450ae18ef58
f450ae18ef58
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
68411f4b8014
68411f4b8014
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
68411f4b8014
f4f12a71e2e2
f4f12a71e2e2
// connector.rs
//
// Represents a component. A component (and the scheduler that is running it)
// has many properties that are not easy to subdivide into aspects that are
// conceptually handled by particular data structures. That is to say: the code
// that we run governs: running PDL code, keeping track of ports, instantiating
// new components and transports (i.e. interacting with the runtime), running
// a consensus algorithm, etc. But on the other hand, our data is rather
// simple: we have a speculative execution tree, a set of ports that we own,
// and a bit of code that we should run.
//
// So currently the code is organized as following:
// - The scheduler that is running the component is the authoritative source on
//     ports during *non-sync* mode. The consensus algorithm is the
//     authoritative source during *sync* mode. They retrieve each other's
//     state during the transitions. Hence port data exists duplicated between
//     these two datastructures.
// - The execution tree is where executed branches reside. But the execution
//     tree is only aware of the tree shape itself (and keeps track of some
//     queues of branches that are in a particular state), and tends to store
//     the PDL program state. The consensus algorithm is also somewhat aware
//     of the execution tree, but only in terms of what is needed to complete
//     a sync round (for now, that means the port mapping in each branch).
//     Hence once more we have properties conceptually associated with branches
//     in two places.
// - TODO: Write about handling messages, consensus wrapping data
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;

use crate::PortId;
use crate::common::ComponentState;
use crate::protocol::eval::{Prompt, Value, ValueGroup};
use crate::protocol::{RunContext, RunResult};

use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
use super::native::Connector;
use super::port::{PortKind, PortIdLocal};
use super::scheduler::{ComponentCtx, SchedulerCtx};

pub(crate) struct ConnectorPublic {
    pub inbox: PublicInbox,
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    pub fn new(initialize_as_sleeping: bool) -> Self {
        ConnectorPublic{
            inbox: PublicInbox::new(),
            sleeping: AtomicBool::new(initialize_as_sleeping),
        }
    }
}

#[derive(Eq, PartialEq)]
pub(crate) enum ConnectorScheduling {
    Immediate,      // Run again, immediately
    Later,          // Schedule for running, at some later point in time
    NotNow,         // Do not reschedule for running
    Exit,           // Connector has exited
}

pub(crate) struct ConnectorPDL {
    tree: ExecTree,
    consensus: Consensus,
}

struct ConnectorRunContext<'a> {
    branch_id: BranchId,
    consensus: &'a Consensus,
    received: &'a HashMap<PortIdLocal, ValueGroup>,
    scheduler: SchedulerCtx<'a>,
    prepared_channel: Option<(Value, Value)>,
}

impl<'a> RunContext for ConnectorRunContext<'a>{
    fn did_put(&mut self, port: PortId) -> bool {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        return annotation.registered_id.is_some();
    }

    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        match self.received.get(&port_id) {
            Some(data) => Some(data.clone()),
            None => None,
        }
    }

    fn fires(&mut self, port: PortId) -> Option<Value> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
        return annotation.expected_firing.map(|v| Value::Bool(v));
    }

    fn get_channel(&mut self) -> Option<(Value, Value)> {
        return self.prepared_channel.take();
    }
}

impl Connector for ConnectorPDL {
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        self.handle_new_messages(comp_ctx);
        if self.tree.is_in_sync() {
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
            if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
                return ConnectorScheduling::Immediate;
            } else {
                return scheduling
            }
        } else {
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
            return scheduling;
        }
    }
}

impl ConnectorPDL {
    pub fn new(initial: ComponentState) -> Self {
        Self{
            tree: ExecTree::new(initial),
            consensus: Consensus::new(),
        }
    }

    // --- Handling messages

    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
        while let Some(message) = ctx.read_next_message() {
            match message {
                Message::Data(message) => self.handle_new_data_message(message, ctx),
                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
                Message::Control(_) => unreachable!("control message in component"),
            }
        }
    }

    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
        // Go through all branches that are awaiting new messages and see if
        // there is one that can receive this message.
        debug_assert!(ctx.workspace_branches.is_empty());
        let mut branches = Vec::new(); // TODO: @Remove
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches);

        for branch_id in branches.drain(..) {
            // This branch can receive, so fork and given it the message
            let receiving_branch_id = self.tree.fork_branch(branch_id);
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
            let receiving_branch = &mut self.tree[receiving_branch_id];

            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);

            // And prepare the branch for running
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
        }
    }

    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
        }
    }

    // --- Running code

    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        // Check if we have any branch that needs running
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
        if branch_id.is_none() {
            return ConnectorScheduling::NotNow;
        }

        // Retrieve the branch and run it
        let branch_id = branch_id.unwrap();
        let branch = &mut self.tree[branch_id];

        let mut run_context = ConnectorRunContext{
            branch_id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Handle the returned result. Note that this match statement contains
        // explicit returns in case the run result requires that the component's
        // code is ran again immediately
        match run_result {
            RunResult::BranchInconsistent => {
                // Branch became inconsistent
                branch.sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that has not been used yet.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);

                // Create two forks, one that assumes the port will fire, and
                // one that assumes the port remains silent
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;

                let firing_branch_id = self.tree.fork_branch(branch_id);
                let silent_branch_id = self.tree.fork_branch(branch_id);
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
                debug_assert_eq!(_result, Consistency::Valid);
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
                debug_assert_eq!(_result, Consistency::Valid);

                // Somewhat important: we push the firing one first, such that
                // that branch is ran again immediately.
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get()` on a port that does not have a
                // received message on that port.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `get()` is valid, so mark the branch as awaiting a message
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.awaiting_port = port_id;
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);

                    // Note: we only know that a branch is waiting on a message when
                    // it reaches the `get` call. But we might have already received
                    // a message that targets this branch, so check now.
                    let mut any_branch_received = false;
                    for message in comp_ctx.get_read_data_messages(port_id) {
                        if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) {
                            // This branch can receive the message, so we do the
                            // fork-and-receive dance
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
                            let branch = &mut self.tree[receiving_branch_id];

                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());

                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);

                            any_branch_received = true;
                        }
                    }

                    if any_branch_received {
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            }
            RunResult::BranchAtSyncEnd => {
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
                if consistency == Consistency::Valid {
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                } else if consistency == Consistency::Inconsistent {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchPut(port_id, content) => {
                // Branch is attempting to send data
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `put()` is valid.
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
                    comp_ctx.submit_message(Message::Data(DataMessage {
                        sync_header, data_header,
                        content: DataContent::Message(content),
                    }));

                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
                    return ConnectorScheduling::Immediate;
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
        }

        // If here then the run result did not require a particular action. We
        // return whether we have more active branches to run or not.
        if self.tree.queue_is_empty(QueueKind::Runnable) {
            return ConnectorScheduling::NotNow;
        } else {
            return ConnectorScheduling::Later;
        }
    }

    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());

        let branch = self.tree.base_branch_mut();
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);

        let mut run_context = ConnectorRunContext{
            branch_id: branch.id,
            consensus: &self.consensus,
            received: &branch.inbox,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        match run_result {
            RunResult::ComponentTerminated => {
                branch.sync_state = SpeculativeState::Finished;

                return ConnectorScheduling::Exit;
            },
            RunResult::ComponentAtSyncStart => {
                comp_ctx.notify_sync_start();
                let sync_branch_id = self.tree.start_sync();
                self.consensus.start_sync(comp_ctx);
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                // Note: we're relinquishing ownership of ports. But because
                // we are in non-sync mode the scheduler will handle and check
                // port ownership transfer.
                debug_assert!(comp_ctx.workspace_ports.is_empty());
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);

                let new_state = ComponentState {
                    prompt: Prompt::new(
                        &sched_ctx.runtime.protocol_description.types,
                        &sched_ctx.runtime.protocol_description.heap,
                        definition_id, monomorph_idx, arguments
                    ),
                };
                let new_component = ConnectorPDL::new(new_state);
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
                comp_ctx.workspace_ports.clear();

                return ConnectorScheduling::Later;
            },
            RunResult::NewChannel => {
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
                branch.prepared_channel = Some((
                    Value::Output(PortId::new(putter.self_id.index)),
                    Value::Input(PortId::new(getter.self_id.index)),
                ));

                comp_ctx.push_port(putter);
                comp_ctx.push_port(getter);

                return ConnectorScheduling::Immediate;
            },
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
        }
    }

    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
        let mut fake_vec = Vec::new();
        self.tree.end_sync(solution_branch_id);
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);

        for port in fake_vec {
            // TODO: Handle sent/received ports
            debug_assert!(ctx.get_port_by_id(port).is_some());
        }

        ctx.notify_sync_end(&[]);
    }
}