Changeset - 1677e0c9568d
[Not reviewed]
0 7 0
MH - 4 years ago 2021-11-19 18:27:39
contact@maxhenger.nl
Halfway implementing failure, fixing bug involving wrong mapping
7 files changed with 574 insertions and 210 deletions:
0 comments (0 inline, 0 general)
src/collections/sets.rs
Show inline comments
 
@@ -72,15 +72,18 @@ impl<T: Eq> VecSet<T> {
 
        self.inner.pop()
 
    }
 

	
 
    /// Pushes a new element into the set. Returns `false` if it was already
 
    /// present and `true` if it is newly added.
 
    #[inline]
 
    pub fn push(&mut self, to_push: T) {
 
    pub fn push(&mut self, to_push: T) -> bool {
 
        for element in self.inner.iter() {
 
            if *element == to_push {
 
                return;
 
                return false;
 
            }
 
        }
 

	
 
        self.inner.push(to_push);
 
        return true
 
    }
 

	
 
    #[inline]
src/runtime2/connector.rs
Show inline comments
 
@@ -34,10 +34,12 @@ use crate::common::ComponentState;
 
use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 
use crate::runtime2::consensus::RoundConclusion;
 
use crate::runtime2::inbox::SyncPortMessage;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
 
use super::inbox::{DataMessage, DataContent, Message, SyncCompMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx};
 
@@ -56,7 +58,7 @@ impl ConnectorPublic {
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Copy)]
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
enum Mode {
 
    NonSync,    // running non-sync code
 
    Sync,       // running sync code (in potentially multiple branches)
 
@@ -106,6 +108,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!("Remove fires() now");
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
@@ -130,7 +133,10 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
 
            return scheduling;
 
        }
 

	
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
 
@@ -142,10 +148,9 @@ impl Connector for ConnectorPDL {
 
                    iter_id = self.tree.get_queue_next(branch_id);
 
                    self.last_finished_handled = Some(branch_id);
 

	
 
                    if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                        // Actually found a solution
 
                        self.enter_non_sync_mode(solution_branch_id, comp_ctx);
 
                        return ConnectorScheduling::Immediate;
 
                        return self.enter_non_sync_mode(round_conclusion, comp_ctx);
 
                    }
 

	
 
                    self.last_finished_handled = Some(branch_id);
 
@@ -183,14 +188,19 @@ impl ConnectorPDL {
 

	
 
    // --- Handling messages
 

	
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        while let Some(message) = ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
 
                Message::SyncComp(message) => {
 
                    return self.handle_new_sync_comp_message(message, ctx)
 
                },
 
                Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx),
 
                Message::Control(_) => unreachable!("control message in component"),
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
@@ -211,23 +221,30 @@ impl ConnectorPDL {
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            println!("DEBUG: ### Branching due to new data message");
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.enter_non_sync_mode(solution_branch_id, ctx);
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
 
        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    // --- Running code
 
@@ -275,10 +292,10 @@ impl ConnectorPDL {
 
                let firing_branch_id = self.tree.fork_branch(branch_id);
 
                let silent_branch_id = self.tree.fork_branch(branch_id);
 
                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
 
                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 
                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
 
                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx);
 
                debug_assert_eq!(_result, Consistency::Valid);
 

	
 
                // Somewhat important: we push the firing one first, such that
 
@@ -310,8 +327,9 @@ impl ConnectorPDL {
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 

	
 
                        println!("DEBUG: ### Branching due to BlockGet with existing message");
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                        any_message_received = true;
 
@@ -360,7 +378,8 @@ impl ConnectorPDL {
 
                        &pd.modules, &pd.heap,
 
                        String::from("attempted to 'put' on port that is no longer owned")
 
                    );
 
                    self.mode = Mode::SyncError(eval_error);
 
                    self.eval_error = Some(eval_error);
 
                    self.mode = Mode::SyncError;
 
                }
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
@@ -399,7 +418,7 @@ impl ConnectorPDL {
 
        match run_result {
 
            EvalContinuation::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 

	
 
                println!("DEBUG: ************ DOING THEM EXITS");
 
                return ConnectorScheduling::Exit;
 
            },
 
            EvalContinuation::SyncBlockStart => {
 
@@ -409,6 +428,7 @@ impl ConnectorPDL {
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 
                self.mode = Mode::Sync;
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
@@ -449,38 +469,43 @@ impl ConnectorPDL {
 

	
 
    /// Helper that moves the component's state back into non-sync mode, using
 
    /// the provided solution branch ID as the branch that should be comitted to
 
    /// memory.
 
    fn enter_non_sync_mode(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
 
    /// memory. If this function returns false, then the component is supposed
 
    /// to exit.
 
    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        let mut fake_vec = Vec::new();
 
        self.tree.end_sync(solution_branch_id);
 
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            // TODO: Handle sent/received ports
 
            debug_assert!(ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        ctx.notify_sync_end(&[]);
 
        self.last_finished_handled = None;
 
        self.eval_error = None; // in case we came from the SyncError mode
 
        self.mode = Mode::NonSync;
 
    }
 

	
 
    /// Helper that moves the component's state into sync-error mode, sending
 
    /// the appropriate errors where needed. The branch that caused the fatal
 
    /// error and the evaluation error should be provided.
 
    fn enter_sync_error_mode(&mut self, failing_branch_id: BranchId, ctx: &mut ComponentCtx, eval_error: EvalError) {
 
        debug_assert!(self.mode == Mode::Sync);
 
        debug_assert!(self.eval_error.is_none());
 
        self.mode = Mode::SyncError;
 
        self.eval_error = Some(eval_error);
 
        // Depending on local state decide what to do
 
        let final_branch_id = match conclusion {
 
            RoundConclusion::Success(branch_id) => Some(branch_id),
 
            RoundConclusion::Failure => if self.mode == Mode::SyncError {
 
                // We experienced an error, so exit now
 
                None
 
            } else {
 
                // We didn't experience an error, so retry
 
                // TODO: Decide what to do with sync errors
 
                Some(BranchId::new_invalid())
 
            }
 
        };
 

	
 
        let failing_branch = &mut self.tree[failing_branch_id];
 
        failing_branch.sync_state = SpeculativeState::Error;
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
            debug_assert!(fake_vec.is_empty());
 

	
 
            ctx.notify_sync_end(&[]);
 
            self.last_finished_handled = None;
 
            self.eval_error = None; // in case we came from the SyncError mode
 
            self.mode = Mode::NonSync;
 

	
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            panic!("TEMPTEMP: NOOOOOOOOO 1");
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
src/runtime2/consensus.rs
Show inline comments
 
@@ -2,19 +2,20 @@ use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox::BranchMarker;
 
use crate::runtime2::scheduler::ComponentPortChange;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal};
 
use super::inbox::{
 
    Message, PortAnnotation,
 
    Message, ChannelAnnotation,
 
    DataMessage, DataContent, DataHeader,
 
    SyncMessage, SyncContent, SyncHeader,
 
    SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader,
 
};
 
use super::scheduler::ComponentCtx;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
    channel_mapping: Vec<ChannelAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
@@ -31,6 +32,12 @@ pub(crate) struct GlobalSolution {
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum RoundConclusion {
 
    Failure,
 
    Success(BranchId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 
@@ -61,6 +68,9 @@ pub(crate) struct Consensus {
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    handled_wave: bool, // encountered notification wave in this round
 
    conclusion: Option<RoundConclusion>,
 
    ack_remaining: u32,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
@@ -82,6 +92,9 @@ impl Consensus {
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            handled_wave: false,
 
            conclusion: None,
 
            ack_remaining: 0,
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
@@ -96,9 +109,10 @@ impl Consensus {
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
    pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation {
 
    #[deprecated]
 
    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap();
 
        let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap();
 
        return port;
 
    }
 

	
 
@@ -113,9 +127,9 @@ impl Consensus {
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ctx.get_ports().iter()
 
                .map(|v| PortAnnotation{
 
                    port_id: v.self_id,
 
            channel_mapping: ctx.get_ports().iter()
 
                .map(|v| ChannelAnnotation {
 
                    channel_id: v.channel_id,
 
                    registered_id: None,
 
                    expected_firing: None,
 
                })
 
@@ -133,11 +147,12 @@ impl Consensus {
 
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
 
        // If called correctly. Then each time we are notified the new branch's
 
        // index is the length in `branch_annotations`.
 
        println!("DEBUG: Branch {} became forked into {}", parent_branch_id.index, new_branch_id.index);
 
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
 
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        let new_branch_annotations = BranchAnnotation{
 
            port_mapping: parent_branch_annotations.port_mapping.clone(),
 
            channel_mapping: parent_branch_annotations.channel_mapping.clone(),
 
            cur_marker: new_marker,
 
        };
 
        self.branch_annotations.push(new_branch_annotations);
 
@@ -145,22 +160,37 @@ impl Consensus {
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has
 
    /// encountered an unrecoverable error. If the return value is `false`, then
 
    /// the caller can enter a "normal" exit mode instead of the special "sync"
 
    /// exit mode.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId) -> bool {
 
    /// encountered an unrecoverable error.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Check for trivial case, where branch has not yet communicated within
 
        // the consensus algorithm
 
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
 
        if branch.port_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            return false;
 
        if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            return Some(RoundConclusion::Failure);
 
        }
 

	
 
        // Branch has communicated. Since we need to discover the entire
 
        // We need to go through the hassle of notifying all participants in the
 
        // sync round that we've encountered an error.
 
        // --- notify leader
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
 

	
 
        // --- initiate discovery wave (to let leader know about all components)
 
        self.handled_wave = true;
 
        for mapping in &self.branch_annotations[0].channel_mapping {
 
            let channel_id = mapping.channel_id;
 
            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
 
            let message = SyncPortMessage{
 
                sync_header: self.create_sync_header(ctx),
 
                source_port: port_info.self_id,
 
                target_port: port_info.peer_id,
 
                content: SyncPortContent::NotificationWave,
 
            };
 
            ctx.submit_message(Message::SyncPort(message));
 
        }
 

	
 
        return true;
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
@@ -169,7 +199,7 @@ impl Consensus {
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for mapping in &branch.port_mapping {
 
        for mapping in &branch.channel_mapping {
 
            match mapping.expected_firing {
 
                Some(expected) => {
 
                    if expected != mapping.registered_id.is_some() {
 
@@ -187,11 +217,14 @@ impl Consensus {
 

	
 
    /// Notifies the consensus algorithm that a particular branch has assumed
 
    /// a speculative value for its port mapping.
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
 
        let channel_id = port_desc.channel_id;
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == port_id {
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == channel_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
@@ -218,20 +251,21 @@ impl Consensus {
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping;
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&port.port_id) {
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
@@ -240,13 +274,13 @@ impl Consensus {
 
                    },
 
                    data_header: DataHeader{
 
                        expected_mapping: source_mapping.clone(),
 
                        sending_port: port.port_id,
 
                        sending_port: self_port_id,
 
                        target_port: peer_port_id,
 
                        new_mapping: BranchMarker::new_invalid(),
 
                    },
 
                    content: DataContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(port.port_id);
 
                self.encountered_ports.push(self_port_id);
 
            }
 

	
 
            target_mapping.push((
 
@@ -260,29 +294,30 @@ impl Consensus {
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
 
        return solution_branch;
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory.
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        final_ports.clear();
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for port in &branch.port_mapping {
 
            final_ports.push(port.port_id);
 
        }
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
@@ -298,11 +333,12 @@ impl Consensus {
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.port_mapping.iter()
 
                .find(|v| v.port_id == source_port_id)
 
            let port = branch.channel_mapping.iter()
 
                .find(|v| v.channel_id == port_info.channel_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 
@@ -318,17 +354,16 @@ impl Consensus {
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            expected_mapping: branch.channel_mapping.clone(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == port_info.channel_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
@@ -348,45 +383,102 @@ impl Consensus {
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
 
        return self.handle_received_sync_header(&message.sync_header, ctx)
 
        let handled = self.handle_received_sync_header(&message.sync_header, ctx);
 
        if handled {
 
            self.encountered_ports.push(message.data_header.target_port);
 
        }
 
        return handled;
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return None;
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 
        match message.content {
 
            SyncContent::Notification => {
 
                // We were just interested in the header
 
                return None;
 
            },
 
            SyncContent::LocalSolution(solution) => {
 
                // We might be the leader, or earlier messages caused us to not
 
                // be the leader anymore.
 
                return self.send_or_store_local_solution(solution, ctx);
 

	
 
        match &message.content {
 
            SyncCompContent::LocalFailure |
 
            SyncCompContent::LocalSolution(_) |
 
            SyncCompContent::PartialSolution(_) |
 
            SyncCompContent::AckFailure |
 
            SyncCompContent::Presence(_, _) => {
 
                // Needs to be handled by the leader
 
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
            SyncCompContent::GlobalSolution(solution) => {
 
                // Found a global solution
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(connector_id, _)| *connector_id == ctx.id)
 
                    .find(|(component_id, _)| *component_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
                return Some(RoundConclusion::Success(*branch_id));
 
            },
 
            SyncCompContent::GlobalFailure => {
 
                // Global failure of round, send Ack to leader
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
 
                debug_assert!(_result.is_none());
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
                // We were just interested in the sync header we handled above
 
                return None;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return;
 
        }
 

	
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
                // sure we propagate the wave
 
                if self.handled_wave {
 
                    return;
 
                }
 

	
 
                self.handled_wave = true;
 

	
 
                // Propagate wave to all peers except the one that has sent us
 
                // the wave.
 
                for mapping in &self.branch_annotations[0].channel_mapping {
 
                    let channel_id = mapping.channel_id;
 
                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
 
                    if port_desc.self_id == message.target_port {
 
                        // Wave came from this port, no need to send one back
 
                        continue;
 
                    }
 

	
 
                    let message = SyncPortMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        source_port: port_desc.self_id,
 
                        target_port: port_desc.peer_id,
 
                        content: SyncPortContent::NotificationWave,
 
                    };
 
                    ctx.submit_message(Message::SyncPort(message)).unwrap();
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) {
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
 
        debug_assert!(self.branch_can_receive(branch_id, message));
 

	
 
        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == message.data_header.target_port {
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == target_port.channel_id {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(message.data_header.new_mapping);
 

	
 
@@ -425,8 +517,8 @@ impl Consensus {
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
            for current in &annotation.channel_mapping {
 
                if expected.channel_id == current.channel_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
@@ -458,25 +550,25 @@ impl Consensus {
 
                    continue
 
                }
 

	
 
                let message = SyncMessage {
 
                let message = SyncCompMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: peer.id,
 
                    content: SyncContent::Notification,
 
                    content: SyncCompContent::Notification,
 
                };
 
                ctx.submit_message(Message::Sync(message));
 
                ctx.submit_message(Message::SyncComp(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_solutions(ctx);
 
            self.forward_local_data_to_new_leader(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncMessage {
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
                content: SyncCompContent::Notification
 
            };
 
            ctx.submit_message(Message::Sync(message));
 
            ctx.submit_message(Message::SyncComp(message));
 
        } // else: exactly equal, so do nothing
 

	
 
        return true;
 
@@ -509,39 +601,120 @@ impl Consensus {
 
        }
 
    }
 

	
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
    /// Sends a message towards the leader, if already the leader then the
 
    /// message will be handled immediately.
 
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 
                let mut my_final_branch_id = BranchId::new_invalid();
 
                for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
                    if connector_id == ctx.id {
 
                        // This is our solution branch
 
                        my_final_branch_id = branch_id;
 
                        continue;
 
            match content {
 
                SyncCompContent::LocalFailure => {
 
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::LocalSolution(local_solution) => {
 
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
 
                        return self.handle_global_solution_as_leader(global_solution, ctx);
 
                    }
 
                },
 
                SyncCompContent::PartialSolution(partial_solution) => {
 
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
 
                        match conclusion {
 
                            LeaderConclusion::Solution(global_solution) => {
 
                                return self.handle_global_solution_as_leader(global_solution, ctx);
 
                            },
 
                            LeaderConclusion::Failure => {
 
                                return self.handle_global_failure_as_leader(ctx);
 
                            }
 
                        }
 
                    }
 
                },
 
                SyncCompContent::Presence(component_id, presence) => {
 
                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::AckFailure => {
 
                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
 
                    debug_assert!(self.ack_remaining > 0);
 
                    self.ack_remaining -= 1;
 
                    if self.ack_remaining == 0 {
 
                        return Some(RoundConclusion::Failure);
 
                    }
 

	
 
                    let message = SyncMessage {
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: connector_id,
 
                        content: SyncContent::GlobalSolution(global_solution.clone()),
 
                    };
 
                    ctx.submit_message(Message::Sync(message));
 
                }
 

	
 
                debug_assert!(my_final_branch_id.is_valid());
 
                return Some(my_final_branch_id);
 
            } else {
 
                return None;
 
                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
 
                SyncCompContent::GlobalFailure => {
 
                    unreachable!("unexpected message content for leader");
 
                },
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncMessage {
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(solution),
 
                content,
 
            };
 
            ctx.submit_message(Message::Sync(message));
 
            ctx.submit_message(Message::SyncComp(message));
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // Handle the global solution
 
        let mut my_final_branch_id = BranchId::new_invalid();
 
        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
            if connector_id == ctx.id {
 
                // This is our solution branch
 
                my_final_branch_id = branch_id;
 
                continue;
 
            }
 

	
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message));
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
 
        return Some(RoundConclusion::Success(my_final_branch_id));
 
    }
 

	
 
    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // TODO: Performance
 
        let mut encountered = VecSet::new();
 
        for presence in &self.solution_combiner.presence {
 
            if presence.added_by != ctx.id {
 
                // Did not add it ourselves
 
                if encountered.push(presence.added_by) {
 
                    // Not yet sent a message
 
                    let message = SyncCompMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: presence.added_by,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
 
                    ctx.submit_message(Message::SyncComp(message));
 
                }
 
            }
 
        }
 

	
 
        self.conclusion = Some(RoundConclusion::Failure);
 
        if encountered.is_empty() {
 
            // We don't have to wait on Acks
 
            return Some(RoundConclusion::Failure);
 
        } else {
 
            return None;
 
        }
 
    }
 
@@ -555,16 +728,16 @@ impl Consensus {
 
        }
 
    }
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) {
 
    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        for local_solution in self.solution_combiner.drain() {
 
            let message = SyncMessage {
 
        if let Some(partial_solution) = self.solution_combiner.drain() {
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(local_solution),
 
                content: SyncCompContent::PartialSolution(partial_solution),
 
            };
 
            ctx.submit_message(Message::Sync(message));
 
            ctx.submit_message(Message::SyncComp(message));
 
        }
 
    }
 
}
 
@@ -575,28 +748,28 @@ impl Consensus {
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
@@ -604,9 +777,19 @@ struct ComponentLocalSolutions {
 
    all_peers_present: bool,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ChannelPresence {
 
    added_by: ConnectorId,
 
    channel: ChannelId,
 
    both_sides_present: bool,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
#[derive(Debug)]
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>
 
    local: Vec<ComponentLocalSolutions>, // used for finding solution
 
    presence: Vec<ChannelPresence>, // used to detect all channels present in case of failure
 
    failure_reported: bool,
 
}
 

	
 
struct CheckEntry {
 
@@ -617,10 +800,17 @@ struct CheckEntry {
 
    solution_index_in_parent: usize,// index in the solution array of the match entry in the parent
 
}
 

	
 
enum LeaderConclusion {
 
    Solution(GlobalSolution),
 
    Failure,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
            presence: Vec::new(),
 
            failure_reported: false,
 
        };
 
    }
 

	
 
@@ -819,15 +1009,47 @@ impl SolutionCombiner {
 
            }
 
        }
 

	
 
        return self.check_new_solution(component_index, solution_index);
 
        return self.check_for_global_solution(component_index, solution_index);
 
    }
 

	
 
    fn add_presence_and_check_for_global_failure(&mut self, present_component: ConnectorId, present: &[ChannelId]) -> bool {
 
        'new_channel_loop: for new_channel_id in present {
 
            for presence in &mut self.presence {
 
                if presence.channel == *new_channel_id {
 
                    if presence.added_by != present_component {
 
                        presence.both_sides_present = true;
 
                    }
 

	
 
                    continue 'new_channel_loop;
 
                }
 
            }
 

	
 
            // Not added yet
 
            self.presence.push(ChannelPresence{
 
                added_by: present_component,
 
                channel: *new_channel_id,
 
                both_sides_present: false,
 
            });
 
        }
 

	
 
        return self.check_for_global_failure();
 
    }
 

	
 
    fn mark_failure_and_check_for_global_failure(&mut self) -> bool {
 
        self.failure_reported = true;
 
        return self.check_for_global_failure();
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    // TODO: At some point, check if divide and conquer is faster?
 
    fn check_new_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
 
        if !self.can_have_solution() {
 
            return None;
 
    fn check_for_global_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
 
        // Small trivial test necessary (but not sufficient) for a global
 
        // solution
 
        for component in &self.local {
 
            if !component.all_peers_present {
 
                return None;
 
            }
 
        }
 

	
 
        // Construct initial entry on stack
 
@@ -989,44 +1211,91 @@ impl SolutionCombiner {
 
        });
 
    }
 

	
 
    /// Simple test if a solution is at all possible. If this returns true it
 
    /// does not mean there actually is a solution.
 
    fn can_have_solution(&self) -> bool {
 
        for component in &self.local {
 
            if !component.all_peers_present {
 
                return false;
 
    /// Checks if all preconditions for global sync failure have been met
 
    fn check_for_global_failure(&self) -> bool {
 
        if !self.failure_reported {
 
            return false;
 
        }
 

	
 
        // Failure is reported, if all components are present then we may emit
 
        // the global failure broadcast
 
        // Check if all are present and we're preparing to fail this round
 
        let mut all_present = true;
 
        for presence in &self.presence {
 
            if !presence.both_sides_present {
 
                all_present = false;
 
                break;
 
            }
 
        }
 

	
 
        return true;
 
        return all_present; // && failure_reported, which is checked above
 
    }
 

	
 
    /// Turns the entire (partially resolved) global solution back into local
 
    /// solutions to ship to another component.
 
    // TODO: Don't do this, kind of wasteful since a lot of processing has
 
    //  already been performed.
 
    fn drain(&mut self) -> Vec<LocalSolution> {
 
        let mut reserve_len = 0;
 
        for component in &self.local {
 
            reserve_len += component.solutions.len();
 
    /// Turns the entire (partially resolved) global solution into a structure
 
    /// that can be forwarded to a new parent. The new parent may then merge
 
    /// already obtained information.
 
    fn drain(&mut self) -> Option<SolutionCombiner> {
 
        if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported {
 
            return None;
 
        }
 

	
 
        let mut solutions = Vec::with_capacity(reserve_len);
 
        for component in self.local.drain(..) {
 
            for solution in component.solutions {
 
                solutions.push(LocalSolution{
 
                    component: component.component,
 
                    final_branch_id: solution.final_branch_id,
 
                    port_mapping: solution.channel_mapping,
 
                });
 
        let result = SolutionCombiner{
 
            local: self.local.clone(),
 
            presence: self.presence.clone(),
 
            failure_reported: self.failure_reported,
 
        };
 

	
 
        self.local.clear();
 
        self.presence.clear();
 
        self.failure_reported = false;
 
        return Some(result);
 
    }
 

	
 
    // TODO: Entire routine is quite wasteful. Combine instead of doing all work
 
    //  again.
 
    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
 
        self.failure_reported = self.failure_reported || combiner.failure_reported;
 

	
 
        // Handle local solutions
 
        if self.local.is_empty() {
 
            // Trivial case
 
            self.local = combiner.local;
 
        } else {
 
            for local in combiner.local {
 
                for matched in local.solutions {
 
                    let local_solution = LocalSolution{
 
                        component: local.component,
 
                        final_branch_id: matched.final_branch_id,
 
                        port_mapping: matched.channel_mapping,
 
                    };
 
                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
 
                    if let Some(global_solution) = maybe_solution {
 
                        return Some(LeaderConclusion::Solution(global_solution));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Handle channel presence
 
        if self.presence.is_empty() {
 
            // Trivial case
 
            self.presence = combiner.presence
 
        } else {
 
            for presence in combiner.presence {
 
                let global_failure = self.add_presence_and_check_for_global_failure(presence.added_by, &[presence.channel]);
 
                if global_failure {
 
                    return Some(LeaderConclusion::Failure);
 
                }
 
            }
 
        }
 

	
 
        return solutions;
 
        return None;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.local.clear();
 
        self.presence.clear();
 
        self.failure_reported = false;
 
    }
 
}
 

	
src/runtime2/inbox.rs
Show inline comments
 
@@ -2,6 +2,8 @@ use std::sync::Mutex;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::consensus::SolutionCombiner;
 
use crate::runtime2::port::ChannelId;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
@@ -11,8 +13,8 @@ use super::port::PortIdLocal;
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
    pub port_id: PortIdLocal,
 
pub(crate) struct ChannelAnnotation {
 
    pub channel_id: ChannelId,
 
    pub registered_id: Option<BranchMarker>,
 
    pub expected_firing: Option<bool>,
 
}
 
@@ -50,7 +52,7 @@ pub(crate) struct SyncHeader {
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
 
    pub expected_mapping: Vec<ChannelAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchMarker,
 
@@ -87,19 +89,37 @@ pub(crate) struct DataMessage {
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 
pub(crate) enum SyncCompContent {
 
    LocalFailure, // notifying leader that component has failed (e.g. timeout, whatever)
 
    LocalSolution(LocalSolution), // sending a local solution to the leader
 
    PartialSolution(SolutionCombiner), // when new leader is detected, forward all local results
 
    GlobalSolution(GlobalSolution), // broadcasting to everyone
 
    GlobalFailure, // broadcasting to everyone
 
    AckFailure, // acknowledgement of failure to leader
 
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
 
    Presence(ConnectorId, Vec<ChannelId>), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
/// algorithm.
 
/// algorithm. The message goes directly to a component.
 
#[derive(Debug)]
 
pub(crate) struct SyncMessage {
 
pub(crate) struct SyncCompMessage {
 
    pub sync_header: SyncHeader,
 
    pub target_component_id: ConnectorId,
 
    pub content: SyncContent,
 
    pub content: SyncCompContent,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncPortContent {
 
    NotificationWave,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct SyncPortMessage {
 
    pub sync_header: SyncHeader,
 
    pub source_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub content: SyncPortContent,
 
}
 

	
 
/// A control message is a message intended for the scheduler that is executing
 
@@ -123,7 +143,8 @@ pub(crate) enum ControlContent {
 
#[derive(Debug)]
 
pub(crate) enum Message {
 
    Data(DataMessage),
 
    Sync(SyncMessage),
 
    SyncComp(SyncCompMessage),
 
    SyncPort(SyncPortMessage),
 
    Control(ControlMessage),
 
}
 

	
src/runtime2/native.rs
Show inline comments
 
@@ -5,6 +5,7 @@ use std::collections::HashMap;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::consensus::RoundConclusion;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
@@ -12,7 +13,11 @@ use super::scheduler::{SchedulerCtx, ComponentCtx};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage};
 
use super::inbox::{
 
    Message, DataContent, DataMessage,
 
    SyncCompMessage, SyncPortMessage,
 
    ControlContent, ControlMessage
 
};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
@@ -25,6 +30,7 @@ pub(crate) trait Connector {
 

	
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    success: bool,
 
    inbox: Vec<ValueGroup>,
 
}
 

	
 
@@ -70,9 +76,9 @@ impl Connector for ConnectorApplication {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 
                if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Can finish sync round immediately
 
                    self.collapse_sync_to_solution_branch(solution_branch, comp_ctx);
 
                    self.collapse_sync_to_conclusion(conclusion, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 
@@ -108,7 +114,8 @@ impl ConnectorApplication {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx),
 
                Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx),
 
                Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx),
 
                Message::Control(_) => unreachable!("control message in native API component"),
 
            }
 
        }
 
@@ -138,19 +145,23 @@ impl ConnectorApplication {
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
    pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) {
 
        if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
 
            self.collapse_sync_to_conclusion(conclusion, ctx);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        self.consensus.handle_new_sync_port_message(message, ctx);
 
    }
 

	
 
    fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.is_in_sync);
 

	
 
@@ -214,7 +225,7 @@ impl ConnectorApplication {
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_message_received = true;
 
@@ -287,16 +298,21 @@ impl ConnectorApplication {
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
    fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) {
 
        debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program
 
    fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) {
 
        // Notifying tree, consensus algorithm and context of ending sync
 
        let mut fake_vec = Vec::new();
 

	
 
        let (branch_id, success) = match conclusion {
 
            RoundConclusion::Success(branch_id) => {
 
                debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API
 
                (branch_id, true)
 
            },
 
            RoundConclusion::Failure => (BranchId::new_invalid(), false),
 
        };
 

	
 
        let mut solution_branch = self.tree.end_sync(branch_id);
 
        self.consensus.end_sync(branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            debug_assert!(comp_ctx.get_port_by_id(port).is_some());
 
        }
 
        debug_assert!(fake_vec.is_empty());
 

	
 
        comp_ctx.notify_sync_end(&[]);
 

	
 
@@ -320,7 +336,7 @@ impl ConnectorApplication {
 

	
 
        let (results, notification) = &*self.sync_done;
 
        let mut results = results.lock().unwrap();
 
        *results = Some(FinishedSync{ inbox });
 
        *results = Some(FinishedSync{ success, inbox });
 
        notification.notify_one();
 
    }
 
}
 
@@ -345,6 +361,7 @@ pub enum ApplicationStartSyncError {
 
#[derive(Debug)]
 
pub enum ApplicationEndSyncError {
 
    NotInSync,
 
    Failure,
 
}
 

	
 
pub enum ApplicationSyncAction {
 
@@ -496,7 +513,12 @@ impl ApplicationInterface {
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        return Ok(lock.take().unwrap().inbox);
 
        let result = lock.take().unwrap();
 
        if result.success {
 
            return Ok(result.inbox);
 
        } else {
 
            return Err(ApplicationEndSyncError::Failure);
 
        }
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
src/runtime2/scheduler.rs
Show inline comments
 
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::eval::EvalError;
 
use crate::runtime2::port::ChannelId;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
@@ -134,7 +135,7 @@ impl Scheduler {
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
@@ -206,7 +207,7 @@ impl Scheduler {
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
@@ -221,13 +222,22 @@ impl Scheduler {
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::Sync(content) => {
 
                Message::SyncComp(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.target_component_id
 
                },
 
                Message::SyncPort(content) => {
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                }
 
                Message::Control(_) => {
 
                    unreachable!("component sending control messages directly");
 
                }
 
@@ -339,7 +349,8 @@ impl Scheduler {
 
    fn get_message_target_port(message: &Message) -> Option<PortIdLocal> {
 
        match message {
 
            Message::Data(data) => return Some(data.data_header.target_port),
 
            Message::Sync(_) => {},
 
            Message::SyncComp(_) => {},
 
            Message::SyncPort(content) => return Some(content.target_port),
 
            Message::Control(control) => {
 
                match &control.content {
 
                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
 
@@ -354,11 +365,11 @@ impl Scheduler {
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
@@ -450,6 +461,10 @@ impl ComponentCtx {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.channel_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 
@@ -528,10 +543,14 @@ impl ComponentCtx {
 
                self.inbox_len_read += 1;
 
                return Some(Message::Data(content.clone()));
 
            },
 
            Message::Sync(_) => {
 
            Message::SyncComp(_) => {
 
                let message = self.inbox_messages.remove(self.inbox_len_read);
 
                return Some(message);
 
            },
 
            Message::SyncPort(_) => {
 
                let message = self.inbox_messages.remove(self.inbox_len_read);
 
                return Some(message);
 
            }
 
            Message::Control(_) => unreachable!("control message ended up in component inbox"),
 
        }
 
    }
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -10,9 +10,14 @@ use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
0 comments (0 inline, 0 general)