diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 2a394a338bcd92260d19d15e4fc365747985ee4a..37401255e873261b6b31e02fe0431fbca980a740 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,12 +1,10 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox::SyncCompContent::Presence; -use crate::runtime2::port::PortState; use super::ConnectorId; use super::branch::BranchId; -use super::port::{ChannelId, PortIdLocal}; +use super::port::{ChannelId, PortIdLocal, PortState}; use super::inbox::{ Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker, DataMessage, @@ -61,6 +59,10 @@ struct Peer { // TODO: A lot of stuff should be batched. Like checking all the sync headers // and sending "I have a higher ID" messages. Should reduce locking by quite a // bit. +// TODO: Needs a refactor. Firstly we have cases where we don't have a branch ID +// but we do want to enumerate all current ports. So put that somewhere in a +// central place. Secondly. Error handling and regular message handling is +// becoming a mess. pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state @@ -173,45 +175,9 @@ impl Consensus { return Some(RoundConclusion::Failure); } - // We need to go through the hassle of notifying all participants in the - // sync round that we've encountered an error. - // --- notify leader - let mut channel_presence = Vec::with_capacity(branch.channel_mapping.len()); - for mapping in &branch.channel_mapping { - let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap(); - channel_presence.push(LocalChannelPresence{ - channel_id: mapping.channel_id, - is_closed: port.state == PortState::Closed, - }); - } - let _never_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ - component_id: ctx.id, - channels: channel_presence, - }), ctx); - debug_assert!(_never_conclusion.is_none()); - let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx); - println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion); - - // --- initiate discovery wave (to let leader know about all components) - self.handled_wave = true; - for mapping in &self.branch_annotations[0].channel_mapping { - let channel_id = mapping.channel_id; - let port_info = ctx.get_port_by_channel_id(channel_id).unwrap(); - let message = SyncPortMessage{ - sync_header: self.create_sync_header(ctx), - source_port: port_info.self_id, - target_port: port_info.peer_id, - content: SyncPortContent::NotificationWave, - }; - - // Note: submitting the message might fail. But we're attempting to - // handle the error anyway. - // TODO: Think about this a second time: how do we make sure the - // entire network will fail if we reach this condition - let _unused = ctx.submit_message(Message::SyncPort(message)); - } - - return maybe_conclusion; + // We're not in the trivial case: since we've communicated we need to + // let everyone know that this round is probably not going to end well. + return self.initiate_sync_failure(ctx); } /// Notifies the consensus algorithm that a branch has reached the end of @@ -378,8 +344,6 @@ impl Consensus { } // Construct data header - // TODO: Handle multiple firings. Right now we just assign the current - // branch to the `None` value because we know we can only send once. let data_header = DataHeader{ expected_mapping: branch.channel_mapping.iter() .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id) @@ -535,9 +499,7 @@ impl Consensus { match message.content { SyncControlContent::ChannelIsClosed(_) => { - // TODO: This is wrong! This might happen in a normal sync. And - // we don't want to fail immediately! - return Some(RoundConclusion::Failure); + return self.initiate_sync_failure(ctx); } } } @@ -798,6 +760,55 @@ impl Consensus { } } + fn initiate_sync_failure(&mut self, ctx: &mut ComponentCtx) -> Option { + debug_assert!(self.is_in_sync()); + + // Notify leader of our channels and the fact that we just failed + let channel_mapping = &self.branch_annotations[0].channel_mapping; + let mut channel_presence = Vec::with_capacity(channel_mapping.len()); + for mapping in channel_mapping { + let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap(); + channel_presence.push(LocalChannelPresence{ + channel_id: mapping.channel_id, + is_closed: port.state == PortState::Closed, + }); + } + let maybe_already = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ + component_id: ctx.id, + channels: channel_presence, + }), ctx); + + if self.handled_wave { + // Someone (or us) has already initiated a sync failure. + return maybe_already; + } + + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx); + debug_assert!(if maybe_already.is_some() { maybe_conclusion.is_some() } else { true }); + println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion); + + // Initiate a discovery wave so peers can do the same + self.handled_wave = true; + for mapping in &self.branch_annotations[0].channel_mapping { + let channel_id = mapping.channel_id; + let port_info = ctx.get_port_by_channel_id(channel_id).unwrap(); + let message = SyncPortMessage{ + sync_header: self.create_sync_header(ctx), + source_port: port_info.self_id, + target_port: port_info.peer_id, + content: SyncPortContent::NotificationWave, + }; + + // Note: submitting the message might fail. But we're attempting to + // handle the error anyway. + // TODO: Think about this a second time: how do we make sure the + // entire network will fail if we reach this condition + let _unused = ctx.submit_message(Message::SyncPort(message)); + } + + return maybe_conclusion; + } + #[inline] fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader { return SyncHeader{ @@ -1398,9 +1409,11 @@ impl SolutionCombiner { } // Handle channel presence + println!("DEBUGERINO: Presence before joining is {:#?}", &self.presence); if self.presence.is_empty() { // Trivial case - self.presence = combiner.presence + self.presence = combiner.presence; + println!("DEBUGERINO: Trivial merging") } else { for presence in combiner.presence { match self.presence.iter_mut().find(|v| v.id == presence.id) { @@ -1424,6 +1437,7 @@ impl SolutionCombiner { // Both have one presence, combine into both present debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent); entry.owner_b = Some(presence.owner_a); + entry.state = PresenceState::BothPresent; } }, None => { @@ -1431,9 +1445,11 @@ impl SolutionCombiner { } } } + println!("DEBUGERINO: Presence after joining is {:#?}", &self.presence); // After adding everything we might have immediately found a solution if self.check_for_global_failure() { + println!("DEBUG: Returning immediate failure?"); return Some(LeaderConclusion::Failure); } }