Changeset - b4a8c628800d
[Not reviewed]
0 5 0
MH - 4 years ago 2021-11-19 19:59:49
contact@maxhenger.nl
Refactored data messages
4 files changed with 28 insertions and 58 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
// connector.rs
 
//
 
// Represents a component. A component (and the scheduler that is running it)
 
// has many properties that are not easy to subdivide into aspects that are
 
// conceptually handled by particular data structures. That is to say: the code
 
// that we run governs: running PDL code, keeping track of ports, instantiating
 
// new components and transports (i.e. interacting with the runtime), running
 
// a consensus algorithm, etc. But on the other hand, our data is rather
 
// simple: we have a speculative execution tree, a set of ports that we own,
 
// and a bit of code that we should run.
 
//
 
// So currently the code is organized as following:
 
// - The scheduler that is running the component is the authoritative source on
 
//     ports during *non-sync* mode. The consensus algorithm is the
 
//     authoritative source during *sync* mode. They retrieve each other's
 
//     state during the transitions. Hence port data exists duplicated between
 
//     these two datastructures.
 
// - The execution tree is where executed branches reside. But the execution
 
//     tree is only aware of the tree shape itself (and keeps track of some
 
//     queues of branches that are in a particular state), and tends to store
 
//     the PDL program state. The consensus algorithm is also somewhat aware
 
//     of the execution tree, but only in terms of what is needed to complete
 
//     a sync round (for now, that means the port mapping in each branch).
 
//     Hence once more we have properties conceptually associated with branches
 
//     in two places.
 
// - TODO: Write about handling messages, consensus wrapping data
 
// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx
 

	
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 
use crate::runtime2::consensus::RoundConclusion;
 
use crate::runtime2::inbox::SyncPortMessage;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncCompMessage, PublicInbox};
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement};
 
use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx};
 

	
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new(initialize_as_sleeping: bool) -> Self {
 
        ConnectorPublic{
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(initialize_as_sleeping),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
enum Mode {
 
    NonSync,    // running non-sync code
 
    Sync,       // running sync code (in potentially multiple branches)
 
    SyncError,  // encountered an unrecoverable error in sync mode
 
    Error,      // encountered an error in non-sync mode (or finished handling the sync mode error).
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,          // Run again, immediately
 
    Later,              // Schedule for running, at some later point in time
 
    NotNow,             // Do not reschedule for running
 
    Exit,               // Connector has exited
 
    Error(EvalError),   // Connector has experienced a fatal error
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    mode: Mode,
 
    eval_error: Option<EvalError>,
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
}
 

	
 
// TODO: Remove remaining fields once 'fires()' is removed from language.
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    prepared: PreparedStatement,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn performed_put(&mut self, _port: PortId) -> bool {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => false,
 
            PreparedStatement::PerformedPut => true,
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken)
 
        };
 
    }
 

	
 
    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::PerformedGet(value) => Some(value),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
 
        };
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!("Remove fires() now");
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::CreatedChannel(ports) => Some(ports),
 
            taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken),
 
        };
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        return match self.prepared.take() {
 
            PreparedStatement::None => None,
 
            PreparedStatement::ForkedExecution(path) => Some(path),
 
            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
 
        };
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
 
            return scheduling;
 
        }
 

	
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 

	
 
                // Handle any new finished branches
 
                let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
                while let Some(branch_id) = iter_id {
 
                    iter_id = self.tree.get_queue_next(branch_id);
 
                    self.last_finished_handled = Some(branch_id);
 

	
 
                    if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                        // Actually found a solution
 
                        return self.enter_non_sync_mode(round_conclusion, comp_ctx);
 
                    }
 

	
 
                    self.last_finished_handled = Some(branch_id);
 
                }
 

	
 
                return scheduling;
 
            },
 
            Mode::NonSync => {
 
                let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::SyncError => {
 
                todo!("write");
 
                return ConnectorScheduling::Exit;
 
            },
 
            Mode::Error => {
 
                // This shouldn't really be called. Because when we reach exit
 
                // mode the scheduler should not run the component anymore
 
                unreachable!("called component run() during error-mode");
 
            },
 
        }
 
    }
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: Prompt) -> Self {
 
        Self{
 
            mode: Mode::NonSync,
 
            eval_error: None,
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        while let Some(message) = ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, ctx),
 
                Message::SyncComp(message) => {
 
                    if let Some(result) = self.handle_new_sync_comp_message(message, ctx) {
 
                        return Some(result);
 
                    }
 
                },
 
                Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx),
 
                Message::Control(_) => unreachable!("control message in component"),
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        // Retrieve the branch and run it
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 

	
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            return ConnectorScheduling::Error(eval_error);
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
        match run_result {
 
            EvalContinuation::BranchInconsistent => {
 
                // Branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            EvalContinuation::BlockFires(port_id) => {
 
                // Branch called `fires()` on a port that has not been used yet.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                // Create two forks, one that assumes the port will fire, and
 
                // one that assumes the port remains silent
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 

	
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::BlockGet(port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 

	
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                // Note: we only know that a branch is waiting on a message when
 
                // it reaches the `get` call. But we might have already received
 
                // a message that targets this branch, so check now.
 
                let mut any_message_received = false;
 
                for message in comp_ctx.get_read_data_messages(port_id) {
 
                    if self.consensus.branch_can_receive(branch_id, &message) {
 
                        // This branch can receive the message, so we do the
 
                        // fork-and-receive dance
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
                    }
 
                }
 

	
 
                if any_message_received {
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
            EvalContinuation::SyncBlockEnd => {
 
                let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
                if consistency == Consistency::Valid {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            EvalContinuation::NewFork => {
 
                // Like the `NewChannel` result. This means we're setting up
 
                // a branch and putting a marker inside the RunContext for the
 
                // next time we run the PDL code
 
                let left_id = branch_id;
 
                let right_id = self.tree.fork_branch(left_id);
 
                self.consensus.notify_of_new_branch(left_id, right_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, left_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, right_id);
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            EvalContinuation::Put(port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                    content,
 
                })) {
 
                    // We don't own the port
 
                    let pd = &sched_ctx.runtime.protocol_description;
 
                    let eval_error = branch.code_state.new_error_at_expr(
 
                        &pd.modules, &pd.heap,
 
                        String::from("attempted to 'put' on port that is no longer owned")
 
                    );
 
                    self.eval_error = Some(eval_error);
 
                    self.mode = Mode::SyncError;
 
                }
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 
        }
 

	
 
        // If here then the run result did not require a particular action. We
 
        // return whether we have more active branches to run or not.
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            return ConnectorScheduling::Error(eval_error);
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EvalContinuation::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::Exit;
 
            },
 
            EvalContinuation::SyncBlockStart => {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                debug_assert!(self.last_finished_handled.is_none());
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 
                self.mode = Mode::Sync;
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Note: we're relinquishing ownership of ports. But because
 
                // we are in non-sync mode the scheduler will handle and check
 
                // port ownership transfer.
 
                debug_assert!(comp_ctx.workspace_ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
 

	
 
                let new_prompt = Prompt::new(
 
                    &sched_ctx.runtime.protocol_description.types,
 
                    &sched_ctx.runtime.protocol_description.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                let new_component = ConnectorPDL::new(new_prompt);
 
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
 
                comp_ctx.workspace_ports.clear();
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            EvalContinuation::NewChannel => {
 
                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
 
                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
 
                branch.prepared = PreparedStatement::CreatedChannel((
 
                    Value::Output(PortId::new(putter.self_id.index)),
 
                    Value::Input(PortId::new(getter.self_id.index)),
 
                ));
 

	
 
                comp_ctx.push_port(putter);
 
                comp_ctx.push_port(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Helper that moves the component's state back into non-sync mode, using
 
    /// the provided solution branch ID as the branch that should be comitted to
 
    /// memory. If this function returns false, then the component is supposed
 
    /// to exit.
 
    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        // Depending on local state decide what to do
 
        let final_branch_id = match conclusion {
 
            RoundConclusion::Success(branch_id) => Some(branch_id),
 
            RoundConclusion::Failure => if self.mode == Mode::SyncError {
 
                // We experienced an error, so exit now
 
                None
 
            } else {
 
                // We didn't experience an error, so retry
 
                // TODO: Decide what to do with sync errors
 
                Some(BranchId::new_invalid())
 
            }
 
        };
 

	
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
            debug_assert!(fake_vec.is_empty());
 

	
 
            ctx.notify_sync_end(&[]);
 
            self.last_finished_handled = None;
 
            self.eval_error = None; // in case we came from the SyncError mode
 
            self.mode = Mode::NonSync;
 

	
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            panic!("TEMPTEMP: NOOOOOOOOO 1");
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
 
    /// condition appears.
 
    #[inline]
 
    fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result<EvalContinuation, EvalError> {
 
        loop {
 
            let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
 
            if let Ok(EvalContinuation::Stepping) = result {
 
                continue;
 
            }
 

	
 
            return result;
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox::BranchMarker;
 
use crate::runtime2::scheduler::ComponentPortChange;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal};
 
use super::inbox::{
 
    Message, ChannelAnnotation,
 
    DataMessage, DataContent, DataHeader,
 
    Message, ChannelAnnotation, BranchMarker, DataMessage, DataHeader,
 
    SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader,
 
};
 
use super::scheduler::ComponentCtx;
 
use super::scheduler::{ComponentCtx, ComponentPortChange};
 

	
 
struct BranchAnnotation {
 
    channel_mapping: Vec<ChannelAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum RoundConclusion {
 
    Failure,
 
    Success(BranchId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
 
//  bit.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    handled_wave: bool, // encountered notification wave in this round
 
    conclusion: Option<RoundConclusion>,
 
    ack_remaining: u32,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            handled_wave: false,
 
            conclusion: None,
 
            ack_remaining: 0,
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
    #[deprecated]
 
    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap();
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            channel_mapping: ctx.get_ports().iter()
 
                .map(|v| ChannelAnnotation {
 
                    channel_id: v.channel_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
                .collect(),
 
            cur_marker: BranchMarker::new_invalid(),
 
        });
 
        self.branch_markers.push(BranchId::new_invalid());
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
    /// called for each forked branch in the execution tree.
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            channel_mapping: parent_branch_annotations.channel_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has
 
    /// encountered an unrecoverable error.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Check for trivial case, where branch has not yet communicated within
 
        // the consensus algorithm
 
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
 
        if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            return Some(RoundConclusion::Failure);
 
        }
 

	
 
        // We need to go through the hassle of notifying all participants in the
 
        // sync round that we've encountered an error.
 
        // --- notify leader
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
 

	
 
        // --- initiate discovery wave (to let leader know about all components)
 
        self.handled_wave = true;
 
        for mapping in &self.branch_annotations[0].channel_mapping {
 
            let channel_id = mapping.channel_id;
 
            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
 
            let message = SyncPortMessage{
 
                sync_header: self.create_sync_header(ctx),
 
                source_port: port_info.self_id,
 
                target_port: port_info.peer_id,
 
                content: SyncPortContent::NotificationWave,
 
            };
 
            ctx.submit_message(Message::SyncPort(message));
 
        }
 

	
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle. Note that
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for mapping in &branch.channel_mapping {
 
            match mapping.expected_firing {
 
                Some(expected) => {
 
                    if expected != mapping.registered_id.is_some() {
 
                        // Inconsistent speculative state and actual state
 
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
 
                        return Consistency::Inconsistent;
 
                    }
 
                },
 
                None => {},
 
            }
 
        }
 

	
 
        return Consistency::Valid;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has assumed
 
    /// a speculative value for its port mapping.
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
 
        let channel_id = port_desc.channel_id;
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == channel_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
                        mapping.expected_firing = Some(does_fire);
 
                        return Consistency::Valid;
 
                    },
 
                    Some(current) => {
 
                        // Already mapped
 
                        if current == does_fire {
 
                            return Consistency::Valid;
 
                        } else {
 
                            return Consistency::Inconsistent;
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                ctx.submit_message(Message::Data(DataMessage {
 
                ctx.submit_message(Message::SyncPort(SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    data_header: DataHeader{
 
                        expected_mapping: source_mapping.clone(),
 
                        sending_port: self_port_id,
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                        new_mapping: BranchMarker::new_invalid(),
 
                    },
 
                    content: DataContent::SilentPortNotification,
 
                    content: SyncPortContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(self_port_id);
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.channel_mapping.iter()
 
                .find(|v| v.channel_id == port_info.channel_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        let data_header = DataHeader{
 
            expected_mapping: branch.channel_mapping.clone(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == port_info.channel_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
 
        let handled = self.handle_received_sync_header(&message.sync_header, ctx);
 
        if handled {
 
            self.encountered_ports.push(message.data_header.target_port);
 
        }
 
        return handled;
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return None;
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 

	
 
        match &message.content {
 
            SyncCompContent::LocalFailure |
 
            SyncCompContent::LocalSolution(_) |
 
            SyncCompContent::PartialSolution(_) |
 
            SyncCompContent::AckFailure |
 
            SyncCompContent::Presence(_, _) => {
 
                // Needs to be handled by the leader
 
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
 
            },
 
            SyncCompContent::GlobalSolution(solution) => {
 
                // Found a global solution
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(component_id, _)| *component_id == ctx.id)
 
                    .unwrap();
 
                return Some(RoundConclusion::Success(*branch_id));
 
            },
 
            SyncCompContent::GlobalFailure => {
 
                // Global failure of round, send Ack to leader
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
 
                debug_assert!(_result.is_none());
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
                // We were just interested in the sync header we handled above
 
                return None;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return;
 
        }
 

	
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::SilentPortNotification => {
 
                // The point here is to let us become part of the sync round and
 
                // take note of the leader in case all of our ports are silent.
 
                self.encountered_ports.push(message.target_port);
 
            }
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
                // sure we propagate the wave
 
                if self.handled_wave {
 
                    return;
 
                }
 

	
 
                self.handled_wave = true;
 

	
 
                // Propagate wave to all peers except the one that has sent us
 
                // the wave.
 
                for mapping in &self.branch_annotations[0].channel_mapping {
 
                    let channel_id = mapping.channel_id;
 
                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
 
                    if port_desc.self_id == message.target_port {
 
                        // Wave came from this port, no need to send one back
 
                        continue;
 
                    }
 

	
 
                    let message = SyncPortMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        source_port: port_desc.self_id,
 
                        target_port: port_desc.peer_id,
 
                        content: SyncPortContent::NotificationWave,
 
                    };
 
                    ctx.submit_message(Message::SyncPort(message)).unwrap();
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == target_port.channel_id {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports);
 
                find_ports_in_value_group(&message.content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
 
            if message.sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        if let DataContent::SilentPortNotification = message.content {
 
            // No port can receive a "silent" notification.
 
            return false;
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.channel_mapping {
 
                if expected.channel_id == current.channel_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 
        if !self.handle_peer(sync_header) {
 
            // We can drop this package
 
            return false;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncCompMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: peer.id,
 
                    content: SyncCompContent::Notification,
 
                };
 
                ctx.submit_message(Message::SyncComp(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_data_to_new_leader(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncCompContent::Notification
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
        } // else: exactly equal, so do nothing
 

	
 
        return true;
 
    }
 

	
 
    /// Handles a (potentially new) peer. Returns `false` if the provided sync
 
    /// number is different then the expected one.
 
    fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool {
 
        let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id);
 
        match position {
 
            Some(index) => {
 
                let entry = &mut self.peers[index];
 
                entry.encountered_this_round = true;
 
                // TODO: Proper handling of potential overflow
 
                if sync_header.sync_round >= entry.expected_sync_round {
 
                    entry.expected_sync_round = sync_header.sync_round;
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            None => {
 
                self.peers.push(Peer{
 
                    id: sync_header.sending_component_id,
 
                    encountered_this_round: true,
 
                    expected_sync_round: sync_header.sync_round,
 
                });
 
                return true;
 
            }
 
        }
 
    }
 

	
 
    /// Sends a message towards the leader, if already the leader then the
 
    /// message will be handled immediately.
 
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            match content {
 
                SyncCompContent::LocalFailure => {
 
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::LocalSolution(local_solution) => {
 
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
 
                        return self.handle_global_solution_as_leader(global_solution, ctx);
 
                    }
 
                },
 
                SyncCompContent::PartialSolution(partial_solution) => {
 
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
 
                        match conclusion {
 
                            LeaderConclusion::Solution(global_solution) => {
 
                                return self.handle_global_solution_as_leader(global_solution, ctx);
 
                            },
 
                            LeaderConclusion::Failure => {
 
                                return self.handle_global_failure_as_leader(ctx);
 
                            }
 
                        }
 
                    }
 
                },
 
                SyncCompContent::Presence(component_id, presence) => {
 
                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::AckFailure => {
 
                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
 
                    debug_assert!(self.ack_remaining > 0);
 
                    self.ack_remaining -= 1;
 
                    if self.ack_remaining == 0 {
 
                        return Some(RoundConclusion::Failure);
 
                    }
 
                }
 
                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
 
                SyncCompContent::GlobalFailure => {
 
                    unreachable!("unexpected message content for leader");
 
                },
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content,
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // Handle the global solution
 
        let mut my_final_branch_id = BranchId::new_invalid();
 
        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
            if connector_id == ctx.id {
 
                // This is our solution branch
 
                my_final_branch_id = branch_id;
 
                continue;
 
            }
 

	
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
 
        return Some(RoundConclusion::Success(my_final_branch_id));
 
    }
 

	
 
    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // TODO: Performance
 
        let mut encountered = VecSet::new();
 
        for presence in &self.solution_combiner.presence {
 
            if presence.added_by != ctx.id {
 
                // Did not add it ourselves
 
                if encountered.push(presence.added_by) {
 
                    // Not yet sent a message
 
                    let message = SyncCompMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: presence.added_by,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
src/runtime2/inbox.rs
Show inline comments
 
use std::sync::Mutex;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::consensus::SolutionCombiner;
 
use crate::runtime2::port::ChannelId;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::consensus::{GlobalSolution, LocalSolution};
 
use super::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct ChannelAnnotation {
 
    pub channel_id: ChannelId,
 
    pub registered_id: Option<BranchMarker>,
 
    pub expected_firing: Option<bool>,
 
}
 

	
 
/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
 
/// unique identifier for a branch, but differs in that a branch only has one
 
/// branch ID, but might have multiple associated markers (i.e. one branch
 
/// performing a `put` three times will generate three markers.
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchMarker{
 
    marker: u32,
 
}
 

	
 
impl BranchMarker {
 
    #[inline]
 
    pub(crate) fn new(marker: u32) -> Self {
 
        debug_assert!(marker != 0);
 
        return Self{ marker };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ marker: 0 }
 
    }
 
}
 

	
 
/// The header added by the synchronization algorithm to all.
 
#[derive(Debug, Clone)]
 
pub(crate) struct SyncHeader {
 
    pub sending_component_id: ConnectorId,
 
    pub highest_component_id: ConnectorId,
 
    pub sync_round: u32,
 
}
 

	
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<ChannelAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchMarker,
 
}
 

	
 
// TODO: Very much on the fence about this. On one hand I thought making it a
 
//  data message was neat because "silent port notification" should be rerouted
 
//  like any other data message to determine the component ID of the receiver
 
//  and to make it part of the leader election algorithm for the sync leader.
 
//  However: it complicates logic quite a bit. Really it might be easier to
 
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
 
#[derive(Debug, Clone)]
 
pub(crate) enum DataContent {
 
    SilentPortNotification,
 
    Message(ValueGroup),
 
}
 

	
 
impl DataContent {
 
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
 
        match self {
 
            DataContent::SilentPortNotification => None,
 
            DataContent::Message(message) => Some(message),
 
        }
 
    }
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessage {
 
    pub sync_header: SyncHeader,
 
    pub data_header: DataHeader,
 
    pub content: DataContent,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncCompContent {
 
    LocalFailure, // notifying leader that component has failed (e.g. timeout, whatever)
 
    LocalSolution(LocalSolution), // sending a local solution to the leader
 
    PartialSolution(SolutionCombiner), // when new leader is detected, forward all local results
 
    GlobalSolution(GlobalSolution), // broadcasting to everyone
 
    GlobalFailure, // broadcasting to everyone
 
    AckFailure, // acknowledgement of failure to leader
 
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
 
    Presence(ConnectorId, Vec<ChannelId>), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
/// algorithm. The message goes directly to a component.
 
#[derive(Debug)]
 
pub(crate) struct SyncCompMessage {
 
    pub sync_header: SyncHeader,
 
    pub target_component_id: ConnectorId,
 
    pub content: SyncCompContent,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncPortContent {
 
    SilentPortNotification,
 
    NotificationWave,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct SyncPortMessage {
 
    pub sync_header: SyncHeader,
 
    pub source_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub content: SyncPortContent,
 
}
 

	
 
/// A control message is a message intended for the scheduler that is executing
 
/// a component.
 
#[derive(Debug)]
 
pub(crate) struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub sending_component_id: ConnectorId,
 
    pub content: ControlContent,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ControlContent {
 
    PortPeerChanged(PortIdLocal, ConnectorId),
 
    CloseChannel(PortIdLocal),
 
    Ack,
 
    Ping,
 
}
 

	
 
/// Combination of data message and control messages.
 
#[derive(Debug)]
 
pub(crate) enum Message {
 
    Data(DataMessage),
 
    SyncComp(SyncCompMessage),
 
    SyncPort(SyncPortMessage),
 
    Control(ControlMessage),
 
}
 

	
 
impl Message {
 
    /// If the message is sent through a particular channel, then this function
 
    /// returns the port through which the message was sent.
 
    pub(crate) fn source_port(&self) -> Option<PortIdLocal> {
 
        // Currently only data messages have a source port
 
        if let Message::Data(message) = self {
 
            return Some(message.data_header.sending_port);
 
        } else {
 
            return None;
 
        match self {
 
            Message::Data(message) => return Some(message.data_header.sending_port),
 
            Message::SyncPort(message) => return Some(message.source_port),
 
            Message::SyncComp(_) => return None,
 
            Message::Control(_) => return None,
 
        }
 
    }
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub(crate) fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub(crate) fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::Ordering;
 
use std::collections::HashMap;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::consensus::RoundConclusion;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
use super::scheduler::{SchedulerCtx, ComponentCtx};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{
 
    Message, DataContent, DataMessage,
 
    Message, DataMessage,
 
    SyncCompMessage, SyncPortMessage,
 
    ControlContent, ControlMessage
 
};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
 
}
 

	
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    success: bool,
 
    inbox: Vec<ValueGroup>,
 
}
 

	
 
type SyncDone = Arc<(Mutex<Option<FinishedSync>>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
 
    SyncRound(Vec<ApplicationSyncAction>),
 
    Shutdown,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorApplication
 
// -----------------------------------------------------------------------------
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
// TODO: Strong candidate for logic reduction in handling put/get. A lot of code
 
//  is an approximate copy-pasta from the regular component logic. I'm going to
 
//  wait until I'm implementing more native components to see which logic is
 
//  truly common.
 
pub struct ConnectorApplication {
 
    // Communicating about new jobs and setting up sync rounds
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    is_in_sync: bool,
 
    // Handling current sync round
 
    sync_desc: Vec<ApplicationSyncAction>,
 
    tree: FakeTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
    branch_extra: Vec<usize>, // instruction counter per branch
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if self.is_in_sync {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 
                if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Can finish sync round immediately
 
                    self.collapse_sync_to_conclusion(conclusion, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
        }
 
    }
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(None), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication {
 
            sync_done: sync_done.clone(),
 
            job_queue: job_queue.clone(),
 
            is_in_sync: false,
 
            sync_desc: Vec::new(),
 
            tree: FakeTree::new(),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
            branch_extra: vec![0],
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 

	
 
    fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
 
                Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx),
 
                Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx),
 
                Message::Control(_) => unreachable!("control message in native API component"),
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
            self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) {
 
        if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            self.collapse_sync_to_conclusion(conclusion, ctx);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.handle_new_messages(comp_ctx);
 

	
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 
        let mut instruction_idx = self.branch_extra[branch_id.index as usize];
 

	
 
        if instruction_idx >= self.sync_desc.len() {
 
            // Performed last instruction, so this branch is officially at the
 
            // end of the synchronous interaction.
 
            let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
            if consistency == Consistency::Valid {
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
            } else {
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            }
 
        } else {
 
            // We still have instructions to perform
 
            let cur_instruction = &self.sync_desc[instruction_idx];
 
            self.branch_extra[branch_id.index as usize] += 1;
 

	
 
            match &cur_instruction {
 
                ApplicationSyncAction::Put(port_id, content) => {
 
                    let port_id = *port_id;
 

	
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    let message = Message::Data(DataMessage {
 
                        sync_header,
 
                        data_header,
 
                        content: DataContent::Message(content.clone()),
 
                        content: content.clone(),
 
                    });
 
                    comp_ctx.submit_message(message);
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationSyncAction::Get(port_id) => {
 
                    let port_id = *port_id;
 

	
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    branch.awaiting_port = port_id;
 
                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                    let mut any_message_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 
                            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                            self.branch_extra.push(instruction_idx + 1);
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 
                            branch.insert_message(port_id, message.content.clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
                        }
 
                    }
 

	
 
                    if any_message_received {
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        // In non-sync mode the application component doesn't really do anything
 
        // except performing jobs submitted from the API. This is the only
 
        // case where we expect to be woken up.
 
        // Note that we have to communicate to the scheduler when we've received
 
        // ports or created components (hence: given away ports) *before* we
 
        // enter a sync round.
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    comp_ctx.push_port(endpoint_a);
 
                    comp_ctx.push_port(endpoint_b);
 

	
 
                    return ConnectorScheduling::Immediate;
 
                }
 
                ApplicationJob::NewConnector(connector, initial_ports) => {
 
                    comp_ctx.push_component(connector, initial_ports);
 

	
 
                    return ConnectorScheduling::Later;
 
                },
 
                ApplicationJob::SyncRound(mut description) => {
 
                    // Entering sync mode
 
                    comp_ctx.notify_sync_start();
 
                    self.sync_desc = description;
 
                    self.is_in_sync = true;
 
                    debug_assert!(self.last_finished_handled.is_none());
 
                    debug_assert!(self.branch_extra.len() == 1);
 

	
 
                    let first_branch_id = self.tree.start_sync();
 
                    self.tree.push_into_queue(QueueKind::Runnable, first_branch_id);
 
                    debug_assert!(first_branch_id.index == 1);
 
                    self.consensus.start_sync(comp_ctx);
 
                    self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id);
 
                    self.branch_extra.push(0); // set first branch to first instruction
 

	
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 

	
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        // Queue was empty
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
    fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) {
 
        // Notifying tree, consensus algorithm and context of ending sync
 
        let mut fake_vec = Vec::new();
 

	
 
        let (branch_id, success) = match conclusion {
 
            RoundConclusion::Success(branch_id) => {
 
                debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API
 
                (branch_id, true)
 
            },
 
            RoundConclusion::Failure => (BranchId::new_invalid(), false),
 
        };
 

	
 
        let mut solution_branch = self.tree.end_sync(branch_id);
 
        self.consensus.end_sync(branch_id, &mut fake_vec);
 
        debug_assert!(fake_vec.is_empty());
 

	
 
        comp_ctx.notify_sync_end(&[]);
 

	
 
        // Turning hashmapped inbox into vector of values
 
        let mut inbox = Vec::with_capacity(solution_branch.inbox.len());
 
        for action in &self.sync_desc {
 
            match action {
 
                ApplicationSyncAction::Put(_, _) => {},
 
                ApplicationSyncAction::Get(port_id) => {
 
                    debug_assert!(solution_branch.inbox.contains_key(port_id));
 
                    inbox.push(solution_branch.inbox.remove(port_id).unwrap());
 
                },
 
            }
 
        }
 

	
 
        // Notifying interface of ending sync
 
        self.is_in_sync = false;
 
        self.sync_desc.clear();
 
        self.branch_extra.truncate(1);
 
        self.last_finished_handled = None;
 

	
 
        let (results, notification) = &*self.sync_done;
 
        let mut results = results.lock().unwrap();
 
        *results = Some(FinishedSync{ success, inbox });
 
        notification.notify_one();
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ApplicationInterface
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub enum ChannelCreationError {
 
    InSync,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationStartSyncError {
 
    AlreadyInSync,
 
    NoSyncActions,
 
    IncorrectPortKind,
 
    UnownedPort,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationEndSyncError {
 
    NotInSync,
 
    Failure,
 
}
 

	
 
pub enum ApplicationSyncAction {
 
    Put(PortIdLocal, ValueGroup),
 
    Get(PortIdLocal),
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    is_in_sync: bool,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<(PortKind, PortIdLocal)>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            is_in_sync: false,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel. Can only fail if the application interface is
 
    /// currently in sync mode.
 
    pub fn create_channel(&mut self) -> Result<Channel, ChannelCreationError> {
 
        if self.is_in_sync {
 
            return Err(ChannelCreationError::InSync);
 
        }
 

	
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push((PortKind::Putter, putter_id));
 
        self.owned_ports.push((PortKind::Getter, getter_id));
 

	
 
        return Ok(Channel{ putter_id, getter_id });
 
    }
 

	
0 comments (0 inline, 0 general)