From dd4e6a5314f7aba99c460d8b208c19a516098294 2021-11-23 15:16:34
From: MH
Date: 2021-11-23 15:16:34
Subject: [PATCH] WIP on more failure fixing

---
diff --git a/src/collections/sets.rs b/src/collections/sets.rs
index a93d6f8df3c732fd08608df15a18535fe90e6ab2..180e111d09091371b9da96f7d13f31de222d2722 100644
--- a/src/collections/sets.rs
+++ b/src/collections/sets.rs
@@ -106,6 +106,11 @@ impl VecSet {
         self.inner.is_empty()
     }
 
+    #[inline]
+    pub fn len(&self) -> usize {
+        return self.inner.len();
+    }
+
     #[inline]
     pub fn into_vec(self) -> Vec<T> {
         return self.inner;
diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index 03499cbc17803f00590555ee5de61b0553487c4d..5495680cd51dc7f94a8de2fa83a3e7a6d0eba35d 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -403,12 +403,14 @@ impl ConnectorPDL {
                 );
                 self.eval_error = Some(eval_error);
                 self.mode = Mode::SyncError;
+
+                println!("DEBUGERINO: Notify of fatal branch");
+                if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
+                    println!("DEBUGERINO: Actually got {:?}", conclusion);
+                    return self.enter_non_sync_mode(conclusion, comp_ctx);
+                }
             }
         }
-
-        branch.prepared = PreparedStatement::PerformedPut;
-        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
-        return ConnectorScheduling::Immediate;
     },
     _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
 }
diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index 7da2a179ad5808552f3503de09e651a7d5badf20..2a394a338bcd92260d19d15e4fc365747985ee4a 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -1,6 +1,8 @@
 use crate::collections::VecSet;
 use crate::protocol::eval::ValueGroup;
 
+use crate::runtime2::inbox::SyncCompContent::Presence;
+use crate::runtime2::port::PortState;
 use super::ConnectorId;
 use super::branch::BranchId;
@@ -167,13 +169,28 @@ impl Consensus {
         // the consensus algorithm
         let branch = &self.branch_annotations[failed_branch_id.index as usize];
         if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
+            println!("DEBUG: Failure everything silent");
             return Some(RoundConclusion::Failure);
         }
 
         // We need to go through the hassle of notifying all participants in the
         // sync round that we've encountered an error.
        // --- notify leader
+        let mut channel_presence = Vec::with_capacity(branch.channel_mapping.len());
+        for mapping in &branch.channel_mapping {
+            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
+            channel_presence.push(LocalChannelPresence{
+                channel_id: mapping.channel_id,
+                is_closed: port.state == PortState::Closed,
+            });
+        }
+        let _never_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
+            component_id: ctx.id,
+            channels: channel_presence,
+        }), ctx);
+        debug_assert!(_never_conclusion.is_none());
         let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
+        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);
 
         // --- initiate discovery wave (to let leader know about all components)
         self.handled_wave = true;
@@ -418,7 +435,7 @@ impl Consensus {
             SyncCompContent::LocalSolution(_) |
             SyncCompContent::PartialSolution(_) |
             SyncCompContent::AckFailure |
-            SyncCompContent::Presence(_, _) => {
+            SyncCompContent::Presence(_) => {
                 // Needs to be handled by the leader
                 return self.send_to_leader_or_handle_as_leader(message.content, ctx);
             },
@@ -432,6 +449,7 @@ impl Consensus {
             },
             SyncCompContent::GlobalFailure => {
                 // Global failure of round, send Ack to leader
+                println!("DEBUGERINO: Got GlobalFailure, sending Ack in response");
                 debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
                 let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
                 debug_assert!(_result.is_none());
@@ -444,9 +462,9 @@ impl Consensus {
         }
     }
 
-    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         if !self.handle_received_sync_header(&message.sync_header, ctx) {
-            return;
+            return None;
         }
 
         debug_assert!(self.is_in_sync());
@@ -456,13 +474,14 @@ impl Consensus {
                 // The point here is to let us become part of the sync round and
                 // take note of the leader in case all of our ports are silent.
                 self.encountered_ports.push(message.target_port);
+                return None;
             }
             SyncPortContent::NotificationWave => {
                 // Wave to discover everyone in the network, handling sync
                 // header takes care of leader discovery, here we need to make
                 // sure we propagate the wave
                 if self.handled_wave {
-                    return;
+                    return None;
                 }
                 self.handled_wave = true;
@@ -487,6 +506,23 @@ impl Consensus {
                     // result: we're dealing with an error here anyway
                     let _unused = ctx.submit_message(Message::SyncPort(message));
                 }
+
+                // And let the leader know about our port state
+                let annotations = &self.branch_annotations[0];
+                let mut channels = Vec::with_capacity(annotations.channel_mapping.len());
+                for mapping in &annotations.channel_mapping {
+                    let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
+                    channels.push(LocalChannelPresence{
+                        channel_id: mapping.channel_id,
+                        is_closed: port_info.state == PortState::Closed,
+                    });
+                }
+
+                let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
+                    component_id: ctx.id,
+                    channels,
+                }), ctx);
+                return maybe_conclusion;
            }
        }
    }
@@ -499,6 +535,8 @@ impl Consensus {
 
         match message.content {
             SyncControlContent::ChannelIsClosed(_) => {
+                // TODO: This is wrong! This might happen in a normal sync. And
+                // we don't want to fail immediately!
                return Some(RoundConclusion::Failure);
            }
        }
@@ -656,8 +694,8 @@ impl Consensus {
                     }
                 }
             },
-            SyncCompContent::Presence(component_id, presence) => {
-                if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
+            SyncCompContent::Presence(component_presence) => {
+                if self.solution_combiner.add_presence_and_check_for_global_failure(component_presence.component_id, &component_presence.channels) {
                     return self.handle_global_failure_as_leader(ctx);
                 }
             },
@@ -717,31 +755,45 @@ impl Consensus {
     fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
         if self.conclusion.is_some() {
+            // Already sent out a failure
             return None;
         }
 
         // TODO: Performance
         let mut encountered = VecSet::new();
         for presence in &self.solution_combiner.presence {
-            if presence.added_by != ctx.id {
+            if presence.owner_a != ctx.id {
                 // Did not add it ourselves
-                if encountered.push(presence.added_by) {
+                if encountered.push(presence.owner_a) {
                     // Not yet sent a message
                     let message = SyncCompMessage{
                         sync_header: self.create_sync_header(ctx),
-                        target_component_id: presence.added_by,
+                        target_component_id: presence.owner_a,
                         content: SyncCompContent::GlobalFailure,
                     };
                     ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
                 }
+            } else if let Some(owner_b) = presence.owner_b {
+                if owner_b != ctx.id {
+                    if encountered.push(owner_b) {
+                        let message = SyncCompMessage{
+                            sync_header: self.create_sync_header(ctx),
+                            target_component_id: owner_b,
+                            content: SyncCompContent::GlobalFailure,
+                        };
+                        ctx.submit_message(Message::SyncComp(message)).unwrap();
+                    }
+                }
             }
         }
 
+        println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.0).collect::<Vec<_>>());
         self.conclusion = Some(RoundConclusion::Failure);
         if encountered.is_empty() {
             // We don't have to wait on Acks
             return Some(RoundConclusion::Failure);
         } else {
+            self.ack_remaining = encountered.len() as u32;
             return None;
         }
     }
@@ -804,11 +856,36 @@ struct ComponentLocalSolutions {
     all_peers_present: bool,
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct ComponentPresence {
+    component_id: ConnectorId,
+    channels: Vec<LocalChannelPresence>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct LocalChannelPresence {
+    channel_id: ChannelId,
+    is_closed: bool,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum PresenceState {
+    OnePresent,  // one component reported the channel being open
+    BothPresent, // two components reported the channel being open
+    Closed,      // one component reported the channel being closed
+}
+
+/// Record to hold channel state during the error-resolving mode of the leader.
+/// This is used to determine when the sync region has grown to its largest
+/// size. The structure is eventually consistent in the sense that a component
+/// might initially presume a channel is open, only to figure out later that it
+/// is actually closed.
 #[derive(Debug, Clone)]
 struct ChannelPresence {
-    added_by: ConnectorId,
-    channel: ChannelId,
-    both_sides_present: bool,
+    owner_a: ConnectorId,
+    owner_b: Option<ConnectorId>,
+    id: ChannelId,
+    state: PresenceState,
 }
 
 // TODO: Flatten? Flatten. Flatten everything.
@@ -1039,26 +1116,43 @@ impl SolutionCombiner {
         return self.check_for_global_solution(component_index, solution_index);
     }
 
-    fn add_presence_and_check_for_global_failure(&mut self, present_component: ConnectorId, present: &[ChannelId]) -> bool {
-        'new_channel_loop: for new_channel_id in present {
-            for presence in &mut self.presence {
-                if presence.channel == *new_channel_id {
-                    if presence.added_by != present_component {
-                        presence.both_sides_present = true;
+    fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool {
+        for entry in channels {
+            let mut found = false;
+
+            for existing in &mut self.presence {
+                if existing.id == entry.channel_id {
+                    // Same entry. We only update it if the second component
+                    // owning one end of the channel comes in, or if a component
+                    // is telling us that the channel is (now) closed.
+                    if entry.is_closed {
+                        existing.state = PresenceState::Closed;
+                    } else if component_id != existing.owner_a && existing.state != PresenceState::Closed {
+                        existing.state = PresenceState::BothPresent;
+                    }
+
+                    if existing.owner_a != component_id {
+                        existing.owner_b = Some(component_id);
                     }
-                    continue 'new_channel_loop;
+                    found = true;
+                    break;
                 }
             }
 
-            // Not added yet
-            self.presence.push(ChannelPresence{
-                added_by: present_component,
-                channel: *new_channel_id,
-                both_sides_present: false,
-            });
+            if !found {
+                self.presence.push(ChannelPresence{
+                    owner_a: component_id,
+                    owner_b: None,
+                    id: entry.channel_id,
+                    state: if entry.is_closed { PresenceState::Closed } else { PresenceState::OnePresent },
+                });
+            }
         }
 
+        println!("DEBUGGERINO Presence is now:\n{:#?}", self.presence);
+
         return self.check_for_global_failure();
     }
@@ -1249,7 +1343,7 @@ impl SolutionCombiner {
         // Check if all are present and we're preparing to fail this round
         let mut all_present = true;
         for presence in &self.presence {
-            if !presence.both_sides_present {
+            if presence.state == PresenceState::OnePresent {
                 all_present = false;
                 break;
             }
@@ -1309,11 +1403,39 @@ impl SolutionCombiner {
             self.presence = combiner.presence
         } else {
             for presence in combiner.presence {
-                let global_failure = self.add_presence_and_check_for_global_failure(presence.added_by, &[presence.channel]);
-                if global_failure {
-                    return Some(LeaderConclusion::Failure);
+                match self.presence.iter_mut().find(|v| v.id == presence.id) {
+                    Some(entry) => {
+                        // Combine entries: prefer whichever entry is Closed,
+                        // then whichever is BothPresent; otherwise combine the
+                        // two one-sided reports into BothPresent.
+                        if entry.state == PresenceState::Closed {
+                            // Do nothing
+                        } else if presence.state == PresenceState::Closed {
+                            entry.owner_a = presence.owner_a;
+                            entry.owner_b = presence.owner_b;
+                            entry.state = PresenceState::Closed;
+                        } else if entry.state == PresenceState::BothPresent {
+                            // Again: do nothing
+                        } else if presence.state == PresenceState::BothPresent {
+                            entry.owner_a = presence.owner_a;
+                            entry.owner_b = presence.owner_b;
+                            entry.state = PresenceState::BothPresent;
+                        } else {
+                            // Both have one presence, combine into both present
+                            debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent);
+                            entry.owner_b = Some(presence.owner_a);
+                            entry.state = PresenceState::BothPresent;
+                        }
+                    },
+                    None => {
+                        self.presence.push(presence);
+                    }
                 }
             }
+
+            // After adding everything we might have immediately found a solution
+            if self.check_for_global_failure() {
+                return Some(LeaderConclusion::Failure);
+            }
         }
 
         return None;
diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs
index 6e5854c85dd7b27d7967b48bc2baa1f472bc0aeb..1260a183ef590ff04c7fd96b068a795795d9565d 100644
--- a/src/runtime2/inbox.rs
+++ b/src/runtime2/inbox.rs
@@ -2,7 +2,7 @@ use std::sync::Mutex;
 use std::collections::VecDeque;
 
 use crate::protocol::eval::ValueGroup;
-use crate::runtime2::consensus::SolutionCombiner;
+use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner};
 use crate::runtime2::port::ChannelId;
 
 use super::ConnectorId;
@@ -76,7 +76,7 @@ pub(crate) enum SyncCompContent {
     GlobalFailure, // broadcasting to everyone
     AckFailure, // acknowledgement of failure to leader
     Notification, // just a notification (so purpose of message is to send the SyncHeader)
-    Presence(ConnectorId, Vec<ChannelId>), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
+    Presence(ComponentPresence), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
 }
 
 /// A sync message is a message that is intended only for the consensus
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 987afe9e1b57e9b32556bf849e308aa34a3b60f3..ce24e3034c05c44a0dd011e9418f8ea8a3ed529a 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -15,7 +15,7 @@ use crate::runtime2::native::{ApplicationSyncAction};
 // pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed
 // pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests)
 
-pub(crate) const NUM_THREADS: u32 = 2;
+pub(crate) const NUM_THREADS: u32 = 4;
 pub(crate) const NUM_INSTANCES: u32 = 1;
 pub(crate) const NUM_LOOPS: u32 = 1;
diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime2/tests/sync_failure.rs
index 04e57cd267bb2ee220ee613790d0223fcb8caabf..f3acdabcf1fbbd962ce73e7fe41de47edde701db 100644
--- a/src/runtime2/tests/sync_failure.rs
+++ b/src/runtime2/tests/sync_failure.rs
@@ -65,7 +65,7 @@ fn test_shared_sync_failure() {
     ";
 
     run_test_in_runtime(CODE, |api| {
-        for variant in 3..4 { // all `Location` enum variants, except `Never`.
+        for variant in 0..4 { // all `Location` enum variants, except `Never`.
             // Create the channels
             api.create_connector("", "constructor", ValueGroup::new_stack(vec![
                 Value::Enum(variant)
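
---

The presence bookkeeping introduced above is easiest to follow in isolation. Below is a minimal, self-contained sketch of the merge rules that add_presence_and_check_for_global_failure implements, under simplified assumptions: plain u32 ids stand in for ConnectorId/ChannelId, a (channel_id, is_closed) tuple stands in for LocalChannelPresence, and the Combiner, add_presence, and region_complete names are illustrative, not part of the runtime.

// Standalone sketch of the leader's presence-merging rules (illustrative
// stand-in types: u32 ids instead of ConnectorId/ChannelId).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PresenceState {
    OnePresent,  // one component reported the channel as open
    BothPresent, // both endpoints reported the channel as open
    Closed,      // some component reported the channel as closed
}

#[derive(Debug)]
struct ChannelPresence {
    owner_a: u32,
    owner_b: Option<u32>,
    id: u32,
    state: PresenceState,
}

#[derive(Default)]
struct Combiner {
    presence: Vec<ChannelPresence>,
}

impl Combiner {
    // Merge one component's report of its channels, then check whether the
    // sync region has stopped growing. Each channel is (channel_id, is_closed).
    fn add_presence(&mut self, component_id: u32, channels: &[(u32, bool)]) -> bool {
        for &(channel_id, is_closed) in channels {
            match self.presence.iter().position(|p| p.id == channel_id) {
                Some(index) => {
                    let existing = &mut self.presence[index];
                    // A closed report always wins; otherwise a report from the
                    // second endpoint upgrades the entry to BothPresent.
                    if is_closed {
                        existing.state = PresenceState::Closed;
                    } else if component_id != existing.owner_a && existing.state != PresenceState::Closed {
                        existing.state = PresenceState::BothPresent;
                    }
                    if existing.owner_a != component_id {
                        existing.owner_b = Some(component_id);
                    }
                },
                None => self.presence.push(ChannelPresence{
                    owner_a: component_id,
                    owner_b: None,
                    id: channel_id,
                    state: if is_closed { PresenceState::Closed } else { PresenceState::OnePresent },
                }),
            }
        }
        return self.region_complete();
    }

    // The region is complete once no channel is still waiting on its peer:
    // every entry is either reported by both endpoints or known to be closed.
    fn region_complete(&self) -> bool {
        self.presence.iter().all(|p| p.state != PresenceState::OnePresent)
    }
}

fn main() {
    let mut combiner = Combiner::default();
    // Component 1 owns one end of channels 10 and 11; channel 11 is closed.
    assert!(!combiner.add_presence(1, &[(10, false), (11, true)]));
    // Component 2 reports the other end of channel 10: region is now complete.
    assert!(combiner.add_presence(2, &[(10, false)]));
    println!("presence: {:#?}", combiner.presence);
}

This mirrors the eventual-consistency note on ChannelPresence: an entry may first be recorded as OnePresent and later be corrected to Closed or upgraded to BothPresent, and the leader may only declare global failure once no entry remains in OnePresent.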