diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 234cf968bd19d1c801185f1b1303a503a2af521d..b65ea87882def537dd7a4ad334e7c65a9a87f79c 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,14 +1,14 @@ -use std::path::Component; -use std::str::pattern::Pattern; use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; -use crate::runtime2::ConnectorId; -use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy}; -use crate::runtime2::inbox::SyncMessage; -use crate::runtime2::port::{ChannelId, Port, PortIdLocal}; -use crate::runtime2::scheduler::ComponentCtxFancy; -use super::inbox2::PortAnnotation; + +use super::branch::{BranchId, ExecTree, QueueKind}; +use super::ConnectorId; +use super::port::{ChannelId, Port, PortIdLocal}; +use super::inbox2::{ + DataHeader, DataMessageFancy, MessageFancy, + SyncContent, SyncHeader, SyncMessageFancy, PortAnnotation +}; +use super::scheduler::ComponentCtxFancy; struct BranchAnnotation { port_mapping: Vec, @@ -22,8 +22,8 @@ pub(crate) struct LocalSolution { #[derive(Clone)] pub(crate) struct GlobalSolution { - branches: Vec<(ConnectorId, BranchId)>, - port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info + component_branches: Vec<(ConnectorId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- @@ -88,10 +88,12 @@ impl Consensus { /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ports: &[Port]) { + pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); + debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.encountered_peers.is_empty()); + debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. @@ -104,6 +106,9 @@ impl Consensus { }) .collect(), }); + + self.highest_connector_id = ctx.id; + } /// Notifies the consensus algorithm that a new branch has appeared. Must be @@ -172,7 +177,7 @@ impl Consensus { /// Generates sync messages for any branches that are at the end of the /// sync block. To find these branches, they should've been put in the /// "finished" queue in the execution tree. - pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) { + pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) -> Option { debug_assert!(self.is_in_sync()); let mut last_branch_id = self.last_finished_handled; @@ -194,12 +199,17 @@ impl Consensus { final_branch_id: branch.id, port_mapping: target_mapping, }; - self.send_or_store_local_solution(local_solution, ctx); + let solution_branch = self.send_or_store_local_solution(local_solution, ctx); + if solution_branch.is_some() { + // No need to continue iterating, we've found the solution + return solution_branch; + } last_branch_id = Some(branch.id); } self.last_finished_handled = last_branch_id; + return None; } pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { @@ -213,7 +223,12 @@ impl Consensus { final_ports.push(port.port_id); } - // Clear out internal storage + // Clear out internal storage to defaults + self.highest_connector_id = ConnectorId::new_invalid(); + self.branch_annotations.clear(); + self.last_finished_handled = None; + self.encountered_peers.clear(); + self.solution_combiner.clear(); } // --- Handling messages @@ -289,12 +304,11 @@ impl Consensus { SyncContent::LocalSolution(solution) => { // We might be the leader, or earlier messages caused us to not // be the leader anymore. - self.send_or_store_local_solution(solution, ctx); - return None; + return self.send_or_store_local_solution(solution, ctx); }, SyncContent::GlobalSolution(solution) => { // Take branch of interest and return it. - let (_, branch_id) = solution.branches.iter() + let (_, branch_id) = solution.component_branches.iter() .find(|(connector_id, _)| connector_id == ctx.id) .unwrap(); return Some(*branch_id); @@ -402,11 +416,18 @@ impl Consensus { } // else: exactly equal, so do nothing } - fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) { + fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option { if self.highest_connector_id == ctx.id { // We are the leader if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) { - for (connector_id, _) in global_solution.branches.iter().copied() { + let mut my_final_branch_id = BranchId::new_invalid(); + for (connector_id, branch_id) in global_solution.component_branches.iter().copied() { + if connector_id == ctx.id { + // This is our solution branch + my_final_branch_id = branch_id; + continue; + } + let message = SyncMessageFancy{ sync_header: self.create_sync_header(ctx), target_component_id: connector_id, @@ -414,6 +435,11 @@ impl Consensus { }; ctx.submit_message(MessageFancy::Sync(message)); } + + debug_assert!(my_final_branch_id.is_valid()); + return Some(my_final_branch_id); + } else { + return None; } } else { // Someone else is the leader @@ -423,6 +449,7 @@ impl Consensus { content: SyncContent::LocalSolution(solution), }; ctx.submit_message(MessageFancy::Sync(message)); + return None; } } @@ -454,7 +481,7 @@ impl Consensus { struct MatchedLocalSolution { final_branch_id: BranchId, - port_mapping: Vec<(ChannelId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchId)>, matches: Vec, } @@ -496,7 +523,7 @@ impl SolutionCombiner { let component_id = solution.component; let solution = MatchedLocalSolution{ final_branch_id: solution.final_branch_id, - port_mapping: solution.port_mapping, + channel_mapping: solution.port_mapping, matches: Vec::new(), }; @@ -529,7 +556,7 @@ impl SolutionCombiner { // in the stored solutions which other components are peers of the new // one. if new_component { - let cur_ports = &self.local[component_index].solutions[0].port_mapping; + let cur_ports = &self.local[component_index].solutions[0].channel_mapping; let mut component_peers = Vec::new(); // Find the matching components @@ -539,22 +566,22 @@ impl SolutionCombiner { continue; } - let mut matching_ports = Vec::new(); - for (cur_port_id, _) in cur_ports { - for (other_port_id, _) in &other_component.solutions[0].port_mapping { - if cur_port_id == other_port_id { + let mut matching_channels = Vec::new(); + for (cur_channel_id, _) in cur_ports { + for (other_channel_id, _) in &other_component.solutions[0].channel_mapping { + if cur_channel_id == other_channel_id { // We have a shared port - matching_ports.push(*port_id); + matching_channels.push(*cur_channel_id); } } } - if !matching_ports.is_empty() { + if !matching_channels.is_empty() { // We share some ports component_peers.push(ComponentPeer{ target_id: other_component.component, target_index: other_index, - involved_channels: matching_ports, + involved_channels: matching_channels, }); } } @@ -579,7 +606,7 @@ impl SolutionCombiner { num_ports_in_peers += existing_peer.involved_channels.len(); } - if num_ports_in_peers == other_component.solutions[0].port_mapping.len() { + if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() { other_component.all_peers_present = true; } @@ -609,8 +636,8 @@ impl SolutionCombiner { // Check the port mappings between the pair of solutions. let mut all_matched = true; - 'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.port_mapping { - for (other_port, other_branch) in &other_solution.port_mapping { + 'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping { + for (other_port, other_branch) in &other_solution.channel_mapping { if cur_port == other_port { if cur_branch == other_branch { // Same port mapping, go to next port @@ -771,29 +798,29 @@ impl SolutionCombiner { // all (all components have their peers), and the exit condition of the // while loop: if we're here, then we have a global solution debug_assert_eq!(check_stack.len(), self.local.len()); - let mut global_solution = Vec::with_capacity(check_stack.len()); + let mut final_branches = Vec::with_capacity(check_stack.len()); for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; - global_solution.push((component.component, solution.final_branch_id)); + final_branches.push((component.component, solution.final_branch_id)); } // Just debugging here, TODO: @remove - let mut total_num_ports = 0; + let mut total_num_channels = 0; for (component_index, _) in check_stack.iter().copied() { let component = &self.local[component_index]; - total_num_ports += component.solutions[0].port_mapping.len(); + total_num_channels += component.solutions[0].channel_mapping.len(); } - total_num_ports /= 2; - let mut final_mapping = Vec::with_capacity(total_num_ports); + total_num_channels /= 2; + let mut final_mapping = Vec::with_capacity(total_num_channels); let mut total_num_checked = 0; for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; - for (channel_id, branch_id) in solution.port_mapping.iter().copied() { + for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) { Some((_, encountered_branch_id)) => { debug_assert_eq!(encountered_branch_id, branch_id); @@ -806,11 +833,11 @@ impl SolutionCombiner { } } - debug_assert_eq!(total_num_checked, total_num_ports); + debug_assert_eq!(total_num_checked, total_num_channels); return Some(GlobalSolution{ - branches: global_solution, - port_mapping: final_mapping, + component_branches: final_branches, + channel_mapping: final_mapping, }); } @@ -842,13 +869,17 @@ impl SolutionCombiner { solutions.push(LocalSolution{ component: component.component, final_branch_id: solution.final_branch_id, - port_mapping: solution.port_mapping, + port_mapping: solution.channel_mapping, }); } } return solutions; } + + fn clear(&mut self) { + self.local.clear(); + } } // -----------------------------------------------------------------------------