diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 3a62fb5bb951d3403540babf71b58aba16c625a0..234cf968bd19d1c801185f1b1303a503a2af521d 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,4 +1,5 @@ use std::path::Component; +use std::str::pattern::Pattern; use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; @@ -19,6 +20,7 @@ pub(crate) struct LocalSolution { port_mapping: Vec<(ChannelId, BranchId)>, } +#[derive(Clone)] pub(crate) struct GlobalSolution { branches: Vec<(ConnectorId, BranchId)>, port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info @@ -204,11 +206,14 @@ impl Consensus { debug_assert!(self.is_in_sync()); // TODO: Handle sending and receiving ports + // Set final ports final_ports.clear(); let branch = &self.branch_annotations[branch_id.index as usize]; for port in &branch.port_mapping { final_ports.push(port.port_id); } + + // Clear out internal storage } // --- Handling messages @@ -297,6 +302,54 @@ impl Consensus { } } + pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) { + debug_assert!(self.branch_can_receive(branch_id, data_header)); + let branch = &mut self.branch_annotations[branch_id.index as usize]; + for mapping in &mut branch.port_mapping { + if mapping.port_id == data_header.target_port { + // Found the port in which the message should be inserted + mapping.registered_id = Some(data_header.new_mapping); + + // Check for sent ports + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle received ports"); + self.workspace_ports.clear(); + } + + return; + } + } + + // If here, then the branch didn't actually own the port? Means the + // caller made a mistake + unreachable!("incorrect notify_of_received_message"); + } + + /// Matches the mapping between the branch and the data message. If they + /// match then the branch can receive the message. + pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { + let annotation = &self.branch_annotations[branch_id.index as usize]; + for expected in &data_header.expected_mapping { + // If we own the port, then we have an entry in the + // annotation, check if the current mapping matches + for current in &annotation.port_mapping { + if expected.port_id == current.port_id { + if expected.registered_id != current.registered_id { + // IDs do not match, we cannot receive the + // message in this branch + return false; + } + } + } + } + + return true; + } + + // --- Internal helpers + /// Checks data header and consults the stored port mapping and the /// execution tree to see which branches may receive the data message's /// contents. @@ -349,58 +402,18 @@ impl Consensus { } // else: exactly equal, so do nothing } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) { - debug_assert!(self.branch_can_receive(branch_id, data_header)); - let branch = &mut self.branch_annotations[branch_id.index as usize]; - for mapping in &mut branch.port_mapping { - if mapping.port_id == data_header.target_port { - // Found the port in which the message should be inserted - mapping.registered_id = Some(data_header.new_mapping); - - // Check for sent ports - debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(content, &mut self.workspace_ports); - if !self.workspace_ports.is_empty() { - todo!("handle received ports"); - self.workspace_ports.clear(); - } - - return; - } - } - - // If here, then the branch didn't actually own the port? Means the - // caller made a mistake - unreachable!("incorrect notify_of_received_message"); - } - - /// Matches the mapping between the branch and the data message. If they - /// match then the branch can receive the message. - pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { - let annotation = &self.branch_annotations[branch_id.index as usize]; - for expected in &data_header.expected_mapping { - // If we own the port, then we have an entry in the - // annotation, check if the current mapping matches - for current in &annotation.port_mapping { - if expected.port_id == current.port_id { - if expected.registered_id != current.registered_id { - // IDs do not match, we cannot receive the - // message in this branch - return false; - } - } - } - } - - return true; - } - - // --- Internal helpers fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) { if self.highest_connector_id == ctx.id { // We are the leader if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) { - + for (connector_id, _) in global_solution.branches.iter().copied() { + let message = SyncMessageFancy{ + sync_header: self.create_sync_header(ctx), + target_component_id: connector_id, + content: SyncContent::GlobalSolution(global_solution.clone()), + }; + ctx.submit_message(MessageFancy::Sync(message)); + } } } else { // Someone else is the leader @@ -479,7 +492,7 @@ impl SolutionCombiner { /// Adds a new local solution to the global solution storage. Will check the /// new local solutions for matching against already stored local solutions /// of peer connectors. - fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option> { + fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option { let component_id = solution.component; let solution = MatchedLocalSolution{ final_branch_id: solution.final_branch_id, @@ -676,7 +689,7 @@ impl SolutionCombiner { /// Checks if, starting at the provided local solution, a global solution /// can be formed. - fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option> { + fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option { if !self.can_have_solution() { return None; } @@ -755,17 +768,50 @@ impl SolutionCombiner { } // Because of our earlier checking if we can have a solution at - // all (all components have a peer), and the exit condition of the while - // loop: if we're here, then we have a global solution + // all (all components have their peers), and the exit condition of the + // while loop: if we're here, then we have a global solution debug_assert_eq!(check_stack.len(), self.local.len()); let mut global_solution = Vec::with_capacity(check_stack.len()); - for (component_index, solution_index) in check_stack { + for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; global_solution.push((component.component, solution.final_branch_id)); } - return Some(global_solution); + // Just debugging here, TODO: @remove + let mut total_num_ports = 0; + for (component_index, _) in check_stack.iter().copied() { + let component = &self.local[component_index]; + total_num_ports += component.solutions[0].port_mapping.len(); + } + + total_num_ports /= 2; + let mut final_mapping = Vec::with_capacity(total_num_ports); + let mut total_num_checked = 0; + + for (component_index, solution_index) in check_stack.iter().copied() { + let component = &self.local[component_index]; + let solution = &component.solutions[solution_index]; + + for (channel_id, branch_id) in solution.port_mapping.iter().copied() { + match final_mapping.iter().find(|(v, _)| *v == channel_id) { + Some((_, encountered_branch_id)) => { + debug_assert_eq!(encountered_branch_id, branch_id); + total_num_checked += 1; + }, + None => { + final_mapping.push((channel_id, branch_id)); + } + } + } + } + + debug_assert_eq!(total_num_checked, total_num_ports); + + return Some(GlobalSolution{ + branches: global_solution, + port_mapping: final_mapping, + }); } /// Simple test if a solution is at all possible. If this returns true it