Changeset - dd4e6a5314f7
MH <contact@maxhenger.nl> - 2021-11-23 15:16:34
WIP on more failure fixing
6 files changed with 164 insertions and 35 deletions
src/collections/sets.rs
@@ -106,6 +106,11 @@ impl<T: Eq> VecSet<T> {
         self.inner.is_empty()
     }
 
+    #[inline]
+    pub fn len(&self) -> usize {
+        return self.inner.len();
+    }
+
     #[inline]
     pub fn into_vec(self) -> Vec<T> {
         return self.inner;
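
Note: the consensus changes below rely on `VecSet::push` returning whether the value was newly inserted (e.g. `if encountered.push(presence.owner_a) { /* not yet sent */ }`). For reference, a minimal self-contained sketch of the `VecSet` shape this changeset assumes; only the methods visible in these diffs are grounded, the rest is illustrative:

// Vec-backed set: keeps insertion order, O(n) membership test. Cheap for the
// small peer counts involved here.
pub struct VecSet<T: Eq> {
    inner: Vec<T>,
}

impl<T: Eq> VecSet<T> {
    pub fn new() -> Self {
        return Self{ inner: Vec::new() };
    }

    // Returns true if the value was not yet present and has been inserted;
    // the consensus code uses this to message each peer at most once.
    pub fn push(&mut self, value: T) -> bool {
        if self.inner.contains(&value) {
            return false;
        }
        self.inner.push(value);
        return true;
    }

    pub fn iter(&self) -> std::slice::Iter<T> {
        return self.inner.iter();
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        return self.inner.is_empty();
    }

    #[inline]
    pub fn len(&self) -> usize {
        return self.inner.len();
    }

    #[inline]
    pub fn into_vec(self) -> Vec<T> {
        return self.inner;
    }
}

fn main() {
    let mut encountered = VecSet::new();
    assert!(encountered.push(1));
    assert!(!encountered.push(1)); // duplicate: no second message would be sent
    assert_eq!(encountered.len(), 1);
    assert_eq!(encountered.into_vec(), vec![1]);
}
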
src/runtime2/connector.rs
@@ -403,12 +403,14 @@ impl ConnectorPDL {
                         );
                         self.eval_error = Some(eval_error);
                         self.mode = Mode::SyncError;
 
+                        println!("DEBUGERINO: Notify of fatal branch");
                         if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
+                            println!("DEBUGERINO: Actually got {:?}", conclusion);
                             return self.enter_non_sync_mode(conclusion, comp_ctx);
                         }
                     }
                 }
 
                 branch.prepared = PreparedStatement::PerformedPut;
                 self.tree.push_into_queue(QueueKind::Runnable, branch_id);
                 return ConnectorScheduling::Immediate;
             },
             _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
         }
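
Note: the hunk above is the component-side entry into failure handling: the evaluation error is stored, the component flips to Mode::SyncError, and it only leaves sync mode early if notify_of_fatal_branch can already produce a RoundConclusion (for instance, when all ports stayed silent). A minimal self-contained model of that flow; every type here is an illustrative stand-in, not the runtime's real one:

#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode { Sync, SyncError, NonSync }

#[derive(Debug, Clone, Copy)]
enum RoundConclusion { Failure }

struct Component {
    mode: Mode,
    eval_error: Option<String>,
}

impl Component {
    // `conclusion` stands in for the result of notify_of_fatal_branch(..):
    // Some(..) means the failure can be finalized without further messages.
    fn on_fatal_branch(&mut self, error: String, conclusion: Option<RoundConclusion>) {
        self.eval_error = Some(error);
        self.mode = Mode::SyncError;
        if conclusion.is_some() {
            self.mode = Mode::NonSync; // enter_non_sync_mode(..) in the real code
        }
    }
}

fn main() {
    let mut c = Component{ mode: Mode::Sync, eval_error: None };
    c.on_fatal_branch("some eval error".to_string(), None);
    // No conclusion yet: the component stays in SyncError until the failure
    // protocol (see consensus.rs below) produces one.
    assert_eq!(c.mode, Mode::SyncError);
}
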
src/runtime2/consensus.rs
 use crate::collections::VecSet;
 
 use crate::protocol::eval::ValueGroup;
+use crate::runtime2::inbox::SyncCompContent::Presence;
+use crate::runtime2::port::PortState;
 
 use super::ConnectorId;
 use super::branch::BranchId;
@@ -167,13 +169,28 @@ impl Consensus {
         // the consensus algorithm
         let branch = &self.branch_annotations[failed_branch_id.index as usize];
         if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
+            println!("DEBUG: Failure everything silent");
             return Some(RoundConclusion::Failure);
         }
 
         // We need to go through the hassle of notifying all participants in the
         // sync round that we've encountered an error.
         // --- notify leader
+        let mut channel_presence = Vec::with_capacity(branch.channel_mapping.len());
+        for mapping in &branch.channel_mapping {
+            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
+            channel_presence.push(LocalChannelPresence{
+                channel_id: mapping.channel_id,
+                is_closed: port.state == PortState::Closed,
+            });
+        }
+        let _never_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
+            component_id: ctx.id,
+            channels: channel_presence,
+        }), ctx);
+        debug_assert!(_never_conclusion.is_none());
         let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
+        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);
 
         // --- initiate discovery wave (to let leader know about all components)
         self.handled_wave = true;
@@ -418,7 +435,7 @@ impl Consensus {
             SyncCompContent::LocalSolution(_) |
             SyncCompContent::PartialSolution(_) |
             SyncCompContent::AckFailure |
-            SyncCompContent::Presence(_, _) => {
+            SyncCompContent::Presence(_) => {
                 // Needs to be handled by the leader
                 return self.send_to_leader_or_handle_as_leader(message.content, ctx);
             },
@@ -432,6 +449,7 @@ impl Consensus {
             },
             SyncCompContent::GlobalFailure => {
                 // Global failure of round, send Ack to leader
+                println!("DEBUGERINO: Got GlobalFailure, sending Ack in response");
                 debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
                 let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
                 debug_assert!(_result.is_none());
@@ -444,9 +462,9 @@ impl Consensus {
         }
     }
 
-    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         if !self.handle_received_sync_header(&message.sync_header, ctx) {
-            return;
+            return None;
         }
 
         debug_assert!(self.is_in_sync());
@@ -456,13 +474,14 @@ impl Consensus {
                 // The point here is to let us become part of the sync round and
                 // take note of the leader in case all of our ports are silent.
                 self.encountered_ports.push(message.target_port);
+                return None
             }
             SyncPortContent::NotificationWave => {
                 // Wave to discover everyone in the network, handling sync
                 // header takes care of leader discovery, here we need to make
                 // sure we propagate the wave
                 if self.handled_wave {
-                    return;
+                    return None;
                 }
 
                 self.handled_wave = true;
@@ -487,6 +506,23 @@ impl Consensus {
                     // result: we're dealing with an error here anyway
                     let _unused = ctx.submit_message(Message::SyncPort(message));
                 }
+
+                // And let the leader know about our port state
+                let annotations = &self.branch_annotations[0];
+                let mut channels = Vec::with_capacity(annotations.channel_mapping.len());
+                for mapping in &annotations.channel_mapping {
+                    let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
+                    channels.push(LocalChannelPresence{
+                        channel_id: mapping.channel_id,
+                        is_closed: port_info.state == PortState::Closed,
+                    });
+                }
+
+                let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
+                    component_id: ctx.id,
+                    channels,
+                }), ctx);
+                return maybe_conclusion;
             }
         }
     }
@@ -499,6 +535,8 @@ impl Consensus {
 
         match message.content {
             SyncControlContent::ChannelIsClosed(_) => {
+                // TODO: This is wrong! This might happen in a normal sync. And
+                // we don't want to fail immediately!
                 return Some(RoundConclusion::Failure);
             }
         }
@@ -656,8 +694,8 @@ impl Consensus {
                         }
                     }
                 },
-                SyncCompContent::Presence(component_id, presence) => {
-                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
+                SyncCompContent::Presence(component_presence) => {
+                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_presence.component_id, &component_presence.channels) {
                         return self.handle_global_failure_as_leader(ctx);
                     }
                 },
@@ -717,31 +755,45 @@ impl Consensus {
     fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
         if self.conclusion.is_some() {
             // Already sent out a failure
             return None;
         }
 
         // TODO: Performance
         let mut encountered = VecSet::new();
         for presence in &self.solution_combiner.presence {
-            if presence.added_by != ctx.id {
+            if presence.owner_a != ctx.id {
                 // Did not add it ourselves
-                if encountered.push(presence.added_by) {
+                if encountered.push(presence.owner_a) {
                     // Not yet sent a message
                     let message = SyncCompMessage{
                         sync_header: self.create_sync_header(ctx),
-                        target_component_id: presence.added_by,
+                        target_component_id: presence.owner_a,
                         content: SyncCompContent::GlobalFailure,
                     };
                     ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
                 }
+            } else if let Some(owner_b) = presence.owner_b {
+                if owner_b != ctx.id {
+                    if encountered.push(owner_b) {
+                        let message = SyncCompMessage{
+                            sync_header: self.create_sync_header(ctx),
+                            target_component_id: owner_b,
+                            content: SyncCompContent::GlobalFailure,
+                        };
+                        ctx.submit_message(Message::SyncComp(message)).unwrap();
+                    }
+                }
             }
         }
 
+        println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.0).collect::<Vec<_>>());
         self.conclusion = Some(RoundConclusion::Failure);
         if encountered.is_empty() {
             // We don't have to wait on Acks
             return Some(RoundConclusion::Failure);
         } else {
             self.ack_remaining = encountered.len() as u32;
             return None;
         }
     }
@@ -804,11 +856,36 @@ struct ComponentLocalSolutions {
     all_peers_present: bool,
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct ComponentPresence {
+    component_id: ConnectorId,
+    channels: Vec<LocalChannelPresence>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct LocalChannelPresence {
+    channel_id: ChannelId,
+    is_closed: bool,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum PresenceState {
+    OnePresent, // one component reported the channel being open
+    BothPresent, // two components reported the channel being open
+    Closed, // one component reported the channel being closed
+}
+
+/// Record to hold channel state during the error-resolving mode of the leader.
+/// This is used to determine when the sync region has grown to its largest
+/// size. The structure is eventually consistent in the sense that a component
+/// might initially presume a channel is open, only to figure out later it is
+/// actually closed.
 #[derive(Debug, Clone)]
 struct ChannelPresence {
-    added_by: ConnectorId,
-    channel: ChannelId,
-    both_sides_present: bool,
+    owner_a: ConnectorId,
+    owner_b: Option<ConnectorId>,
+    id: ChannelId,
+    state: PresenceState,
 }
 
 // TODO: Flatten? Flatten. Flatten everything.
@@ -1039,25 +1116,42 @@ impl SolutionCombiner {
         return self.check_for_global_solution(component_index, solution_index);
     }
 
-    fn add_presence_and_check_for_global_failure(&mut self, present_component: ConnectorId, present: &[ChannelId]) -> bool {
-        'new_channel_loop: for new_channel_id in present {
-            for presence in &mut self.presence {
-                if presence.channel == *new_channel_id {
-                    if presence.added_by != present_component {
-                        presence.both_sides_present = true;
+    fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool {
+        'new_report_loop: for entry in channels {
+            let mut found = false;
+
+            for existing in &mut self.presence {
+                if existing.id == entry.channel_id {
+                    // Same entry. We only update if the second component comes
+                    // in claiming it owns one end of the channel, or if a
+                    // component is telling us that the channel is (now)
+                    // closed.
+                    if entry.is_closed {
+                        existing.state = PresenceState::Closed;
+                    } else if component_id != existing.owner_a && existing.state != PresenceState::Closed {
+                        existing.state = PresenceState::BothPresent;
                     }
 
-                    continue 'new_channel_loop;
+                    if existing.owner_a != component_id {
+                        existing.owner_b = Some(component_id);
+                    }
+
+                    found = true;
+                    break;
                 }
             }
 
             // Not added yet
+            if !found {
                 self.presence.push(ChannelPresence{
-                added_by: present_component,
-                channel: *new_channel_id,
-                both_sides_present: false,
+                    owner_a: component_id,
+                    owner_b: None,
+                    id: entry.channel_id,
+                    state: if entry.is_closed { PresenceState::Closed } else { PresenceState::OnePresent },
                 });
+            }
         }
 
+        println!("DEBUGGERINO Presence is now:\n{:#?}", self.presence);
+
        return self.check_for_global_failure();
    }
@@ -1249,7 +1343,7 @@ impl SolutionCombiner {
         // Check if all are present and we're preparing to fail this round
         let mut all_present = true;
         for presence in &self.presence {
-            if !presence.both_sides_present {
+            if presence.state == PresenceState::OnePresent {
                 all_present = false;
                 break;
             }
@@ -1309,10 +1403,38 @@ impl SolutionCombiner {
             self.presence = combiner.presence
         } else {
             for presence in combiner.presence {
-                let global_failure = self.add_presence_and_check_for_global_failure(presence.added_by, &[presence.channel]);
-                if global_failure {
-                    return Some(LeaderConclusion::Failure);
+                match self.presence.iter_mut().find(|v| v.id == presence.id) {
+                    Some(entry) => {
+                        // Combine entries: an entry that is Closed takes
+                        // precedence, then one that has both sides present,
+                        // then two one-sided entries are combined.
+                        if entry.state == PresenceState::Closed {
+                            // Do nothing
+                        } else if presence.state == PresenceState::Closed {
+                            entry.owner_a = presence.owner_a;
+                            entry.owner_b = presence.owner_b;
+                            entry.state = PresenceState::Closed;
+                        } else if entry.state == PresenceState::BothPresent {
+                            // Again: do nothing
+                        } else if presence.state == PresenceState::BothPresent {
+                            entry.owner_a = presence.owner_a;
+                            entry.owner_b = presence.owner_b;
+                            entry.state = PresenceState::BothPresent;
+                        } else {
+                            // Both have one presence, combine into both present
+                            debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent);
+                            entry.owner_b = Some(presence.owner_a);
+                        }
+                    },
+                    None => {
+                        self.presence.push(presence);
+                    }
                 }
             }
+
+            // After adding everything we might already have detected the global failure
+            if self.check_for_global_failure() {
+                return Some(LeaderConclusion::Failure);
+            }
         }
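
Note: the presence bookkeeping introduced above behaves like a small per-channel lattice: a channel can only move from OnePresent to BothPresent, and Closed absorbs everything, so the leader knows the failing sync region has stopped growing once no channel is left in OnePresent. A self-contained sketch of that merge rule; it is illustrative only, since the real combiner also tracks owner_a/owner_b so that a repeated report from the same component does not count as both ends:

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PresenceState {
    OnePresent,  // one component reported the channel being open
    BothPresent, // two components reported the channel being open
    Closed,      // one component reported the channel being closed
}

impl PresenceState {
    // Closed absorbs everything, BothPresent absorbs OnePresent, and two
    // independent one-sided reports (one per channel end) combine into
    // BothPresent.
    fn merge(self, other: PresenceState) -> PresenceState {
        use PresenceState::*;
        match (self, other) {
            (Closed, _) | (_, Closed) => Closed,
            (BothPresent, _) | (_, BothPresent) => BothPresent,
            (OnePresent, OnePresent) => BothPresent,
        }
    }
}

fn main() {
    use PresenceState::*;
    // The region has reached its largest size once no channel is left in
    // OnePresent: every open channel was confirmed from both ends.
    assert_eq!(OnePresent.merge(OnePresent), BothPresent);
    assert_eq!(BothPresent.merge(Closed), Closed);
    assert_eq!(Closed.merge(OnePresent), Closed);
}
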
src/runtime2/inbox.rs
@@ -2,7 +2,7 @@ use std::sync::Mutex;
 use std::collections::VecDeque;
 
 use crate::protocol::eval::ValueGroup;
-use crate::runtime2::consensus::SolutionCombiner;
+use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner};
 use crate::runtime2::port::ChannelId;
 
 use super::ConnectorId;
@@ -76,7 +76,7 @@ pub(crate) enum SyncCompContent {
     GlobalFailure, // broadcasting to everyone
     AckFailure, // acknowledgement of failure to leader
     Notification, // just a notification (so purpose of message is to send the SyncHeader)
-    Presence(ConnectorId, Vec<ChannelId>), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
+    Presence(ComponentPresence), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
 }
 
 /// A sync message is a message that is intended only for the consensus
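
Note: the GlobalFailure/AckFailure variants above drive the leader-side teardown of a failed round: the leader sends one GlobalFailure to every other component it has seen a Presence report from, sets ack_remaining to that count (see consensus.rs above), and the round only concludes once every AckFailure has come back; an empty set means it can conclude immediately. A self-contained model of that bookkeeping, with illustrative stand-ins for the real types:

use std::collections::HashSet;

type ConnectorId = u32;

struct Leader {
    id: ConnectorId,
    ack_remaining: u32,
}

impl Leader {
    // Decide who must be told about the global failure: every distinct
    // component that reported presence, except the leader itself. Returns
    // the ids that should each receive one GlobalFailure message.
    fn start_failure(&mut self, reporters: &[ConnectorId]) -> Vec<ConnectorId> {
        let mut encountered = HashSet::new();
        for &owner in reporters {
            if owner != self.id {
                encountered.insert(owner);
            }
        }
        self.ack_remaining = encountered.len() as u32;
        encountered.into_iter().collect()
    }

    // One AckFailure received; the round is concluded once all are in.
    fn handle_ack_failure(&mut self) -> bool {
        self.ack_remaining -= 1;
        self.ack_remaining == 0
    }
}

fn main() {
    let mut leader = Leader{ id: 0, ack_remaining: 0 };
    let to_notify = leader.start_failure(&[1, 2, 2, 0, 3]);
    assert_eq!(to_notify.len(), 3); // components 1, 2 and 3 get GlobalFailure
    assert!(!leader.handle_ack_failure());
    assert!(!leader.handle_ack_failure());
    assert!(leader.handle_ack_failure()); // last Ack ends the round
}
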
src/runtime2/tests/mod.rs
@@ -15,7 +15,7 @@ use crate::runtime2::native::{ApplicationSyncAction};
 // pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 // pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 
-pub(crate) const NUM_THREADS: u32 = 2;
+pub(crate) const NUM_THREADS: u32 = 4;
 pub(crate) const NUM_INSTANCES: u32 = 1;
 pub(crate) const NUM_LOOPS: u32 = 1;
src/runtime2/tests/sync_failure.rs
@@ -65,7 +65,7 @@ fn test_shared_sync_failure() {
     ";
 
     run_test_in_runtime(CODE, |api| {
-        for variant in 3..4 { // all `Location` enum variants, except `Never`.
+        for variant in 0..4 { // all `Location` enum variants, except `Never`.
             // Create the channels
             api.create_connector("", "constructor", ValueGroup::new_stack(vec![
                 Value::Enum(variant)