diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index 951045671b8adc3c3eb5901f27538ad6d3c881a7..e0ae0d1004a4669e57bab2fcaff4615debac4a99 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -2,19 +2,20 @@
 use crate::collections::VecSet;
 use crate::protocol::eval::ValueGroup;
 use crate::runtime2::inbox::BranchMarker;
+use crate::runtime2::scheduler::ComponentPortChange;

 use super::ConnectorId;
 use super::branch::BranchId;
 use super::port::{ChannelId, PortIdLocal};
 use super::inbox::{
-    Message, PortAnnotation,
+    Message, ChannelAnnotation,
     DataMessage, DataContent, DataHeader,
-    SyncMessage, SyncContent, SyncHeader,
+    SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader,
 };
 use super::scheduler::ComponentCtx;

 struct BranchAnnotation {
-    port_mapping: Vec<PortAnnotation>,
+    channel_mapping: Vec<ChannelAnnotation>,
     cur_marker: BranchMarker,
 }

@@ -31,6 +32,12 @@ pub(crate) struct GlobalSolution {
     channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 }

+#[derive(Debug, PartialEq, Eq)]
+pub enum RoundConclusion {
+    Failure,
+    Success(BranchId),
+}
+
 // -----------------------------------------------------------------------------
 // Consensus
 // -----------------------------------------------------------------------------
@@ -61,6 +68,9 @@ pub(crate) struct Consensus {
     // Gathered state from communication
     encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
     solution_combiner: SolutionCombiner,
+    handled_wave: bool, // whether we encountered the notification wave in this round
+    conclusion: Option<RoundConclusion>,
+    ack_remaining: u32,
     // --- Persistent state
     peers: Vec<Peer>,
     sync_round: u32,
@@ -82,6 +92,9 @@ impl Consensus {
             branch_markers: Vec::new(),
             encountered_ports: VecSet::new(),
             solution_combiner: SolutionCombiner::new(),
+            handled_wave: false,
+            conclusion: None,
+            ack_remaining: 0,
             peers: Vec::new(),
             sync_round: 0,
             workspace_ports: Vec::new(),
@@ -96,9 +109,10 @@ impl Consensus {
     }

     /// TODO: Remove this once multi-fire is in place
-    pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation {
+    #[deprecated]
+    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
         let branch = &self.branch_annotations[branch_id.index as usize];
-        let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap();
+        let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap();
         return port;
     }

@@ -113,9 +127,9 @@ impl Consensus {
         // We'll use the first "branch" (the non-sync one) to store our ports,
         // this allows cloning if we created a new branch.
         self.branch_annotations.push(BranchAnnotation{
-            port_mapping: ctx.get_ports().iter()
-                .map(|v| PortAnnotation{
-                    port_id: v.self_id,
+            channel_mapping: ctx.get_ports().iter()
+                .map(|v| ChannelAnnotation {
+                    channel_id: v.channel_id,
                     registered_id: None,
                     expected_firing: None,
                 })
@@ -133,11 +147,12 @@
     pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
         // If called correctly, then each time we are notified the new branch's
         // index equals the current length of `branch_annotations`.
+ println!("DEBUG: Branch {} became forked into {}", parent_branch_id.index, new_branch_id.index); debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; let new_marker = BranchMarker::new(self.branch_markers.len() as u32); let new_branch_annotations = BranchAnnotation{ - port_mapping: parent_branch_annotations.port_mapping.clone(), + channel_mapping: parent_branch_annotations.channel_mapping.clone(), cur_marker: new_marker, }; self.branch_annotations.push(new_branch_annotations); @@ -145,22 +160,37 @@ impl Consensus { } /// Notifies the consensus algorithm that a particular branch has - /// encountered an unrecoverable error. If the return value is `false`, then - /// the caller can enter a "normal" exit mode instead of the special "sync" - /// exit mode. - pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId) -> bool { + /// encountered an unrecoverable error. + pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { debug_assert!(self.is_in_sync()); // Check for trivial case, where branch has not yet communicated within // the consensus algorithm let branch = &self.branch_annotations[failed_branch_id.index as usize]; - if branch.port_mapping.iter().all(|v| v.registered_id.is_none()) { - return false; + if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) { + return Some(RoundConclusion::Failure); } - // Branch has communicated. Since we need to discover the entire + // We need to go through the hassle of notifying all participants in the + // sync round that we've encountered an error. + // --- notify leader + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx); + + // --- initiate discovery wave (to let leader know about all components) + self.handled_wave = true; + for mapping in &self.branch_annotations[0].channel_mapping { + let channel_id = mapping.channel_id; + let port_info = ctx.get_port_by_channel_id(channel_id).unwrap(); + let message = SyncPortMessage{ + sync_header: self.create_sync_header(ctx), + source_port: port_info.self_id, + target_port: port_info.peer_id, + content: SyncPortContent::NotificationWave, + }; + ctx.submit_message(Message::SyncPort(message)); + } - return true; + return maybe_conclusion; } /// Notifies the consensus algorithm that a branch has reached the end of @@ -169,7 +199,7 @@ impl Consensus { pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { debug_assert!(self.is_in_sync()); let branch = &self.branch_annotations[branch_id.index as usize]; - for mapping in &branch.port_mapping { + for mapping in &branch.channel_mapping { match mapping.expected_firing { Some(expected) => { if expected != mapping.registered_id.is_some() { @@ -187,11 +217,14 @@ impl Consensus { /// Notifies the consensus algorithm that a particular branch has assumed /// a speculative value for its port mapping. 

     /// Notifies the consensus algorithm that a branch has reached the end of
@@ -169,7 +199,7 @@
     pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
         debug_assert!(self.is_in_sync());
         let branch = &self.branch_annotations[branch_id.index as usize];
-        for mapping in &branch.port_mapping {
+        for mapping in &branch.channel_mapping {
             match mapping.expected_firing {
                 Some(expected) => {
                     if expected != mapping.registered_id.is_some() {
@@ -187,11 +217,14 @@
     /// Notifies the consensus algorithm that a particular branch has assumed
     /// a speculative value for its port mapping.
-    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
+    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
         debug_assert!(self.is_in_sync());
+
+        let port_desc = ctx.get_port_by_id(port_id).unwrap();
+        let channel_id = port_desc.channel_id;

         let branch = &mut self.branch_annotations[branch_id.index as usize];
-        for mapping in &mut branch.port_mapping {
-            if mapping.port_id == port_id {
+        for mapping in &mut branch.channel_mapping {
+            if mapping.channel_id == channel_id {
                 match mapping.expected_firing {
                     None => {
                         // Not yet mapped, perform speculative mapping
@@ -218,20 +251,21 @@
     /// appropriate component. If it is the leader then there is a chance that
     /// this solution completes a global solution. In that case the solution
     /// branch ID will be returned.
-    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<BranchId> {
+    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         // Turn the port mapping into a local solution
-        let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping;
+        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
         let mut target_mapping = Vec::with_capacity(source_mapping.len());

         for port in source_mapping {
             // Note: if the port is silent, and we've never communicated
             // over the port, then we need to do so now, to let the peer
             // component know about our sync leader state.
-            let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
+            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
+            let self_port_id = port_desc.self_id;
             let peer_port_id = port_desc.peer_id;
             let channel_id = port_desc.channel_id;

-            if !self.encountered_ports.contains(&port.port_id) {
+            if !self.encountered_ports.contains(&self_port_id) {
                 ctx.submit_message(Message::Data(DataMessage {
                     sync_header: SyncHeader{
                         sending_component_id: ctx.id,
@@ -240,13 +274,13 @@
                     },
                     data_header: DataHeader{
                         expected_mapping: source_mapping.clone(),
-                        sending_port: port.port_id,
+                        sending_port: self_port_id,
                         target_port: peer_port_id,
                         new_mapping: BranchMarker::new_invalid(),
                     },
                     content: DataContent::SilentPortNotification,
                 }));
-                self.encountered_ports.push(port.port_id);
+                self.encountered_ports.push(self_port_id);
             }

             target_mapping.push((
@@ -260,29 +294,30 @@
             final_branch_id: branch_id,
             port_mapping: target_mapping,
         };
-        let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
-        return solution_branch;
+        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
+        return maybe_conclusion;
     }
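+
+    // In other words: a branch that ran to completion is serialized into a
+    // `LocalSolution` (one `(ChannelId, BranchMarker)` pair per channel) and
+    // routed to the current leader, which tries to stitch the per-component
+    // solutions into one `GlobalSolution`.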

     /// Notifies the consensus algorithm about the chosen branch to commit to
-    /// memory.
-    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
+    /// memory (may be the invalid "start" branch)
+    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
         debug_assert!(self.is_in_sync());
         // TODO: Handle sending and receiving ports

         // Set final ports
-        final_ports.clear();
         let branch = &self.branch_annotations[branch_id.index as usize];
-        for port in &branch.port_mapping {
-            final_ports.push(port.port_id);
-        }

         // Clear out internal storage to defaults
         self.highest_connector_id = ConnectorId::new_invalid();
         self.branch_annotations.clear();
+        self.branch_markers.clear();
         self.encountered_ports.clear();
         self.solution_combiner.clear();
+        self.handled_wave = false;
+        self.conclusion = None;
+        self.ack_remaining = 0;
+
         // And modify persistent storage
         self.sync_round += 1;
         for peer in self.peers.iter_mut() {
@@ -298,11 +333,12 @@
     pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
         debug_assert!(self.is_in_sync());
         let branch = &mut self.branch_annotations[branch_id.index as usize];
+        let port_info = ctx.get_port_by_id(source_port_id).unwrap();

         if cfg!(debug_assertions) {
             // Check for consistent mapping
-            let port = branch.port_mapping.iter()
-                .find(|v| v.port_id == source_port_id)
+            let port = branch.channel_mapping.iter()
+                .find(|v| v.channel_id == port_info.channel_id)
                 .unwrap();
             debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
         }
@@ -318,17 +354,16 @@
         // Construct data header
         // TODO: Handle multiple firings. Right now we just assign the current
         // branch to the `None` value because we know we can only send once.
-        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
         let data_header = DataHeader{
-            expected_mapping: branch.port_mapping.clone(),
+            expected_mapping: branch.channel_mapping.clone(),
             sending_port: port_info.self_id,
             target_port: port_info.peer_id,
             new_mapping: branch.cur_marker,
         };

         // Update port mapping
-        for mapping in &mut branch.port_mapping {
-            if mapping.port_id == source_port_id {
+        for mapping in &mut branch.channel_mapping {
+            if mapping.channel_id == port_info.channel_id {
                 mapping.expected_firing = Some(true);
                 mapping.registered_id = Some(branch.cur_marker);
             }
@@ -348,45 +383,102 @@
     /// responsible for checking for branches that might be able to receive
     /// the message.
     pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
-        return self.handle_received_sync_header(&message.sync_header, ctx)
+        let handled = self.handle_received_sync_header(&message.sync_header, ctx);
+        if handled {
+            self.encountered_ports.push(message.data_header.target_port);
+        }
+        return handled;
     }
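+
+    // Recording `target_port` in `encountered_ports` means a channel that
+    // already carried data this round will not get a redundant
+    // `SilentPortNotification` when a branch finishes. E.g. (hypothetical
+    // ports `a` and `b`, only `a` used this round):
+    //
+    //     consensus.handle_new_data_message(&msg_on_a, ctx);      // marks port `a`
+    //     consensus.handle_new_finished_sync_branch(branch, ctx); // silent-notifies only `b`'s peer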

     /// Handles a new sync message by handling the sync header and the contents
-    /// of the message. Returns `Some` with the branch ID of the global solution
-    /// if the sync solution has been found.
-    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option<BranchId> {
+    /// of the message. Returns `Some` with the conclusion of the sync round if
+    /// this message allowed one to be reached.
+    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         if !self.handle_received_sync_header(&message.sync_header, ctx) {
             return None;
         }

         // And handle the contents
         debug_assert_eq!(message.target_component_id, ctx.id);
-        match message.content {
-            SyncContent::Notification => {
-                // We were just interested in the header
-                return None;
-            },
-            SyncContent::LocalSolution(solution) => {
-                // We might be the leader, or earlier messages caused us to not
-                // be the leader anymore.
-                return self.send_or_store_local_solution(solution, ctx);
-            },
-            SyncContent::GlobalSolution(solution) => {
-                // Take branch of interest and return it.
-                let (_, branch_id) = solution.component_branches.iter()
-                    .find(|(connector_id, _)| *connector_id == ctx.id)
-                    .unwrap();
-                return Some(*branch_id);
+
+        match &message.content {
+            SyncCompContent::LocalFailure |
+            SyncCompContent::LocalSolution(_) |
+            SyncCompContent::PartialSolution(_) |
+            SyncCompContent::AckFailure |
+            SyncCompContent::Presence(_, _) => {
+                // Needs to be handled by the leader
+                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
+            },
+            SyncCompContent::GlobalSolution(solution) => {
+                // Found a global solution
+                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
+                let (_, branch_id) = solution.component_branches.iter()
+                    .find(|(component_id, _)| *component_id == ctx.id)
+                    .unwrap();
+                return Some(RoundConclusion::Success(*branch_id));
+            },
+            SyncCompContent::GlobalFailure => {
+                // Global failure of round, send Ack to leader
+                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
+                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
+                debug_assert!(_result.is_none());
+                return Some(RoundConclusion::Failure);
+            },
+            SyncCompContent::Notification => {
+                // We were just interested in the sync header we handled above
+                return None;
+            }
         }
     }
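+
+    // Failure handshake from the non-leader side: on `GlobalFailure` we
+    // immediately send `AckFailure` back and conclude our own round with
+    // `RoundConclusion::Failure`. The leader counts these acks (see
+    // `ack_remaining`) so it only finishes the round once every notified
+    // component has confirmed the failure.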
+
+    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+        if !self.handle_received_sync_header(&message.sync_header, ctx) {
+            return;
+        }
+
+        debug_assert!(self.is_in_sync());
+        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
+        match message.content {
+            SyncPortContent::NotificationWave => {
+                // Wave to discover everyone in the network. Handling the sync
+                // header takes care of leader discovery; here we need to make
+                // sure we propagate the wave.
+                if self.handled_wave {
+                    return;
+                }
+
+                self.handled_wave = true;
+
+                // Propagate wave to all peers except the one that has sent us
+                // the wave.
+                for mapping in &self.branch_annotations[0].channel_mapping {
+                    let channel_id = mapping.channel_id;
+                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
+                    if port_desc.self_id == message.target_port {
+                        // Wave came from this port, no need to send one back
+                        continue;
+                    }
+
+                    let message = SyncPortMessage{
+                        sync_header: self.create_sync_header(ctx),
+                        source_port: port_desc.self_id,
+                        target_port: port_desc.peer_id,
+                        content: SyncPortContent::NotificationWave,
+                    };
+                    ctx.submit_message(Message::SyncPort(message));
+                }
+            }
+        }
+    }

-    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) {
+    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
         debug_assert!(self.branch_can_receive(branch_id, message));
+        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
         let branch = &mut self.branch_annotations[branch_id.index as usize];
-        for mapping in &mut branch.port_mapping {
-            if mapping.port_id == message.data_header.target_port {
+        for mapping in &mut branch.channel_mapping {
+            if mapping.channel_id == target_port.channel_id {
                 // Found the port in which the message should be inserted
                 mapping.registered_id = Some(message.data_header.new_mapping);
@@ -425,8 +517,8 @@
             for expected in &message.data_header.expected_mapping {
                 // If we own the port, then we have an entry in the
                 // annotation, check if the current mapping matches
-                for current in &annotation.port_mapping {
-                    if expected.port_id == current.port_id {
+                for current in &annotation.channel_mapping {
+                    if expected.channel_id == current.channel_id {
                         if expected.registered_id != current.registered_id {
                             // IDs do not match, we cannot receive the
                             // message in this branch
@@ -458,25 +550,25 @@
                 continue
             }

-            let message = SyncMessage {
+            let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: peer.id,
-                content: SyncContent::Notification,
+                content: SyncCompContent::Notification,
             };
-            ctx.submit_message(Message::Sync(message));
+            ctx.submit_message(Message::SyncComp(message));
        }

         // But also send our locally combined solution
-        self.forward_local_solutions(ctx);
+        self.forward_local_data_to_new_leader(ctx);
     } else if sync_header.highest_component_id < self.highest_connector_id {
         // Sender has lower leader ID, so it should know about our higher
         // one.
-        let message = SyncMessage {
+        let message = SyncCompMessage {
             sync_header: self.create_sync_header(ctx),
             target_component_id: sync_header.sending_component_id,
-            content: SyncContent::Notification
+            content: SyncCompContent::Notification
         };
-        ctx.submit_message(Message::Sync(message));
+        ctx.submit_message(Message::SyncComp(message));
     }
     // else: exactly equal, so do nothing

         return true;
     }
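+
+    // Leader election piggybacks on every `SyncHeader`: the component with the
+    // highest ID wins. Sketch (hypothetical IDs): if our current leader is 3
+    // and a header with `highest_component_id == 7` arrives, we adopt 7, tell
+    // the peers we already met about it, and ship our combined state to the
+    // new leader via `forward_local_data_to_new_leader`.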
@@ -509,39 +601,120 @@
         }
     }

-    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option<BranchId> {
+    /// Sends a message towards the leader. If we are already the leader then
+    /// the message is handled immediately.
+    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         if self.highest_connector_id == ctx.id {
             // We are the leader
-            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
-                let mut my_final_branch_id = BranchId::new_invalid();
-                for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
-                    if connector_id == ctx.id {
-                        // This is our solution branch
-                        my_final_branch_id = branch_id;
-                        continue;
-                    }
-
-                    let message = SyncMessage {
-                        sync_header: self.create_sync_header(ctx),
-                        target_component_id: connector_id,
-                        content: SyncContent::GlobalSolution(global_solution.clone()),
-                    };
-                    ctx.submit_message(Message::Sync(message));
-                }
-
-                debug_assert!(my_final_branch_id.is_valid());
-                return Some(my_final_branch_id);
-            } else {
-                return None;
+            match content {
+                SyncCompContent::LocalFailure => {
+                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
+                        return self.handle_global_failure_as_leader(ctx);
+                    }
+                },
+                SyncCompContent::LocalSolution(local_solution) => {
+                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
+                        return self.handle_global_solution_as_leader(global_solution, ctx);
+                    }
+                },
+                SyncCompContent::PartialSolution(partial_solution) => {
+                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
+                        match conclusion {
+                            LeaderConclusion::Solution(global_solution) => {
+                                return self.handle_global_solution_as_leader(global_solution, ctx);
+                            },
+                            LeaderConclusion::Failure => {
+                                return self.handle_global_failure_as_leader(ctx);
+                            }
+                        }
+                    }
+                },
+                SyncCompContent::Presence(component_id, presence) => {
+                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
+                        return self.handle_global_failure_as_leader(ctx);
+                    }
+                },
+                SyncCompContent::AckFailure => {
+                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
+                    debug_assert!(self.ack_remaining > 0);
+                    self.ack_remaining -= 1;
+                    if self.ack_remaining == 0 {
+                        return Some(RoundConclusion::Failure);
+                    }
+                },
+                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
+                SyncCompContent::GlobalFailure => {
+                    unreachable!("unexpected message content for leader");
+                },
             }
         } else {
             // Someone else is the leader
-            let message = SyncMessage {
+            let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: self.highest_connector_id,
-                content: SyncContent::LocalSolution(solution),
+                content,
             };
-            ctx.submit_message(Message::Sync(message));
+            ctx.submit_message(Message::SyncComp(message));
         }
+
+        return None;
+    }
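+
+    // Both leader handlers below are guarded by `self.conclusion`: once a
+    // verdict is reached, later `LocalSolution`/`LocalFailure` messages from
+    // the same round are ignored, so at most one verdict is broadcast per
+    // sync round.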
+
+    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
+        if self.conclusion.is_some() {
+            return None;
+        }
+
+        // Handle the global solution
+        let mut my_final_branch_id = BranchId::new_invalid();
+        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
+            if connector_id == ctx.id {
+                // This is our solution branch
+                my_final_branch_id = branch_id;
+                continue;
+            }
+
+            let message = SyncCompMessage {
+                sync_header: self.create_sync_header(ctx),
+                target_component_id: connector_id,
+                content: SyncCompContent::GlobalSolution(global_solution.clone()),
+            };
+            ctx.submit_message(Message::SyncComp(message));
+        }
+
+        debug_assert!(my_final_branch_id.is_valid());
+        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
+
+        return Some(RoundConclusion::Success(my_final_branch_id));
+    }
+
+    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
+        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
+        if self.conclusion.is_some() {
+            return None;
+        }
+
+        // TODO: Performance
+        let mut encountered = VecSet::new();
+        for presence in &self.solution_combiner.presence {
+            if presence.added_by != ctx.id {
+                // Did not add it ourselves
+                if encountered.push(presence.added_by) {
+                    // Not yet sent a message
+                    let message = SyncCompMessage{
+                        sync_header: self.create_sync_header(ctx),
+                        target_component_id: presence.added_by,
+                        content: SyncCompContent::GlobalFailure,
+                    };
+                    ctx.submit_message(Message::SyncComp(message));
+                }
+            }
+        }
+
+        self.conclusion = Some(RoundConclusion::Failure);
+        if encountered.is_empty() {
+            // We don't have to wait on Acks
+            return Some(RoundConclusion::Failure);
+        } else {
+            // We do have to wait: remember how many Acks are still expected,
+            // this is the counter decremented in the `AckFailure` case above.
+            self.ack_remaining = encountered.len() as u32;
+            return None;
+        }
+    }
@@ -555,16 +728,16 @@
         }
     }

-    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) {
+    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
         debug_assert_ne!(self.highest_connector_id, ctx.id);
-        for local_solution in self.solution_combiner.drain() {
-            let message = SyncMessage {
+        if let Some(partial_solution) = self.solution_combiner.drain() {
+            let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: self.highest_connector_id,
-                content: SyncContent::LocalSolution(local_solution),
+                content: SyncCompContent::PartialSolution(partial_solution),
             };
-            ctx.submit_message(Message::Sync(message));
+            ctx.submit_message(Message::SyncComp(message));
         }
     }
 }
@@ -575,28 +748,28 @@
 // TODO: Remove all debug derives

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct MatchedLocalSolution {
     final_branch_id: BranchId,
     channel_mapping: Vec<(ChannelId, BranchMarker)>,
     matches: Vec<ComponentMatches>,
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct ComponentMatches {
     target_id: ConnectorId,
     target_index: usize,
     match_indices: Vec<usize>, // of local solution in connector
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct ComponentPeer {
     target_id: ConnectorId,
     target_index: usize, // in array of global solution components
     involved_channels: Vec<ChannelId>,
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct ComponentLocalSolutions {
     component: ConnectorId,
     peers: Vec<ComponentPeer>,
@@ -604,9 +777,19 @@ struct ComponentLocalSolutions {
     solutions: Vec<MatchedLocalSolution>,
     all_peers_present: bool,
 }

+#[derive(Debug, Clone)]
+struct ChannelPresence {
+    added_by: ConnectorId,
+    channel: ChannelId,
+    both_sides_present: bool,
+}
+
 // TODO: Flatten? Flatten. Flatten everything.
+#[derive(Debug)]
 pub(crate) struct SolutionCombiner {
-    local: Vec<ComponentLocalSolutions>
+    local: Vec<ComponentLocalSolutions>, // used for finding a solution
+    presence: Vec<ChannelPresence>, // used to detect that all channels are present in case of failure
+    failure_reported: bool,
 }

 struct CheckEntry {
@@ -617,10 +800,17 @@
     solution_index_in_parent: usize, // index in the solution array of the match entry in the parent
 }

+enum LeaderConclusion {
+    Solution(GlobalSolution),
+    Failure,
+}
+
 impl SolutionCombiner {
     fn new() -> Self {
         return Self{
             local: Vec::new(),
+            presence: Vec::new(),
+            failure_reported: false,
         };
     }
@@ -819,15 +1009,47 @@
             }
         }

-        return self.check_new_solution(component_index, solution_index);
+        return self.check_for_global_solution(component_index, solution_index);
+    }
+
+    fn add_presence_and_check_for_global_failure(&mut self, present_component: ConnectorId, present: &[ChannelId]) -> bool {
+        'new_channel_loop: for new_channel_id in present {
+            for presence in &mut self.presence {
+                if presence.channel == *new_channel_id {
+                    if presence.added_by != present_component {
+                        presence.both_sides_present = true;
+                    }
+
+                    continue 'new_channel_loop;
+                }
+            }
+
+            // Not added yet
+            self.presence.push(ChannelPresence{
+                added_by: present_component,
+                channel: *new_channel_id,
+                both_sides_present: false,
+            });
+        }
+
+        return self.check_for_global_failure();
+    }
+
+    fn mark_failure_and_check_for_global_failure(&mut self) -> bool {
+        self.failure_reported = true;
+        return self.check_for_global_failure();
     }

     /// Checks if, starting at the provided local solution, a global solution
     /// can be formed.
     // TODO: At some point, check if divide and conquer is faster?
-    fn check_new_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
-        if !self.can_have_solution() {
-            return None;
+    fn check_for_global_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
+        // Trivial check that is necessary (but not sufficient) for a global
+        // solution to exist
+        for component in &self.local {
+            if !component.all_peers_present {
+                return None;
+            }
         }

         // Construct initial entry on stack
@@ -989,44 +1211,91 @@
         });
     }

-    /// Simple test if a solution is at all possible. If this returns true it
-    /// does not mean there actually is a solution.
-    fn can_have_solution(&self) -> bool {
-        for component in &self.local {
-            if !component.all_peers_present {
-                return false;
-            }
-        }
-
-        return true;
-    }
+    /// Checks if all preconditions for a global sync failure have been met
+    fn check_for_global_failure(&self) -> bool {
+        if !self.failure_reported {
+            return false;
+        }
+
+        // A failure was reported; if both sides of every discovered channel
+        // are present then we may emit the global failure broadcast.
+        let mut all_present = true;
+        for presence in &self.presence {
+            if !presence.both_sides_present {
+                all_present = false;
+                break;
+            }
+        }
+
+        return all_present; // && failure_reported, which is checked above
+    }
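+
+    // Presence bookkeeping, by example (hypothetical IDs): component 2
+    // reporting channel 5 stores (added_by: 2, both_sides_present: false);
+    // when component 4 later reports channel 5 as well, the entry flips to
+    // both_sides_present: true. A global failure may only be broadcast once a
+    // failure was reported *and* every known channel has both sides present.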

-    /// Turns the entire (partially resolved) global solution back into local
-    /// solutions to ship to another component.
-    // TODO: Don't do this, kind of wasteful since a lot of processing has
-    // already been performed.
-    fn drain(&mut self) -> Vec<LocalSolution> {
-        let mut reserve_len = 0;
-        for component in &self.local {
-            reserve_len += component.solutions.len();
-        }
-
-        let mut solutions = Vec::with_capacity(reserve_len);
-        for component in self.local.drain(..) {
-            for solution in component.solutions {
-                solutions.push(LocalSolution{
-                    component: component.component,
-                    final_branch_id: solution.final_branch_id,
-                    port_mapping: solution.channel_mapping,
-                });
-            }
-        }
-
-        return solutions;
-    }
+    /// Turns the entire (partially resolved) global solution into a structure
+    /// that can be forwarded to a new parent. The new parent may then merge
+    /// already obtained information.
+    fn drain(&mut self) -> Option<SolutionCombiner> {
+        if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported {
+            return None;
+        }
+
+        let result = SolutionCombiner{
+            local: self.local.clone(),
+            presence: self.presence.clone(),
+            failure_reported: self.failure_reported,
+        };
+
+        self.local.clear();
+        self.presence.clear();
+        self.failure_reported = false;
+        return Some(result);
+    }
+
+    // TODO: Entire routine is quite wasteful. Combine incrementally instead of
+    // doing all the work again.
+    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
+        self.failure_reported = self.failure_reported || combiner.failure_reported;
+
+        // Handle local solutions
+        if self.local.is_empty() {
+            // Trivial case
+            self.local = combiner.local;
+        } else {
+            for local in combiner.local {
+                for matched in local.solutions {
+                    let local_solution = LocalSolution{
+                        component: local.component,
+                        final_branch_id: matched.final_branch_id,
+                        port_mapping: matched.channel_mapping,
+                    };
+                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
+                    if let Some(global_solution) = maybe_solution {
+                        return Some(LeaderConclusion::Solution(global_solution));
+                    }
+                }
+            }
+        }
+
+        // Handle channel presence
+        if self.presence.is_empty() {
+            // Trivial case
+            self.presence = combiner.presence;
+        } else {
+            for presence in combiner.presence {
+                let global_failure = self.add_presence_and_check_for_global_failure(presence.added_by, &[presence.channel]);
+                if global_failure {
+                    return Some(LeaderConclusion::Failure);
+                }
+            }
+        }
+
+        return None;
+    }

     fn clear(&mut self) {
         self.local.clear();
+        self.presence.clear();
+        self.failure_reported = false;
     }
 }
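
Taken together, the intended driving pattern for these new entry points looks roughly as follows. This is an illustrative sketch only: the surrounding loop and the `next_message` helper are hypothetical, while the `Consensus` methods, `RoundConclusion` and the `Message` variants are the ones introduced by this patch.

    // Hypothetical scheduler-side driver (not part of this patch).
    fn poll_consensus(consensus: &mut Consensus, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        // Route one inbox message to its handler; `Some(..)` concludes the round.
        match ctx.next_message()? {
            Message::Data(msg) => {
                // May unblock branches, but never concludes the round by itself.
                consensus.handle_new_data_message(&msg, ctx);
                None
            },
            Message::SyncComp(msg) => consensus.handle_new_sync_comp_message(msg, ctx),
            Message::SyncPort(msg) => {
                consensus.handle_new_sync_port_message(msg, ctx);
                None
            },
            _ => None, // other message kinds are not consensus-related
        }
    }

On `Some(RoundConclusion::Success(branch))` the caller commits that branch and calls `end_sync`; on `Some(RoundConclusion::Failure)` it discards the round's speculative branches and ends the round the same way, using the invalid "start" branch.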