diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index e30edb59f646a07f21511afe1de906d17b9358ee..c965d64b803233458efe6ae822988749577f809f 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -33,6 +33,12 @@ pub(crate) struct GlobalSolution { // Consensus // ----------------------------------------------------------------------------- +struct Peer { + id: ConnectorId, + encountered_this_round: bool, + expected_sync_round: u32, +} + /// The consensus algorithm. Currently only implemented to find the component /// with the highest ID within the sync region and letting it handle all the /// local solutions. @@ -50,11 +56,11 @@ pub(crate) struct Consensus { branch_annotations: Vec, last_finished_handled: Option, // Gathered state from communication - encountered_peers: VecSet, // to determine when we should send "found a higher ID" messages. encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, // --- Persistent state - // TODO: Tracking sync round numbers + peers: Vec, + sync_round: u32, // --- Workspaces workspace_ports: Vec, } @@ -71,9 +77,10 @@ impl Consensus { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), last_finished_handled: None, - encountered_peers: VecSet::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), + peers: Vec::new(), + sync_round: 0, workspace_ports: Vec::new(), } } @@ -99,7 +106,6 @@ impl Consensus { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); - debug_assert!(self.encountered_peers.is_empty()); debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, @@ -206,6 +212,7 @@ impl Consensus { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, + sync_round: self.sync_round }, data_header: DataHeader{ expected_mapping: source_mapping.clone(), @@ -257,9 +264,15 @@ impl Consensus { self.highest_connector_id = ConnectorId::new_invalid(); self.branch_annotations.clear(); self.last_finished_handled = None; - self.encountered_peers.clear(); self.encountered_ports.clear(); self.solution_combiner.clear(); + + self.sync_round += 1; + + for peer in self.peers.iter_mut() { + peer.encountered_this_round = false; + peer.expected_sync_round += 1; + } } // --- Handling messages @@ -318,16 +331,18 @@ impl Consensus { /// `branch_can_receive` function. /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) { - self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids); - self.handle_received_sync_header(&message.sync_header, ctx); + pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { + self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids); + return self.handle_received_sync_header(&message.sync_header, ctx) } /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option { - self.handle_received_sync_header(&message.sync_header, ctx); + if !self.handle_received_sync_header(&message.sync_header, ctx) { + return None; + } // And handle the contents debug_assert_eq!(message.target_component_id, ctx.id); @@ -351,8 +366,8 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) { - debug_assert!(self.branch_can_receive(branch_id, data_header, content)); + pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) { + debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { @@ -379,7 +394,13 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool { + pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool { + if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) { + if sync_header.sync_round < peer.expected_sync_round { + return false; + } + } + if let DataContent::SilentPortNotification = content { // No port can receive a "silent" notification. return false; @@ -408,36 +429,38 @@ impl Consensus { /// Checks data header and consults the stored port mapping and the /// execution tree to see which branches may receive the data message's /// contents. - fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { + fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { // Found a branch awaiting the message, but we need to make sure // the mapping is correct - if self.branch_can_receive(branch.id, data_header, content) { + if self.branch_can_receive(branch.id, sync_header, data_header, content) { target_ids.push(branch.id); } } } } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) { + fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves - - self.encountered_peers.push(sync_header.sending_component_id); + if !self.handle_peer(sync_header) { + // We can drop this package + return false; + } if sync_header.highest_component_id > self.highest_connector_id { // Sender has higher component ID. So should be the target of our // messages. We should also let all of our peers know self.highest_connector_id = sync_header.highest_component_id; - for encountered_id in self.encountered_peers.iter() { - if *encountered_id == sync_header.sending_component_id { + for peer in self.peers.iter() { + if peer.id == sync_header.sending_component_id || !peer.encountered_this_round { // Don't need to send it to this one continue } let message = SyncMessage { sync_header: self.create_sync_header(ctx), - target_component_id: *encountered_id, + target_component_id: peer.id, content: SyncContent::Notification, }; ctx.submit_message(Message::Sync(message)); @@ -455,6 +478,35 @@ impl Consensus { }; ctx.submit_message(Message::Sync(message)); } // else: exactly equal, so do nothing + + return true; + } + + /// Handles a (potentially new) peer. Returns `false` if the provided sync + /// number is different then the expected one. + fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool { + let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id); + match position { + Some(index) => { + let entry = &mut self.peers[index]; + entry.encountered_this_round = true; + // TODO: Proper handling of potential overflow + if sync_header.sync_round >= entry.expected_sync_round { + entry.expected_sync_round = sync_header.sync_round; + return true; + } else { + return false; + } + }, + None => { + self.peers.push(Peer{ + id: sync_header.sending_component_id, + encountered_this_round: true, + expected_sync_round: sync_header.sync_round, + }); + return true; + } + } } fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option { @@ -499,6 +551,7 @@ impl Consensus { return SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, + sync_round: self.sync_round, } } @@ -520,24 +573,30 @@ impl Consensus { // Solution storage and algorithms // ----------------------------------------------------------------------------- +// TODO: Remove all debug derives + +#[derive(Debug)] struct MatchedLocalSolution { final_branch_id: BranchId, channel_mapping: Vec<(ChannelId, BranchId)>, matches: Vec, } +#[derive(Debug)] struct ComponentMatches { target_id: ConnectorId, target_index: usize, match_indices: Vec, // of local solution in connector } +#[derive(Debug)] struct ComponentPeer { target_id: ConnectorId, target_index: usize, // in array of global solution components involved_channels: Vec, } +#[derive(Debug)] struct ComponentLocalSolutions { component: ConnectorId, peers: Vec, @@ -550,6 +609,14 @@ pub(crate) struct SolutionCombiner { local: Vec } +struct CheckEntry { + component_index: usize, // component index in combiner's vector + solution_index: usize, // solution entry in the above component entry + parent_entry_index: usize, // parent that caused the creation of this checking entry + match_index_in_parent: usize, // index in the matches array of the parent + solution_index_in_parent: usize,// index in the solution array of the match entry in the parent +} + impl SolutionCombiner { fn new() -> Self { return Self{ @@ -757,99 +824,139 @@ impl SolutionCombiner { /// Checks if, starting at the provided local solution, a global solution /// can be formed. - fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option { + // TODO: At some point, check if divide and conquer is faster? + fn check_new_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option { if !self.can_have_solution() { return None; } - // By now we're certain that all peers are present. So once our - // backtracking solution stack is as long as the number of components, - // then we have found a global solution. - let mut check_stack = Vec::new(); - let mut check_from = 0; - check_stack.push((component_index, solution_index)); - 'checking_loop: while check_from < check_stack.len() { - // Prepare for next iteration - let new_check_from = check_stack.len(); - - // Go through all entries on the checking stack. Each entry - // corresponds to a component's solution. We check that one against - // previously added ones on the stack, and if they're not already - // added we push them onto the check stack. - for check_idx in check_from..new_check_from { - // Take the current solution - let (component_index, solution_index) = check_stack[check_idx]; - debug_assert!(!self.local[component_index].solutions.is_empty()); - let cur_solution = &self.local[component_index].solutions[solution_index]; - - // Go through the matches and check if they're on the stack or - // should be added to the stack. - for cur_match in &cur_solution.matches { - let mut is_already_on_stack = false; - let mut has_same_solution = false; - for existing_check_idx in 0..check_from { - let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx]; - if existing_component_index == cur_match.target_index { - // Already lives on the stack, so the match MUST - // contain the same solution index if the checked - // local solution is agreeable with the (partially - // determined) global solution. - is_already_on_stack = true; - if cur_match.match_indices.contains(&existing_solution_index) { - has_same_solution = true; - break; - } + // Construct initial entry on stack + let mut stack = Vec::with_capacity(self.local.len()); + stack.push(CheckEntry{ + component_index: initial_component_index, + solution_index: initial_solution_index, + parent_entry_index: 0, + match_index_in_parent: 0, + solution_index_in_parent: 0, + }); + + 'check_last_stack: loop { + let cur_index = stack.len() - 1; + let cur_entry = &stack[cur_index]; + + // Check if the current component is matching with all other entries + let mut all_match = true; + 'check_against_existing: for prev_index in 0..cur_index { + let prev_entry = &stack[prev_index]; + let prev_component = &self.local[prev_entry.component_index]; + let prev_solution = &prev_component.solutions[prev_entry.solution_index]; + + for prev_matching_component in &prev_solution.matches { + if prev_matching_component.target_index == cur_entry.component_index { + // Previous entry has shared ports with the current + // entry, so see if we have a composable pair of + // solutions. + if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) { + all_match = false; + break 'check_against_existing; } } + } + } - if is_already_on_stack { - if !has_same_solution { - // We have an inconsistency, so we need to go back - // in our stack, and try the next solution - let (last_component_index, last_solution_index) = check_stack[check_from]; - check_stack.truncate(check_from); - if check_stack.is_empty() { - // The starting point does not yield a valid - // solution - return None; - } + if all_match { + // All components matched until now. + if stack.len() == self.local.len() { + // We have found a global solution + break 'check_last_stack; + } - // Try the next one - let last_component = &self.local[last_component_index]; - let new_solution_index = last_solution_index + 1; - if new_solution_index >= last_component.solutions.len() { - // No more things to try, again: no valid - // solution - return None; - } + // Not all components found yet, look for a new one that has not + // yet been added yet. + for (parent_index, parent_entry) in stack.iter().enumerate() { + let parent_component = &self.local[parent_entry.component_index]; + let parent_solution = &parent_component.solutions[parent_entry.solution_index]; - check_stack.push((last_component_index, new_solution_index)); - continue 'checking_loop; - } // else: we're fine, the solution is agreeable - } else { - check_stack.push((cur_match.target_index, 0)) + for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() { + if peer_component.match_indices.is_empty() { + continue; + } + + let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index); + if !already_added { + // New component to try + stack.push(CheckEntry{ + component_index: peer_component.target_index, + solution_index: peer_component.match_indices[0], + parent_entry_index: parent_index, + match_index_in_parent: peer_index, + solution_index_in_parent: 0, + }); + continue 'check_last_stack; + } } } + + // Cannot find a peer to add. This is possible if, for example, + // we have a component A which has the only connection to + // component B. And B has sent a local solution saying it is + // finished, but the last data message has not yet arrived at A. + + // In any case, we just exit the if statement and handle not + // being able to find a new connector as being forced to try a + // new permutation of possible local solutions. + } + + // Either the currently considered local solution is inconsistent + // with other local solutions, or we cannot find a new component to + // add. This is where we perform backtracking as long as needed to + // try a new solution. + while stack.len() > 1 { + // Check if our parent has another solution we can try + let cur_index = stack.len() - 1; + let cur_entry = &stack[cur_index]; + + let parent_entry = &stack[cur_entry.parent_entry_index]; + let parent_component = &self.local[parent_entry.component_index]; + let parent_solution = &parent_component.solutions[parent_entry.solution_index]; + + let match_component = &parent_solution.matches[cur_entry.match_index_in_parent]; + debug_assert!(match_component.target_index == cur_entry.component_index); + let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1; + + if new_solution_index_in_parent < match_component.match_indices.len() { + // We can still try a new one + let new_solution_index = match_component.match_indices[new_solution_index_in_parent]; + let cur_entry = &mut stack[cur_index]; + cur_entry.solution_index_in_parent = new_solution_index_in_parent; + cur_entry.solution_index = new_solution_index; + continue 'check_last_stack; + } else { + // We're out of options here. So pop an entry, then in + // the next iteration of this backtracking loop we try + // to increment that solution + stack.pop(); + } } - check_from = new_check_from; + // Stack length is 1, hence we're back at our initial solution. + // Since that doesn't yield a global solution, we simply: + return None; } - // Because of our earlier checking if we can have a solution at - // all (all components have their peers), and the exit condition of the - // while loop: if we're here, then we have a global solution - debug_assert_eq!(check_stack.len(), self.local.len()); - let mut final_branches = Vec::with_capacity(check_stack.len()); - for (component_index, solution_index) in check_stack.iter().copied() { - let component = &self.local[component_index]; - let solution = &component.solutions[solution_index]; + // Constructing the representation of the global solution + debug_assert_eq!(stack.len(), self.local.len()); + let mut final_branches = Vec::with_capacity(stack.len()); + for entry in &stack { + let component = &self.local[entry.component_index]; + let solution = &component.solutions[entry.solution_index]; final_branches.push((component.component, solution.final_branch_id)); } // Just debugging here, TODO: @remove let mut total_num_channels = 0; - for (component_index, _) in check_stack.iter().copied() { - let component = &self.local[component_index]; + for entry in &stack { + let component = &self.local[entry.component_index]; total_num_channels += component.solutions[0].channel_mapping.len(); } @@ -857,9 +964,9 @@ impl SolutionCombiner { let mut final_mapping = Vec::with_capacity(total_num_channels); let mut total_num_checked = 0; - for (component_index, solution_index) in check_stack.iter().copied() { - let component = &self.local[component_index]; - let solution = &component.solutions[solution_index]; + for entry in &stack { + let component = &self.local[entry.component_index]; + let solution = &component.solutions[entry.solution_index]; for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) {