diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 6874153ced9b2acecb22a54535cbe6f60b427c0d..3a62fb5bb951d3403540babf71b58aba16c625a0 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -3,9 +3,9 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; -use crate::runtime2::inbox2::{DataHeader, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy}; +use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy}; use crate::runtime2::inbox::SyncMessage; -use crate::runtime2::port::{Port, PortIdLocal}; +use crate::runtime2::port::{ChannelId, Port, PortIdLocal}; use crate::runtime2::scheduler::ComponentCtxFancy; use super::inbox2::PortAnnotation; @@ -16,7 +16,12 @@ struct BranchAnnotation { pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, - port_mapping: Vec<(PortIdLocal, BranchId)>, + port_mapping: Vec<(ChannelId, BranchId)>, +} + +pub(crate) struct GlobalSolution { + branches: Vec<(ConnectorId, BranchId)>, + port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- @@ -41,7 +46,7 @@ pub(crate) struct Consensus { // Gathered state (in case we are currently the leader of the distributed // consensus protocol) encountered_peers: VecSet, - local_solutions: Vec, + solution_combiner: SolutionCombiner, // Workspaces workspace_ports: Vec, } @@ -59,7 +64,7 @@ impl Consensus { branch_annotations: Vec::new(), last_finished_handled: None, encountered_peers: VecSet::new(), - local_solutions: Vec::new(), + solution_combiner: SolutionCombiner::new(), workspace_ports: Vec::new(), } } @@ -175,8 +180,9 @@ impl Consensus { let mut target_mapping = Vec::with_capacity(source_mapping.len()); for port in source_mapping { + let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); target_mapping.push(( - port.port_id, + port_desc.channel_id, port.registered_id.unwrap_or(BranchId::new_invalid()) )); } @@ -186,7 +192,7 @@ impl Consensus { final_branch_id: branch.id, port_mapping: target_mapping, }; - + self.send_or_store_local_solution(local_solution, ctx); last_branch_id = Some(branch.id); } @@ -250,7 +256,63 @@ impl Consensus { return (sync_header, data_header); } - pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { + /// Handles a new data message by handling the data and sync header, and + /// checking which *existing* branches *can* receive the message. So two + /// cautionary notes: + /// 1. A future branch might also be able to receive this message, see the + /// `branch_can_receive` function. + /// 2. We return the branches that *can* receive the message, you still + /// have to explicitly call `notify_of_received_message`. + pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec) { + self.handle_received_data_header(exec_tree, &message.data_header, target_ids); + self.handle_received_sync_header(&message.sync_header, ctx); + } + + /// Handles a new sync message by handling the sync header and the contents + /// of the message. Returns `Some` with the branch ID of the global solution + /// if the sync solution has been found. + pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) -> Option { + self.handle_received_sync_header(&message.sync_header, ctx); + + // And handle the contents + debug_assert_eq!(message.target_component_id, ctx.id); + match message.content { + SyncContent::Notification => { + // We were just interested in the header + return None; + }, + SyncContent::LocalSolution(solution) => { + // We might be the leader, or earlier messages caused us to not + // be the leader anymore. + self.send_or_store_local_solution(solution, ctx); + return None; + }, + SyncContent::GlobalSolution(solution) => { + // Take branch of interest and return it. + let (_, branch_id) = solution.branches.iter() + .find(|(connector_id, _)| connector_id == ctx.id) + .unwrap(); + return Some(*branch_id); + } + } + } + + /// Checks data header and consults the stored port mapping and the + /// execution tree to see which branches may receive the data message's + /// contents. + fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { + for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { + if branch.awaiting_port == data_header.target_port { + // Found a branch awaiting the message, but we need to make sure + // the mapping is correct + if self.branch_can_receive(branch.id, data_header) { + target_ids.push(branch.id); + } + } + } + } + + fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves self.encountered_peers.push(sync_header.sending_component_id); @@ -287,25 +349,6 @@ impl Consensus { } // else: exactly equal, so do nothing } - /// Checks data header and consults the stored port mapping and the - /// execution tree to see which branches may receive the data message's - /// contents. - /// - /// This function is generally called for freshly received messages that - /// should be matched against previously halted branches. - /// TODO: Rename, name confused me after a day - pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { - for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { - if branch.awaiting_port == data_header.target_port { - // Found a branch awaiting the message, but we need to make sure - // the mapping is correct - if self.branch_can_receive(branch.id, data_header) { - target_ids.push(branch.id); - } - } - } - } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) { debug_assert!(self.branch_can_receive(branch_id, data_header)); let branch = &mut self.branch_annotations[branch_id.index as usize]; @@ -356,7 +399,9 @@ impl Consensus { fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) { if self.highest_connector_id == ctx.id { // We are the leader - self.store_local_solution(solution, ctx); + if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) { + + } } else { // Someone else is the leader let message = SyncMessageFancy{ @@ -368,14 +413,6 @@ impl Consensus { } } - /// Stores the local solution internally. This assumes that we are the - /// leader. - fn store_local_solution(&mut self, solution: LocalSolution, _ctx: &ComponentCtxFancy) { - debug_assert_eq!(self.highest_connector_id, _ctx.id); - - self.local_solutions.push(solution); - } - #[inline] fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader { return SyncHeader{ @@ -387,15 +424,13 @@ impl Consensus { fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) { debug_assert_ne!(self.highest_connector_id, ctx.id); - if !self.local_solutions.is_empty() { - for local_solution in self.local_solutions.drain(..) { - let message = SyncMessageFancy{ - sync_header: self.create_sync_header(ctx), - target_component_id: self.highest_connector_id, - content: SyncContent::LocalSolution(local_solution), - }; - ctx.submit_message(MessageFancy::Sync(message)); - } + for local_solution in self.solution_combiner.drain() { + let message = SyncMessageFancy{ + sync_header: self.create_sync_header(ctx), + target_component_id: self.highest_connector_id, + content: SyncContent::LocalSolution(local_solution), + }; + ctx.submit_message(MessageFancy::Sync(message)); } } } @@ -406,7 +441,7 @@ impl Consensus { struct MatchedLocalSolution { final_branch_id: BranchId, - port_mapping: Vec<(PortIdLocal, BranchId)>, + port_mapping: Vec<(ChannelId, BranchId)>, matches: Vec, } @@ -419,7 +454,7 @@ struct ComponentMatches { struct ComponentPeer { target_id: ConnectorId, target_index: usize, // in array of global solution components - involved_ports: Vec, + involved_channels: Vec, } struct ComponentLocalSolutions { @@ -430,11 +465,11 @@ struct ComponentLocalSolutions { } // TODO: Flatten? Flatten. Flatten everything. -pub(crate) struct GlobalSolution { +pub(crate) struct SolutionCombiner { local: Vec } -impl GlobalSolution { +impl SolutionCombiner { fn new() -> Self { return Self{ local: Vec::new(), @@ -444,7 +479,7 @@ impl GlobalSolution { /// Adds a new local solution to the global solution storage. Will check the /// new local solutions for matching against already stored local solutions /// of peer connectors. - fn add_solution(&mut self, solution: LocalSolution) { + fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option> { let component_id = solution.component; let solution = MatchedLocalSolution{ final_branch_id: solution.final_branch_id, @@ -506,14 +541,14 @@ impl GlobalSolution { component_peers.push(ComponentPeer{ target_id: other_component.component, target_index: other_index, - involved_ports: matching_ports, + involved_channels: matching_ports, }); } } let mut num_ports_in_peers = 0; for peer in component_peers { - num_ports_in_peers += peer.involved_ports.len(); + num_ports_in_peers += peer.involved_channels.len(); } if num_ports_in_peers == cur_ports.len() { @@ -525,10 +560,10 @@ impl GlobalSolution { // for the two involved components for component_match in component_peers { // Check the other component for having all peers present - let mut num_ports_in_peers = component_match.involved_ports.len(); + let mut num_ports_in_peers = component_match.involved_channels.len(); let other_component = &mut self.local[component_match.target_index]; for existing_peer in &other_component.peers { - num_ports_in_peers += existing_peer.involved_ports.len(); + num_ports_in_peers += existing_peer.involved_channels.len(); } if num_ports_in_peers == other_component.solutions[0].port_mapping.len() { @@ -538,7 +573,7 @@ impl GlobalSolution { other_component.peers.push(ComponentPeer{ target_id: component_id, target_index: component_index, - involved_ports: component_match.involved_ports.clone(), + involved_channels: component_match.involved_channels.clone(), }); let new_component = &mut self.local[component_index]; @@ -635,12 +670,102 @@ impl GlobalSolution { } } } + + return self.check_new_solution(component_index, solution_index); } /// Checks if, starting at the provided local solution, a global solution /// can be formed. - fn check_new_solution(&self, component_idx: usize, solution_index: usize) -> Option> { - + fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option> { + if !self.can_have_solution() { + return None; + } + + // By now we're certain that all peers are present. So once our + // backtracking solution stack is as long as the number of components, + // then we have found a global solution. + let mut check_stack = Vec::new(); + let mut check_from = 0; + check_stack.push((component_index, solution_index)); + 'checking_loop: while check_from < check_stack.len() { + // Prepare for next iteration + let new_check_from = check_stack.len(); + + // Go through all entries on the checking stack. Each entry + // corresponds to a component's solution. We check that one against + // previously added ones on the stack, and if they're not already + // added we push them onto the check stack. + for check_idx in check_from..new_check_from { + // Take the current solution + let (component_index, solution_index) = check_stack[check_idx]; + debug_assert!(!self.local[component_index].solutions.is_empty()); + let cur_solution = &self.local[component_index].solutions[solution_index]; + + // Go through the matches and check if they're on the stack or + // should be added to the stack. + for cur_match in &cur_solution.matches { + let mut is_already_on_stack = false; + let mut has_same_solution = false; + for existing_check_idx in 0..check_from { + let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx]; + if existing_component_index == cur_match.target_index { + // Already lives on the stack, so the match MUST + // contain the same solution index if the checked + // local solution is agreeable with the (partially + // determined) global solution. + is_already_on_stack = true; + if cur_match.match_indices.contains(&existing_solution_index) { + has_same_solution = true; + break; + } + } + } + + if is_already_on_stack { + if !has_same_solution { + // We have an inconsistency, so we need to go back + // in our stack, and try the next solution + let (last_component_index, last_solution_index) = check_stack[check_from]; + check_stack.truncate(check_from); + if check_stack.is_empty() { + // The starting point does not yield a valid + // solution + return None; + } + + // Try the next one + let last_component = &self.local[last_component_index]; + let new_solution_index = last_solution_index + 1; + if new_solution_index >= last_component.solutions.len() { + // No more things to try, again: no valid + // solution + return None; + } + + check_stack.push((last_component_index, new_solution_index)); + continue 'checking_loop; + } // else: we're fine, the solution is agreeable + } else { + check_stack.push((cur_match.target_index, 0)) + } + } + } + + check_from = new_check_from; + } + + // Because of our earlier checking if we can have a solution at + // all (all components have a peer), and the exit condition of the while + // loop: if we're here, then we have a global solution + debug_assert_eq!(check_stack.len(), self.local.len()); + let mut global_solution = Vec::with_capacity(check_stack.len()); + for (component_index, solution_index) in check_stack { + let component = &self.local[component_index]; + let solution = &component.solutions[solution_index]; + global_solution.push((component.component, solution.final_branch_id)); + } + + return Some(global_solution); } /// Simple test if a solution is at all possible. If this returns true it @@ -654,6 +779,30 @@ impl GlobalSolution { return true; } + + /// Turns the entire (partially resolved) global solution back into local + /// solutions to ship to another component. + // TODO: Don't do this, kind of wasteful since a lot of processing has + // already been performed. + fn drain(&mut self) -> Vec { + let mut reserve_len = 0; + for component in &self.local { + reserve_len += component.solutions.len(); + } + + let mut solutions = Vec::with_capacity(reserve_len); + for component in self.local.drain(..) { + for solution in component.solutions { + solutions.push(LocalSolution{ + component: component.component, + final_branch_id: solution.final_branch_id, + port_mapping: solution.port_mapping, + }); + } + } + + return solutions; + } } // -----------------------------------------------------------------------------