Files @ ce98be9707a6
Branch filter:

Location: CSY/reowolf/src/runtime2/connector2.rs

ce98be9707a6 9.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
wip on refactoring component
use std::sync::atomic::AtomicBool;
use crate::common::ComponentState;
use crate::PortId;
use crate::protocol::eval::{Value, ValueGroup};
use crate::protocol::{RunContext, RunResult};
use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
use crate::runtime2::connector::ConnectorScheduling;
use crate::runtime2::consensus::{Consensus, Consistency};
use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
use crate::runtime2::inbox::PublicInbox;
use crate::runtime2::native::Connector;
use crate::runtime2::port::PortIdLocal;
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};

/// Bundle of a connector's inbox and its sleeping flag.
///
/// NOTE(review): presumably this is the part of a connector that is shared
/// with other threads (hence the atomic flag) — confirm against the scheduler.
pub(crate) struct ConnectorPublic {
    /// Inbox of the connector; constructed with `PublicInbox::new()` in `new`.
    pub inbox: PublicInbox,
    /// Sleeping flag; its initial value is chosen by the caller of `new`.
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    /// Builds the public connector state around a freshly constructed inbox.
    ///
    /// `initialize_as_sleeping` becomes the initial value of the `sleeping`
    /// flag.
    pub fn new(initialize_as_sleeping: bool) -> Self {
        let inbox = PublicInbox::new();
        let sleeping = AtomicBool::new(initialize_as_sleeping);
        Self { inbox, sleeping }
    }
}

/// Connector that runs a component's code, keeping the speculative branches of
/// that code in an execution tree.
///
/// NOTE(review): "PDL" presumably refers to the protocol description language
/// the component is written in — confirm.
pub(crate) struct ConnectorPDL {
    /// Execution tree holding the branches and the queues they are placed in
    /// (`Runnable`, `AwaitingMessage`, `FinishedSync`).
    tree: ExecTree,
    /// Consensus bookkeeping; it is notified of forks, speculative port
    /// mappings, received messages and finished branches.
    consensus: Consensus,
    /// Scratch buffer: filled by `Consensus::handle_received_data_header` with
    /// the branches that can receive an incoming data message, then drained.
    /// Expected to be empty between uses (see the `debug_assert!` in
    /// `handle_new_data_message`).
    branch_workspace: Vec<BranchId>,
}

/// Context handed to a branch's code while it is being run (it is created in
/// `ConnectorPDL::run_in_sync_mode`).
///
/// Work in progress: every `RunContext` callback below is still a `todo!()`
/// and therefore panics when it is called.
struct ConnectorRunContext {}

impl RunContext for ConnectorRunContext {
    /// Not implemented yet; panics via `todo!()`.
    fn did_put(&mut self, _port: PortId) -> bool {
        todo!()
    }

    /// Not implemented yet; panics via `todo!()`.
    fn get(&mut self, _port: PortId) -> Option<ValueGroup> {
        todo!()
    }

    /// Not implemented yet; panics via `todo!()`.
    fn fires(&mut self, _port: PortId) -> Option<Value> {
        todo!()
    }

    /// Not implemented yet; panics via `todo!()`.
    fn get_channel(&mut self) -> Option<(Value, Value)> {
        todo!()
    }
}

impl Connector for ConnectorPDL {
    /// `Connector::run` implementation for the PDL connector.
    ///
    /// Work in progress: not implemented yet, calling it panics via `todo!()`.
    ///
    /// NOTE(review): presumably this will dispatch to `run_in_sync_mode`; be
    /// aware that `run_in_sync_mode` takes `&mut SchedulerCtx` whereas this
    /// method receives `SchedulerCtx` by value.
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        todo!()
    }
}

impl ConnectorPDL {
    /// Creates a connector whose execution tree starts from `initial`, with a
    /// fresh consensus state and an empty branch workspace.
    ///
    /// `owned_ports` is accepted but not stored anywhere yet (work in
    /// progress).
    pub fn new(initial: ComponentState, owned_ports: Vec<PortIdLocal>) -> Self {
        // Not used yet; referenced only so the parameter is not reported as
        // unused while the refactoring is ongoing.
        let _ = owned_ports;

        Self {
            tree: ExecTree::new(initial),
            consensus: Consensus::new(),
            // Every field must be initialised; the workspace starts out empty,
            // which is what `handle_new_data_message` asserts on entry.
            branch_workspace: Vec::new(),
        }
    }

    // --- Handling messages

    /// Reads messages from `ctx` until `read_next_message` returns `None` and
    /// dispatches each one, by kind, to the data or sync message handler.
    ///
    /// # Panics
    /// Panics if a control message is read: those are not expected to reach a
    /// component.
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) {
        while let Some(message) = ctx.read_next_message() {
            match message {
                // The handlers are methods of this connector, so they have to
                // be invoked through `self`.
                MessageFancy::Data(message) => self.handle_new_data_message(message, ctx),
                MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx),
                MessageFancy::Control(_) => unreachable!("control message in component"),
            }
        }
    }

    /// Handles a received data message.
    ///
    /// The sync header and data header are passed to the consensus algorithm,
    /// which fills `branch_workspace` with the branches that can receive the
    /// message. Each of those branches is forked; the fork is given a copy of
    /// the message content and is queued as runnable.
    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
        // Go through all branches that are awaiting new messages and see if
        // there is one that can receive this message.
        debug_assert!(self.branch_workspace.is_empty());
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace);

        for branch_id in self.branch_workspace.drain(..) {
            // This branch can receive, so fork and give the fork the message
            let receiving_branch_id = self.tree.fork_branch(branch_id);
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);

            self.tree[receiving_branch_id]
                .insert_message(message.data_header.target_port, message.content.clone());
            // The message was inserted into the fork, so the fork (not the
            // branch it was forked from) is reported as the receiver. This
            // matches the fork-and-receive path in `run_in_sync_mode`.
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header);

            // And prepare the branch for running
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
        }
    }

    /// Handles a received sync message by passing its sync header to the
    /// consensus algorithm.
    ///
    /// Work in progress: handling of the message content is not implemented,
    /// so after the header has been processed this always panics at the
    /// `todo!` below.
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
        todo!("handle content of message?");
    }

    // --- Running code

    /// Pops one branch from the `Runnable` queue, runs its code once and
    /// processes the resulting `RunResult`.
    ///
    /// Returns
    /// - `ConnectorScheduling::NotNow` if there was no runnable branch, or if
    ///   the runnable queue is empty after the result was handled;
    /// - `ConnectorScheduling::Immediate` if new branches were forked and
    ///   queued (the two forks for an unmapped `fires()`, or forks for a
    ///   `get()` whose message had already been read);
    /// - `ConnectorScheduling::Later` otherwise, i.e. runnable branches remain.
    ///
    /// # Panics
    /// Panics on a run result that is not handled below and, while this is
    /// work in progress, on a consistent `put()`, whose sending logic is not
    /// implemented yet.
    pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        // Check if we have any branch that needs running
        let branch_id = match self.tree.pop_from_queue(QueueKind::Runnable) {
            Some(branch_id) => branch_id,
            None => return ConnectorScheduling::NotNow,
        };

        // Run the branch. The branch is indexed where needed instead of being
        // borrowed for the whole function, because the tree is also forked
        // and queued into further down.
        let mut run_context = ConnectorRunContext{};
        let run_result = self.tree[branch_id]
            .code_state
            .run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Handle the returned result. Note that this match statement contains
        // explicit returns in case the run result requires that the component's
        // code is run again immediately
        match run_result {
            RunResult::BranchInconsistent => {
                // Branch became inconsistent
                self.tree[branch_id].sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that has not been used yet.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);

                // Create two forks, one that assumes the port will fire, and
                // one that assumes the port remains silent
                self.tree[branch_id].sync_state = SpeculativeState::HaltedAtBranchPoint;

                let firing_branch_id = self.tree.fork_branch(branch_id);
                let silent_branch_id = self.tree.fork_branch(branch_id);
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
                debug_assert_eq!(_result, Consistency::Valid);
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
                debug_assert_eq!(_result, Consistency::Valid);

                // Somewhat important: we push the firing one first, such that
                // that branch is run again immediately.
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get()` on a port that does not have a
                // received message on that port.
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `get()` is valid, so mark the branch as awaiting a message
                    let branch = &mut self.tree[branch_id];
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.awaiting_port = port_id;
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);

                    // Note: we only know that a branch is waiting on a message when
                    // it reaches the `get` call. But we might have already received
                    // a message that targets this branch, so check now.
                    let mut any_branch_received = false;
                    for message in comp_ctx.get_read_data_messages(port_id) {
                        if self.consensus.branch_can_receive(branch_id, &message.data_header) {
                            // This branch can receive the message, so we do the
                            // fork-and-receive dance
                            let recv_branch_id = self.tree.fork_branch(branch_id);
                            self.tree[recv_branch_id].insert_message(port_id, message.content.clone());

                            self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header);
                            self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);

                            any_branch_received = true;
                        }
                    }

                    if any_branch_received {
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    self.tree[branch_id].sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchAtSyncEnd => {
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
                if consistency == Consistency::Valid {
                    self.tree[branch_id].sync_state = SpeculativeState::ReachedSyncEnd;
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
                } else if consistency == Consistency::Inconsistent {
                    // Assignment, not comparison: the branch has to actually
                    // be marked as inconsistent.
                    self.tree[branch_id].sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchPut(port_id, _contents) => {
                // Branch is attempting to send data
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
                if consistency == Consistency::Valid {
                    // `put()` is valid, but informing the consensus algorithm
                    // and actually sending `_contents` is still to be written.
                    // Fail loudly instead of silently dropping the message.
                    todo!("send the message of a valid `put()`");
                } else {
                    self.tree[branch_id].sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
        }

        // If here then the run result did not require a particular action. We
        // return whether we have more active branches to run or not.
        if self.tree.queue_is_empty(QueueKind::Runnable) {
            ConnectorScheduling::NotNow
        } else {
            ConnectorScheduling::Later
        }
    }
}