diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 4d6f13a148bb75ed3b0566df24120e4c00d69826..b96ff7ac38f93de7a38a605dd8f715a2f711d899 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -23,12 +23,13 @@ struct BranchAnnotation { pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, + sync_round_number: u32, port_mapping: Vec<(ChannelId, BranchMarker)>, } #[derive(Debug, Clone)] pub(crate) struct GlobalSolution { - component_branches: Vec<(ConnectorId, BranchId)>, + component_branches: Vec<(ConnectorId, BranchId, u32)>, channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info } @@ -42,6 +43,7 @@ pub enum RoundConclusion { // Consensus // ----------------------------------------------------------------------------- +#[derive(Debug)] struct Peer { id: ConnectorId, encountered_this_round: bool, @@ -290,6 +292,7 @@ impl Consensus { let local_solution = LocalSolution{ component: ctx.id, + sync_round_number: self.sync_round, final_branch_id: branch_id, port_mapping: target_mapping, }; @@ -324,6 +327,8 @@ impl Consensus { peer.encountered_this_round = false; peer.expected_sync_round += 1; } + + println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers) } // --- Handling messages @@ -428,8 +433,8 @@ impl Consensus { SyncCompContent::GlobalSolution(solution) => { // Found a global solution debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader - let (_, branch_id) = solution.component_branches.iter() - .find(|(component_id, _)| *component_id == ctx.id) + let (_, branch_id, _) = solution.component_branches.iter() + .find(|(component_id, _, _)| *component_id == ctx.id) .unwrap(); return Some(RoundConclusion::Success(*branch_id)); }, @@ -739,19 +744,35 @@ impl Consensus { // Handle the global solution let mut my_final_branch_id = BranchId::new_invalid(); - for (connector_id, branch_id) in global_solution.component_branches.iter().copied() { + for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() { if connector_id == ctx.id { // This is our solution branch my_final_branch_id = branch_id; continue; } + // Send solution message let message = SyncCompMessage { sync_header: self.create_sync_header(ctx), target_component_id: connector_id, content: SyncCompContent::GlobalSolution(global_solution.clone()), }; ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + + // Update peers as leader. Subsequent call to `end_sync` will update + // the round numbers + match self.peers.iter_mut().find(|v| v.id == connector_id) { + Some(peer) => { + peer.expected_sync_round = sync_round; + }, + None => { + self.peers.push(Peer{ + id: connector_id, + expected_sync_round: sync_round, + encountered_this_round: true, + }); + } + } } debug_assert!(my_final_branch_id.is_valid()); @@ -780,7 +801,9 @@ impl Consensus { }; ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } - } else if let Some(owner_b) = presence.owner_b { + } + + if let Some(owner_b) = presence.owner_b { if owner_b != ctx.id { if encountered.push(owner_b) { let message = SyncCompMessage{ @@ -907,6 +930,7 @@ struct ComponentPeer { #[derive(Debug, Clone)] struct ComponentLocalSolutions { component: ConnectorId, + sync_round: u32, peers: Vec, solutions: Vec, all_peers_present: bool, @@ -979,6 +1003,7 @@ impl SolutionCombiner { /// of peer connectors. fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option { let component_id = solution.component; + let sync_round = solution.sync_round_number; let solution = MatchedLocalSolution{ final_branch_id: solution.final_branch_id, channel_mapping: solution.port_mapping, @@ -1002,6 +1027,7 @@ impl SolutionCombiner { let component_index = self.local.len(); self.local.push(ComponentLocalSolutions{ component: component_id, + sync_round, peers: Vec::new(), solutions: vec![solution], all_peers_present: false, @@ -1349,7 +1375,7 @@ impl SolutionCombiner { for entry in &stack { let component = &self.local[entry.component_index]; let solution = &component.solutions[entry.solution_index]; - final_branches.push((component.component, solution.final_branch_id)); + final_branches.push((component.component, solution.final_branch_id, component.sync_round)); } // Just debugging here, TODO: @remove @@ -1442,6 +1468,7 @@ impl SolutionCombiner { for matched in local.solutions { let local_solution = LocalSolution{ component: local.component, + sync_round_number: local.sync_round, final_branch_id: matched.final_branch_id, port_mapping: matched.channel_mapping, };