From 2058a2c1bf4c6ac5f44c49d09693bd271386069d 2021-11-23 21:08:36
From: MH
Date: 2021-11-23 21:08:36
Subject: [PATCH] One more bug fixed, one very rare one still pending

---

diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index 2a394a338bcd92260d19d15e4fc365747985ee4a..37401255e873261b6b31e02fe0431fbca980a740 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -1,12 +1,10 @@
 use crate::collections::VecSet;
 use crate::protocol::eval::ValueGroup;
-use crate::runtime2::inbox::SyncCompContent::Presence;
-use crate::runtime2::port::PortState;
 
 use super::ConnectorId;
 use super::branch::BranchId;
-use super::port::{ChannelId, PortIdLocal};
+use super::port::{ChannelId, PortIdLocal, PortState};
 use super::inbox::{
     Message, DataHeader, SyncHeader,
     ChannelAnnotation, BranchMarker,
     DataMessage,
@@ -61,6 +59,10 @@ struct Peer {
 // TODO: A lot of stuff should be batched. Like checking all the sync headers
 // and sending "I have a higher ID" messages. Should reduce locking by quite a
 // bit.
+// TODO: Needs a refactor. Firstly, we have cases where we don't have a branch ID
+// but we do want to enumerate all current ports. So put that somewhere in a
+// central place. Secondly, error handling and regular message handling are
+// becoming a mess.
 pub(crate) struct Consensus {
     // --- State that is cleared after each round
     // Local component's state
@@ -173,45 +175,9 @@ impl Consensus {
             return Some(RoundConclusion::Failure);
         }
 
-        // We need to go through the hassle of notifying all participants in the
-        // sync round that we've encountered an error.
-        // --- notify leader
-        let mut channel_presence = Vec::with_capacity(branch.channel_mapping.len());
-        for mapping in &branch.channel_mapping {
-            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
-            channel_presence.push(LocalChannelPresence{
-                channel_id: mapping.channel_id,
-                is_closed: port.state == PortState::Closed,
-            });
-        }
-        let _never_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
-            component_id: ctx.id,
-            channels: channel_presence,
-        }), ctx);
-        debug_assert!(_never_conclusion.is_none());
-        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
-        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);
-
-        // --- initiate discovery wave (to let leader know about all components)
-        self.handled_wave = true;
-        for mapping in &self.branch_annotations[0].channel_mapping {
-            let channel_id = mapping.channel_id;
-            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
-            let message = SyncPortMessage{
-                sync_header: self.create_sync_header(ctx),
-                source_port: port_info.self_id,
-                target_port: port_info.peer_id,
-                content: SyncPortContent::NotificationWave,
-            };
-
-            // Note: submitting the message might fail. But we're attempting to
-            // handle the error anyway.
-            // TODO: Think about this a second time: how do we make sure the
-            // entire network will fail if we reach this condition
-            let _unused = ctx.submit_message(Message::SyncPort(message));
-        }
-
-        return maybe_conclusion;
+        // We're not in the trivial case: since we've communicated we need to
+        // let everyone know that this round is probably not going to end well.
+        return self.initiate_sync_failure(ctx);
     }
 
     /// Notifies the consensus algorithm that a branch has reached the end of
@@ -378,8 +344,6 @@ impl Consensus {
         }
 
         // Construct data header
-        // TODO: Handle multiple firings. Right now we just assign the current
-        // branch to the `None` value because we know we can only send once.
         let data_header = DataHeader{
             expected_mapping: branch.channel_mapping.iter()
                 .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id)
@@ -535,9 +499,7 @@ impl Consensus {
 
         match message.content {
             SyncControlContent::ChannelIsClosed(_) => {
-                // TODO: This is wrong! This might happen in a normal sync. And
-                // we don't want to fail immediately!
-                return Some(RoundConclusion::Failure);
+                return self.initiate_sync_failure(ctx);
             }
         }
     }
@@ -798,6 +760,55 @@ impl Consensus {
         }
     }
 
+    fn initiate_sync_failure(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
+        debug_assert!(self.is_in_sync());
+
+        // Notify leader of our channels and the fact that we just failed
+        let channel_mapping = &self.branch_annotations[0].channel_mapping;
+        let mut channel_presence = Vec::with_capacity(channel_mapping.len());
+        for mapping in channel_mapping {
+            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
+            channel_presence.push(LocalChannelPresence{
+                channel_id: mapping.channel_id,
+                is_closed: port.state == PortState::Closed,
+            });
+        }
+        let maybe_already = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
+            component_id: ctx.id,
+            channels: channel_presence,
+        }), ctx);
+
+        if self.handled_wave {
+            // Someone (or us) has already initiated a sync failure.
+            return maybe_already;
+        }
+
+        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
+        debug_assert!(if maybe_already.is_some() { maybe_conclusion.is_some() } else { true });
+        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);
+
+        // Initiate a discovery wave so peers can do the same
+        self.handled_wave = true;
+        for mapping in &self.branch_annotations[0].channel_mapping {
+            let channel_id = mapping.channel_id;
+            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
+            let message = SyncPortMessage{
+                sync_header: self.create_sync_header(ctx),
+                source_port: port_info.self_id,
+                target_port: port_info.peer_id,
+                content: SyncPortContent::NotificationWave,
+            };
+
+            // Note: submitting the message might fail. But we're attempting to
+            // handle the error anyway.
+            // TODO: Think about this a second time: how do we make sure the
+            // entire network will fail if we reach this condition
+            let _unused = ctx.submit_message(Message::SyncPort(message));
+        }
+
+        return maybe_conclusion;
+    }
+
     #[inline]
     fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
         return SyncHeader{
@@ -1398,9 +1409,11 @@ impl SolutionCombiner {
         }
 
         // Handle channel presence
+        println!("DEBUGERINO: Presence before joining is {:#?}", &self.presence);
         if self.presence.is_empty() {
             // Trivial case
-            self.presence = combiner.presence
+            self.presence = combiner.presence;
+            println!("DEBUGERINO: Trivial merging")
         } else {
             for presence in combiner.presence {
                 match self.presence.iter_mut().find(|v| v.id == presence.id) {
@@ -1424,6 +1437,7 @@ impl SolutionCombiner {
                             // Both have one presence, combine into both present
                             debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent);
                             entry.owner_b = Some(presence.owner_a);
+                            entry.state = PresenceState::BothPresent;
                         }
                     },
                     None => {
@@ -1431,9 +1445,11 @@ impl SolutionCombiner {
                 }
             }
         }
+        println!("DEBUGERINO: Presence after joining is {:#?}", &self.presence);
 
         // After adding everything we might have immediately found a solution
         if self.check_for_global_failure() {
+            println!("DEBUG: Returning immediate failure?");
             return Some(LeaderConclusion::Failure);
         }
     }
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 8e33bedf26f4d0290cca2be1eeea18447ada1cb5..43574bcd840dadf478a2bd9ecd610dc7987b1709 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -316,14 +316,18 @@ impl Scheduler {
                         let port = scheduled.ctx.ports.remove(port_index);
                         new_connector.ctx.ports.push(port.clone());
 
-                        // Notify the peer that the port has changed
-                        let reroute_message = scheduled.router.prepare_reroute(
-                            port.self_id, port.peer_id, scheduled.ctx.id,
-                            port.peer_connector, new_connector.ctx.id
-                        );
-
-                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
-                        self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
+                        // Notify the peer that the port has changed, but only
+                        // if the port wasn't already closed (otherwise the peer
+                        // is gone).
+                        if port.state == PortState::Open {
+                            let reroute_message = scheduled.router.prepare_reroute(
+                                port.self_id, port.peer_id, scheduled.ctx.id,
+                                port.peer_connector, new_connector.ctx.id
+                            );
+
+                            self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
+                            self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
+                        }
                     }
 
                     // Schedule new connector to run
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index ce24e3034c05c44a0dd011e9418f8ea8a3ed529a..2a45f9d8c88f64a672ec64d4dbe31d05f23e9292 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -15,9 +15,9 @@ use crate::runtime2::native::{ApplicationSyncAction};
 
 // pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed
 // pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests)
 
-pub(crate) const NUM_THREADS: u32 = 4;
-pub(crate) const NUM_INSTANCES: u32 = 1;
-pub(crate) const NUM_LOOPS: u32 = 1;
+pub(crate) const NUM_THREADS: u32 = 6;
+pub(crate) const NUM_INSTANCES: u32 = 2;
+pub(crate) const NUM_LOOPS: u32 = 5;
 
 fn create_runtime(pdl: &str) -> Runtime {
diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime2/tests/sync_failure.rs
index f3acdabcf1fbbd962ce73e7fe41de47edde701db..ae6becc1f9226fcd4a2a1b3247d8f3c6027eedf0 100644
--- a/src/runtime2/tests/sync_failure.rs
+++ b/src/runtime2/tests/sync_failure.rs
@@ -33,41 +33,58 @@ fn test_local_sync_failure() {
     })
 }
 
-#[test]
-fn test_shared_sync_failure() {
-    // Same as above. One of the components should fail, the other should follow
-    // suit because it cannot complete a sync round. We intentionally have an
-    // infinite loop in the while condition because we need at least two loops
-    // for the last error to get picked up.
-    const CODE: &'static str = "
-    enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never }
-    primitive failing_at_location(in input, out output, Location loc) {
-        u32[] failure_array = {};
-        while (true) {
-            if (loc == Location::BeforeSync) failure_array[0];
-            sync {
-                put(output, true);
-                if (loc == Location::AfterPut) failure_array[0];
-                auto received = get(input);
-                assert(received);
-                if (loc == Location::AfterGet) failure_array[0];
-            }
-            if (loc == Location::AfterSync) failure_array[0];
+const SHARED_SYNC_CODE: &'static str = "
+enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never }
+primitive failing_at_location(in input, out output, Location loc) {
+    u32[] failure_array = {};
+    while (true) {
+        if (loc == Location::BeforeSync) failure_array[0];
+        sync {
+            put(output, true);
+            if (loc == Location::AfterPut) failure_array[0];
+            auto received = get(input);
+            assert(received);
+            if (loc == Location::AfterGet) failure_array[0];
         }
+        if (loc == Location::AfterSync) failure_array[0];
     }
+}
 
-    composite constructor(Location loc) {
-        channel output_a -> input_a;
-        channel output_b -> input_b;
-        new failing_at_location(input_a, output_b, Location::Never);
-        new failing_at_location(input_b, output_a, loc);
-    }
-    ";
+composite constructor_a(Location loc) {
+    channel output_a -> input_a;
+    channel output_b -> input_b;
+    new failing_at_location(input_b, output_a, loc);
+    new failing_at_location(input_a, output_b, Location::Never);
+}
 
-    run_test_in_runtime(CODE, |api| {
+composite constructor_b(Location loc) {
+    channel output_a -> input_a;
+    channel output_b -> input_b;
+    new failing_at_location(input_b, output_a, Location::Never);
+    new failing_at_location(input_a, output_b, loc);
+}";
+
+#[test]
+fn test_shared_sync_failure_variant_a() {
+    // One fails, the other one should somehow detect it and fail as well. This
+    // variant constructs the failing component first.
+    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
         for variant in 0..4 { // all `Location` enum variants, except `Never`.
             // Create the channels
-            api.create_connector("", "constructor", ValueGroup::new_stack(vec![
+            api.create_connector("", "constructor_a", ValueGroup::new_stack(vec![
+                Value::Enum(variant)
+            ])).expect("create connector");
+        }
+    })
+}
+
+#[test]
+fn test_shared_sync_failure_variant_b() {
+    // One fails, the other one should somehow detect it and fail as well. This
+    // variant constructs the successful component first.
+    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
+        for variant in 0..4 {
+            api.create_connector("", "constructor_b", ValueGroup::new_stack(vec![
                 Value::Enum(variant)
             ])).expect("create connector");
         }