use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ Message, PortAnnotation, DataMessage, DataContent, DataHeader, SyncMessage, SyncContent, SyncHeader, }; use super::scheduler::ComponentCtx; struct BranchAnnotation { port_mapping: Vec, } #[derive(Debug)] pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, port_mapping: Vec<(ChannelId, BranchId)>, } #[derive(Debug, Clone)] pub(crate) struct GlobalSolution { component_branches: Vec<(ConnectorId, BranchId)>, channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- // Consensus // ----------------------------------------------------------------------------- /// The consensus algorithm. Currently only implemented to find the component /// with the highest ID within the sync region and letting it handle all the /// local solutions. /// /// The type itself serves as an experiment to see how code should be organized. // TODO: Flatten all datastructures // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups // TODO: A lot of stuff should be batched. Like checking all the sync headers // and sending "I have a higher ID" messages. pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, branch_annotations: Vec, last_finished_handled: Option, // Gathered state from communication encountered_peers: VecSet, // to determine when we should send "found a higher ID" messages. encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, // --- Persistent state // TODO: Tracking sync round numbers // --- Workspaces workspace_ports: Vec, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum Consistency { Valid, Inconsistent, } impl Consensus { pub fn new() -> Self { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), last_finished_handled: None, encountered_peers: VecSet::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), workspace_ports: Vec::new(), } } // --- Controlling sync round and branches /// Returns whether the consensus algorithm is running in sync mode pub fn is_in_sync(&self) -> bool { return !self.branch_annotations.is_empty(); } /// TODO: Remove this once multi-fire is in place pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation { let branch = &self.branch_annotations[branch_id.index as usize]; let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap(); return port; } /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. pub fn start_sync(&mut self, ctx: &ComponentCtx) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.encountered_peers.is_empty()); debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. self.branch_annotations.push(BranchAnnotation{ port_mapping: ctx.get_ports().iter() .map(|v| PortAnnotation{ port_id: v.self_id, registered_id: None, expected_firing: None, }) .collect(), }); self.highest_connector_id = ctx.id; } /// Notifies the consensus algorithm that a new branch has appeared. Must be /// called for each forked branch in the execution tree. pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) { // If called correctly. Then each time we are notified the new branch's // index is the length in `branch_annotations`. debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; let new_branch_annotations = BranchAnnotation{ port_mapping: parent_branch_annotations.port_mapping.clone(), }; self.branch_annotations.push(new_branch_annotations); } /// Notifies the consensus algorithm that a branch has reached the end of /// the sync block. A final check for consistency will be performed that the /// caller has to handle. Note that pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { debug_assert!(self.is_in_sync()); let branch = &self.branch_annotations[branch_id.index as usize]; for mapping in &branch.port_mapping { match mapping.expected_firing { Some(expected) => { if expected != mapping.registered_id.is_some() { // Inconsistent speculative state and actual state debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier return Consistency::Inconsistent; } }, None => {}, } } return Consistency::Valid; } /// Notifies the consensus algorithm that a particular branch has assumed /// a speculative value for its port mapping. pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency { debug_assert!(self.is_in_sync()); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == port_id { match mapping.expected_firing { None => { // Not yet mapped, perform speculative mapping mapping.expected_firing = Some(does_fire); return Consistency::Valid; }, Some(current) => { // Already mapped if current == does_fire { return Consistency::Valid; } else { return Consistency::Inconsistent; } } } } } unreachable!("notify_of_speculative_mapping called with unowned port"); } /// Generates sync messages for any branches that are at the end of the /// sync block. To find these branches, they should've been put in the /// "finished" queue in the execution tree. pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option { debug_assert!(self.is_in_sync()); let mut last_branch_id = self.last_finished_handled; for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { // Turn the port mapping into a local solution let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping; let mut target_mapping = Vec::with_capacity(source_mapping.len()); for port in source_mapping { // Note: if the port is silent, and we've never communicated // over the port, then we need to do so now, to let the peer // component know about our sync leader state. let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); let peer_port_id = port_desc.peer_id; let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&port.port_id) { ctx.submit_message(Message::Data(DataMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, }, data_header: DataHeader{ expected_mapping: source_mapping.clone(), sending_port: port.port_id, target_port: peer_port_id, new_mapping: BranchId::new_invalid(), }, content: DataContent::SilentPortNotification, })); self.encountered_ports.push(port.port_id); } target_mapping.push(( channel_id, port.registered_id.unwrap_or(BranchId::new_invalid()) )); } let local_solution = LocalSolution{ component: ctx.id, final_branch_id: branch.id, port_mapping: target_mapping, }; let solution_branch = self.send_or_store_local_solution(local_solution, ctx); if solution_branch.is_some() { // No need to continue iterating, we've found the solution return solution_branch; } last_branch_id = Some(branch.id); } self.last_finished_handled = last_branch_id; return None; } pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { debug_assert!(self.is_in_sync()); // TODO: Handle sending and receiving ports // Set final ports final_ports.clear(); let branch = &self.branch_annotations[branch_id.index as usize]; for port in &branch.port_mapping { final_ports.push(port.port_id); } // Clear out internal storage to defaults self.highest_connector_id = ConnectorId::new_invalid(); self.branch_annotations.clear(); self.last_finished_handled = None; self.encountered_peers.clear(); self.encountered_ports.clear(); self.solution_combiner.clear(); } // --- Handling messages /// Prepares a message for sending. Caller should have made sure that /// sending the message is consistent with the speculative state. pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) { debug_assert!(self.is_in_sync()); let branch = &mut self.branch_annotations[branch_id.index as usize]; if cfg!(debug_assertions) { // Check for consistent mapping let port = branch.port_mapping.iter() .find(|v| v.port_id == source_port_id) .unwrap(); debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); } // Check for ports that are being sent debug_assert!(self.workspace_ports.is_empty()); find_ports_in_value_group(content, &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle sending ports"); self.workspace_ports.clear(); } // Construct data header // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), sending_port: port_info.self_id, target_port: port_info.peer_id, new_mapping: branch_id }; // Update port mapping for mapping in &mut branch.port_mapping { if mapping.port_id == source_port_id { mapping.expected_firing = Some(true); mapping.registered_id = Some(branch_id); } } self.encountered_ports.push(source_port_id); return (self.create_sync_header(ctx), data_header); } /// Handles a new data message by handling the data and sync header, and /// checking which *existing* branches *can* receive the message. So two /// cautionary notes: /// 1. A future branch might also be able to receive this message, see the /// `branch_can_receive` function. /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) { self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids); self.handle_received_sync_header(&message.sync_header, ctx); } /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option { self.handle_received_sync_header(&message.sync_header, ctx); // And handle the contents debug_assert_eq!(message.target_component_id, ctx.id); match message.content { SyncContent::Notification => { // We were just interested in the header return None; }, SyncContent::LocalSolution(solution) => { // We might be the leader, or earlier messages caused us to not // be the leader anymore. return self.send_or_store_local_solution(solution, ctx); }, SyncContent::GlobalSolution(solution) => { // Take branch of interest and return it. let (_, branch_id) = solution.component_branches.iter() .find(|(connector_id, _)| *connector_id == ctx.id) .unwrap(); return Some(*branch_id); } } } pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) { debug_assert!(self.branch_can_receive(branch_id, data_header, content)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == data_header.target_port { // Found the port in which the message should be inserted mapping.registered_id = Some(data_header.new_mapping); // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); } return; } } // If here, then the branch didn't actually own the port? Means the // caller made a mistake unreachable!("incorrect notify_of_received_message"); } /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool { if let DataContent::SilentPortNotification = content { // No port can receive a "silent" notification. return false; } let annotation = &self.branch_annotations[branch_id.index as usize]; for expected in &data_header.expected_mapping { // If we own the port, then we have an entry in the // annotation, check if the current mapping matches for current in &annotation.port_mapping { if expected.port_id == current.port_id { if expected.registered_id != current.registered_id { // IDs do not match, we cannot receive the // message in this branch return false; } } } } return true; } // --- Internal helpers /// Checks data header and consults the stored port mapping and the /// execution tree to see which branches may receive the data message's /// contents. fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { // Found a branch awaiting the message, but we need to make sure // the mapping is correct if self.branch_can_receive(branch.id, data_header, content) { target_ids.push(branch.id); } } } } fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves self.encountered_peers.push(sync_header.sending_component_id); if sync_header.highest_component_id > self.highest_connector_id { // Sender has higher component ID. So should be the target of our // messages. We should also let all of our peers know self.highest_connector_id = sync_header.highest_component_id; for encountered_id in self.encountered_peers.iter() { if *encountered_id == sync_header.sending_component_id { // Don't need to send it to this one continue } let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: *encountered_id, content: SyncContent::Notification, }; ctx.submit_message(Message::Sync(message)); } // But also send our locally combined solution self.forward_local_solutions(ctx); } else if sync_header.highest_component_id < self.highest_connector_id { // Sender has lower leader ID, so it should know about our higher // one. let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: sync_header.sending_component_id, content: SyncContent::Notification }; ctx.submit_message(Message::Sync(message)); } // else: exactly equal, so do nothing } fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option { println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index); if self.highest_connector_id == ctx.id { // We are the leader if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) { let mut my_final_branch_id = BranchId::new_invalid(); for (connector_id, branch_id) in global_solution.component_branches.iter().copied() { if connector_id == ctx.id { // This is our solution branch my_final_branch_id = branch_id; continue; } let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: connector_id, content: SyncContent::GlobalSolution(global_solution.clone()), }; ctx.submit_message(Message::Sync(message)); } debug_assert!(my_final_branch_id.is_valid()); return Some(my_final_branch_id); } else { return None; } } else { // Someone else is the leader let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: self.highest_connector_id, content: SyncContent::LocalSolution(solution), }; ctx.submit_message(Message::Sync(message)); return None; } } #[inline] fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader { return SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, } } fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) { debug_assert_ne!(self.highest_connector_id, ctx.id); for local_solution in self.solution_combiner.drain() { let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: self.highest_connector_id, content: SyncContent::LocalSolution(local_solution), }; ctx.submit_message(Message::Sync(message)); } } } // ----------------------------------------------------------------------------- // Solution storage and algorithms // ----------------------------------------------------------------------------- struct MatchedLocalSolution { final_branch_id: BranchId, channel_mapping: Vec<(ChannelId, BranchId)>, matches: Vec, } struct ComponentMatches { target_id: ConnectorId, target_index: usize, match_indices: Vec, // of local solution in connector } struct ComponentPeer { target_id: ConnectorId, target_index: usize, // in array of global solution components involved_channels: Vec, } struct ComponentLocalSolutions { component: ConnectorId, peers: Vec, solutions: Vec, all_peers_present: bool, } // TODO: Flatten? Flatten. Flatten everything. pub(crate) struct SolutionCombiner { local: Vec } impl SolutionCombiner { fn new() -> Self { return Self{ local: Vec::new(), }; } /// Adds a new local solution to the global solution storage. Will check the /// new local solutions for matching against already stored local solutions /// of peer connectors. fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option { let component_id = solution.component; let solution = MatchedLocalSolution{ final_branch_id: solution.final_branch_id, channel_mapping: solution.port_mapping, matches: Vec::new(), }; // Create an entry for the solution for the particular component let component_exists = self.local.iter_mut() .enumerate() .find(|(_, v)| v.component == component_id); let (component_index, solution_index, new_component) = match component_exists { Some((component_index, storage)) => { // Entry for component exists, so add to solutions let solution_index = storage.solutions.len(); storage.solutions.push(solution); (component_index, solution_index, false) } None => { // Entry for component does not exist yet let component_index = self.local.len(); self.local.push(ComponentLocalSolutions{ component: component_id, peers: Vec::new(), solutions: vec![solution], all_peers_present: false, }); (component_index, 0, true) } }; // If this is a solution of a component that is new to us, then we check // in the stored solutions which other components are peers of the new // one. if new_component { let cur_ports = &self.local[component_index].solutions[0].channel_mapping; let mut component_peers = Vec::new(); // Find the matching components for (other_index, other_component) in self.local.iter().enumerate() { if other_index == component_index { // Don't match against ourselves continue; } let mut matching_channels = Vec::new(); for (cur_channel_id, _) in cur_ports { for (other_channel_id, _) in &other_component.solutions[0].channel_mapping { if cur_channel_id == other_channel_id { // We have a shared port matching_channels.push(*cur_channel_id); } } } if !matching_channels.is_empty() { // We share some ports component_peers.push(ComponentPeer{ target_id: other_component.component, target_index: other_index, involved_channels: matching_channels, }); } } let mut num_ports_in_peers = 0; for peer in &component_peers { num_ports_in_peers += peer.involved_channels.len(); } if num_ports_in_peers == cur_ports.len() { // Newly added component has all required peers present self.local[component_index].all_peers_present = true; } // Add the found component pairing entries to the solution entries // for the two involved components for component_match in component_peers { // Check the other component for having all peers present let mut num_ports_in_peers = component_match.involved_channels.len(); let other_component = &mut self.local[component_match.target_index]; for existing_peer in &other_component.peers { num_ports_in_peers += existing_peer.involved_channels.len(); } if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() { other_component.all_peers_present = true; } other_component.peers.push(ComponentPeer{ target_id: component_id, target_index: component_index, involved_channels: component_match.involved_channels.clone(), }); let new_component = &mut self.local[component_index]; new_component.peers.push(component_match); } } // We're now sure that we know which other components the currently // considered component is linked up to. Now we need to check those // entries (if any) to see if any pair of local solutions match let mut new_component_matches = Vec::new(); let cur_component = &self.local[component_index]; let cur_solution = &cur_component.solutions[solution_index]; for peer in &cur_component.peers { let mut new_solution_matches = Vec::new(); let other_component = &self.local[peer.target_index]; for (other_solution_index, other_solution) in other_component.solutions.iter().enumerate() { // Check the port mappings between the pair of solutions. let mut all_matched = true; 'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping { for (other_port, other_branch) in &other_solution.channel_mapping { if cur_port == other_port { if cur_branch == other_branch { // Same port mapping, go to next port break; } else { // Different port mapping, not a match all_matched = false; break 'mapping_check_loop; } } } } if !all_matched { continue; } // Port mapping between the component pair is the same, so they // have agreeable local solutions new_solution_matches.push(other_solution_index); } new_component_matches.push(ComponentMatches{ target_id: peer.target_id, target_index: peer.target_index, match_indices: new_solution_matches, }); } // And now that we have the new solution-to-solution matches, we need to // add those in the appropriate storage. for new_component_match in new_component_matches { let other_component = &mut self.local[new_component_match.target_index]; for other_solution_index in new_component_match.match_indices.iter().copied() { let other_solution = &mut other_component.solutions[other_solution_index]; // Add a completely new entry for the component, or add it to // the existing component entry's matches match other_solution.matches.iter_mut() .find(|v| v.target_id == component_id) { Some(other_match) => { other_match.match_indices.push(solution_index); }, None => { other_solution.matches.push(ComponentMatches{ target_id: component_id, target_index: component_index, match_indices: vec![solution_index], }) } } } let cur_component = &mut self.local[component_index]; let cur_solution = &mut cur_component.solutions[solution_index]; match cur_solution.matches.iter_mut() .find(|v| v.target_id == new_component_match.target_id) { Some(other_match) => { // Already have an entry debug_assert_eq!(other_match.target_index, new_component_match.target_index); other_match.match_indices.extend(&new_component_match.match_indices); }, None => { // Create a new entry cur_solution.matches.push(new_component_match); } } } return self.check_new_solution(component_index, solution_index); } /// Checks if, starting at the provided local solution, a global solution /// can be formed. fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option { if !self.can_have_solution() { return None; } // By now we're certain that all peers are present. So once our // backtracking solution stack is as long as the number of components, // then we have found a global solution. let mut check_stack = Vec::new(); let mut check_from = 0; check_stack.push((component_index, solution_index)); 'checking_loop: while check_from < check_stack.len() { // Prepare for next iteration let new_check_from = check_stack.len(); // Go through all entries on the checking stack. Each entry // corresponds to a component's solution. We check that one against // previously added ones on the stack, and if they're not already // added we push them onto the check stack. for check_idx in check_from..new_check_from { // Take the current solution let (component_index, solution_index) = check_stack[check_idx]; debug_assert!(!self.local[component_index].solutions.is_empty()); let cur_solution = &self.local[component_index].solutions[solution_index]; // Go through the matches and check if they're on the stack or // should be added to the stack. for cur_match in &cur_solution.matches { let mut is_already_on_stack = false; let mut has_same_solution = false; for existing_check_idx in 0..check_from { let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx]; if existing_component_index == cur_match.target_index { // Already lives on the stack, so the match MUST // contain the same solution index if the checked // local solution is agreeable with the (partially // determined) global solution. is_already_on_stack = true; if cur_match.match_indices.contains(&existing_solution_index) { has_same_solution = true; break; } } } if is_already_on_stack { if !has_same_solution { // We have an inconsistency, so we need to go back // in our stack, and try the next solution let (last_component_index, last_solution_index) = check_stack[check_from]; check_stack.truncate(check_from); if check_stack.is_empty() { // The starting point does not yield a valid // solution return None; } // Try the next one let last_component = &self.local[last_component_index]; let new_solution_index = last_solution_index + 1; if new_solution_index >= last_component.solutions.len() { // No more things to try, again: no valid // solution return None; } check_stack.push((last_component_index, new_solution_index)); continue 'checking_loop; } // else: we're fine, the solution is agreeable } else { check_stack.push((cur_match.target_index, 0)) } } } check_from = new_check_from; } // Because of our earlier checking if we can have a solution at // all (all components have their peers), and the exit condition of the // while loop: if we're here, then we have a global solution debug_assert_eq!(check_stack.len(), self.local.len()); let mut final_branches = Vec::with_capacity(check_stack.len()); for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; final_branches.push((component.component, solution.final_branch_id)); } // Just debugging here, TODO: @remove let mut total_num_channels = 0; for (component_index, _) in check_stack.iter().copied() { let component = &self.local[component_index]; total_num_channels += component.solutions[0].channel_mapping.len(); } total_num_channels /= 2; let mut final_mapping = Vec::with_capacity(total_num_channels); let mut total_num_checked = 0; for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) { Some((_, encountered_branch_id)) => { debug_assert_eq!(*encountered_branch_id, branch_id); total_num_checked += 1; }, None => { final_mapping.push((channel_id, branch_id)); } } } } debug_assert_eq!(total_num_checked, total_num_channels); return Some(GlobalSolution{ component_branches: final_branches, channel_mapping: final_mapping, }); } /// Simple test if a solution is at all possible. If this returns true it /// does not mean there actually is a solution. fn can_have_solution(&self) -> bool { for component in &self.local { if !component.all_peers_present { return false; } } return true; } /// Turns the entire (partially resolved) global solution back into local /// solutions to ship to another component. // TODO: Don't do this, kind of wasteful since a lot of processing has // already been performed. fn drain(&mut self) -> Vec { let mut reserve_len = 0; for component in &self.local { reserve_len += component.solutions.len(); } let mut solutions = Vec::with_capacity(reserve_len); for component in self.local.drain(..) { for solution in component.solutions { solutions.push(LocalSolution{ component: component.component, final_branch_id: solution.final_branch_id, port_mapping: solution.channel_mapping, }); } } return solutions; } fn clear(&mut self) { self.local.clear(); } } // ----------------------------------------------------------------------------- // Generic Helpers // ----------------------------------------------------------------------------- /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. use crate::protocol::eval::Value; fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port let cur_port = PortIdLocal::new(port_id.0.u32_suffix); for prev_port in ports.iter() { if *prev_port == cur_port { // Already added return; } } ports.push(cur_port); }, Value::Array(heap_pos) | Value::Message(heap_pos) | Value::String(heap_pos) | Value::Struct(heap_pos) | Value::Union(_, heap_pos) => { // Reference to some dynamic thing which might contain ports, // so recurse let heap_region = &group.regions[*heap_pos as usize]; for embedded_value in heap_region { find_port_in_value(group, embedded_value, ports); } }, _ => {}, // values we don't care about } } // Clear the ports, then scan all the available values ports.clear(); for value in &value_group.values { find_port_in_value(value_group, value, ports); } }