From 7662b8fb871d471f5d580a7de9142d20cb7801ac 2021-11-10 19:22:54 From: mh Date: 2021-11-10 19:22:54 Subject: [PATCH] Rewrite solution composition, add some tests --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index c6e919b4198fcbf0372ed9f3d74f12421fcf010f..943e149661563948bd38169e4e07b8c0ddd74a11 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -145,7 +145,10 @@ impl ConnectorPDL { // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); let mut branches = Vec::new(); // TODO: @Remove - self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); + if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + // Old message, so drop it + return; + } for branch_id in branches.drain(..) { // This branch can receive, so fork and given it the message @@ -154,7 +157,7 @@ impl ConnectorPDL { let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -238,7 +241,7 @@ impl ConnectorPDL { // a message that targets this branch, so check now. let mut any_branch_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { + if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { // This branch can receive the message, so we do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); @@ -247,7 +250,7 @@ impl ConnectorPDL { branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_branch_received = true; diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index e30edb59f646a07f21511afe1de906d17b9358ee..c965d64b803233458efe6ae822988749577f809f 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -33,6 +33,12 @@ pub(crate) struct GlobalSolution { // Consensus // ----------------------------------------------------------------------------- +struct Peer { + id: ConnectorId, + encountered_this_round: bool, + expected_sync_round: u32, +} + /// The consensus algorithm. Currently only implemented to find the component /// with the highest ID within the sync region and letting it handle all the /// local solutions. @@ -50,11 +56,11 @@ pub(crate) struct Consensus { branch_annotations: Vec, last_finished_handled: Option, // Gathered state from communication - encountered_peers: VecSet, // to determine when we should send "found a higher ID" messages. encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, // --- Persistent state - // TODO: Tracking sync round numbers + peers: Vec, + sync_round: u32, // --- Workspaces workspace_ports: Vec, } @@ -71,9 +77,10 @@ impl Consensus { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), last_finished_handled: None, - encountered_peers: VecSet::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), + peers: Vec::new(), + sync_round: 0, workspace_ports: Vec::new(), } } @@ -99,7 +106,6 @@ impl Consensus { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); - debug_assert!(self.encountered_peers.is_empty()); debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, @@ -206,6 +212,7 @@ impl Consensus { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, + sync_round: self.sync_round }, data_header: DataHeader{ expected_mapping: source_mapping.clone(), @@ -257,9 +264,15 @@ impl Consensus { self.highest_connector_id = ConnectorId::new_invalid(); self.branch_annotations.clear(); self.last_finished_handled = None; - self.encountered_peers.clear(); self.encountered_ports.clear(); self.solution_combiner.clear(); + + self.sync_round += 1; + + for peer in self.peers.iter_mut() { + peer.encountered_this_round = false; + peer.expected_sync_round += 1; + } } // --- Handling messages @@ -318,16 +331,18 @@ impl Consensus { /// `branch_can_receive` function. /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) { - self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids); - self.handle_received_sync_header(&message.sync_header, ctx); + pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { + self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids); + return self.handle_received_sync_header(&message.sync_header, ctx) } /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option { - self.handle_received_sync_header(&message.sync_header, ctx); + if !self.handle_received_sync_header(&message.sync_header, ctx) { + return None; + } // And handle the contents debug_assert_eq!(message.target_component_id, ctx.id); @@ -351,8 +366,8 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) { - debug_assert!(self.branch_can_receive(branch_id, data_header, content)); + pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) { + debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { @@ -379,7 +394,13 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool { + pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool { + if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) { + if sync_header.sync_round < peer.expected_sync_round { + return false; + } + } + if let DataContent::SilentPortNotification = content { // No port can receive a "silent" notification. return false; @@ -408,36 +429,38 @@ impl Consensus { /// Checks data header and consults the stored port mapping and the /// execution tree to see which branches may receive the data message's /// contents. - fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { + fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { // Found a branch awaiting the message, but we need to make sure // the mapping is correct - if self.branch_can_receive(branch.id, data_header, content) { + if self.branch_can_receive(branch.id, sync_header, data_header, content) { target_ids.push(branch.id); } } } } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) { + fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves - - self.encountered_peers.push(sync_header.sending_component_id); + if !self.handle_peer(sync_header) { + // We can drop this package + return false; + } if sync_header.highest_component_id > self.highest_connector_id { // Sender has higher component ID. So should be the target of our // messages. We should also let all of our peers know self.highest_connector_id = sync_header.highest_component_id; - for encountered_id in self.encountered_peers.iter() { - if *encountered_id == sync_header.sending_component_id { + for peer in self.peers.iter() { + if peer.id == sync_header.sending_component_id || !peer.encountered_this_round { // Don't need to send it to this one continue } let message = SyncMessage { sync_header: self.create_sync_header(ctx), - target_component_id: *encountered_id, + target_component_id: peer.id, content: SyncContent::Notification, }; ctx.submit_message(Message::Sync(message)); @@ -455,6 +478,35 @@ impl Consensus { }; ctx.submit_message(Message::Sync(message)); } // else: exactly equal, so do nothing + + return true; + } + + /// Handles a (potentially new) peer. Returns `false` if the provided sync + /// number is different then the expected one. + fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool { + let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id); + match position { + Some(index) => { + let entry = &mut self.peers[index]; + entry.encountered_this_round = true; + // TODO: Proper handling of potential overflow + if sync_header.sync_round >= entry.expected_sync_round { + entry.expected_sync_round = sync_header.sync_round; + return true; + } else { + return false; + } + }, + None => { + self.peers.push(Peer{ + id: sync_header.sending_component_id, + encountered_this_round: true, + expected_sync_round: sync_header.sync_round, + }); + return true; + } + } } fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option { @@ -499,6 +551,7 @@ impl Consensus { return SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, + sync_round: self.sync_round, } } @@ -520,24 +573,30 @@ impl Consensus { // Solution storage and algorithms // ----------------------------------------------------------------------------- +// TODO: Remove all debug derives + +#[derive(Debug)] struct MatchedLocalSolution { final_branch_id: BranchId, channel_mapping: Vec<(ChannelId, BranchId)>, matches: Vec, } +#[derive(Debug)] struct ComponentMatches { target_id: ConnectorId, target_index: usize, match_indices: Vec, // of local solution in connector } +#[derive(Debug)] struct ComponentPeer { target_id: ConnectorId, target_index: usize, // in array of global solution components involved_channels: Vec, } +#[derive(Debug)] struct ComponentLocalSolutions { component: ConnectorId, peers: Vec, @@ -550,6 +609,14 @@ pub(crate) struct SolutionCombiner { local: Vec } +struct CheckEntry { + component_index: usize, // component index in combiner's vector + solution_index: usize, // solution entry in the above component entry + parent_entry_index: usize, // parent that caused the creation of this checking entry + match_index_in_parent: usize, // index in the matches array of the parent + solution_index_in_parent: usize,// index in the solution array of the match entry in the parent +} + impl SolutionCombiner { fn new() -> Self { return Self{ @@ -757,99 +824,139 @@ impl SolutionCombiner { /// Checks if, starting at the provided local solution, a global solution /// can be formed. - fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option { + // TODO: At some point, check if divide and conquer is faster? + fn check_new_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option { if !self.can_have_solution() { return None; } - // By now we're certain that all peers are present. So once our - // backtracking solution stack is as long as the number of components, - // then we have found a global solution. - let mut check_stack = Vec::new(); - let mut check_from = 0; - check_stack.push((component_index, solution_index)); - 'checking_loop: while check_from < check_stack.len() { - // Prepare for next iteration - let new_check_from = check_stack.len(); - - // Go through all entries on the checking stack. Each entry - // corresponds to a component's solution. We check that one against - // previously added ones on the stack, and if they're not already - // added we push them onto the check stack. - for check_idx in check_from..new_check_from { - // Take the current solution - let (component_index, solution_index) = check_stack[check_idx]; - debug_assert!(!self.local[component_index].solutions.is_empty()); - let cur_solution = &self.local[component_index].solutions[solution_index]; - - // Go through the matches and check if they're on the stack or - // should be added to the stack. - for cur_match in &cur_solution.matches { - let mut is_already_on_stack = false; - let mut has_same_solution = false; - for existing_check_idx in 0..check_from { - let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx]; - if existing_component_index == cur_match.target_index { - // Already lives on the stack, so the match MUST - // contain the same solution index if the checked - // local solution is agreeable with the (partially - // determined) global solution. - is_already_on_stack = true; - if cur_match.match_indices.contains(&existing_solution_index) { - has_same_solution = true; - break; - } + // Construct initial entry on stack + let mut stack = Vec::with_capacity(self.local.len()); + stack.push(CheckEntry{ + component_index: initial_component_index, + solution_index: initial_solution_index, + parent_entry_index: 0, + match_index_in_parent: 0, + solution_index_in_parent: 0, + }); + + 'check_last_stack: loop { + let cur_index = stack.len() - 1; + let cur_entry = &stack[cur_index]; + + // Check if the current component is matching with all other entries + let mut all_match = true; + 'check_against_existing: for prev_index in 0..cur_index { + let prev_entry = &stack[prev_index]; + let prev_component = &self.local[prev_entry.component_index]; + let prev_solution = &prev_component.solutions[prev_entry.solution_index]; + + for prev_matching_component in &prev_solution.matches { + if prev_matching_component.target_index == cur_entry.component_index { + // Previous entry has shared ports with the current + // entry, so see if we have a composable pair of + // solutions. + if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) { + all_match = false; + break 'check_against_existing; } } + } + } - if is_already_on_stack { - if !has_same_solution { - // We have an inconsistency, so we need to go back - // in our stack, and try the next solution - let (last_component_index, last_solution_index) = check_stack[check_from]; - check_stack.truncate(check_from); - if check_stack.is_empty() { - // The starting point does not yield a valid - // solution - return None; - } + if all_match { + // All components matched until now. + if stack.len() == self.local.len() { + // We have found a global solution + break 'check_last_stack; + } - // Try the next one - let last_component = &self.local[last_component_index]; - let new_solution_index = last_solution_index + 1; - if new_solution_index >= last_component.solutions.len() { - // No more things to try, again: no valid - // solution - return None; - } + // Not all components found yet, look for a new one that has not + // yet been added yet. + for (parent_index, parent_entry) in stack.iter().enumerate() { + let parent_component = &self.local[parent_entry.component_index]; + let parent_solution = &parent_component.solutions[parent_entry.solution_index]; - check_stack.push((last_component_index, new_solution_index)); - continue 'checking_loop; - } // else: we're fine, the solution is agreeable - } else { - check_stack.push((cur_match.target_index, 0)) + for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() { + if peer_component.match_indices.is_empty() { + continue; + } + + let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index); + if !already_added { + // New component to try + stack.push(CheckEntry{ + component_index: peer_component.target_index, + solution_index: peer_component.match_indices[0], + parent_entry_index: parent_index, + match_index_in_parent: peer_index, + solution_index_in_parent: 0, + }); + continue 'check_last_stack; + } } } + + // Cannot find a peer to add. This is possible if, for example, + // we have a component A which has the only connection to + // component B. And B has sent a local solution saying it is + // finished, but the last data message has not yet arrived at A. + + // In any case, we just exit the if statement and handle not + // being able to find a new connector as being forced to try a + // new permutation of possible local solutions. + } + + // Either the currently considered local solution is inconsistent + // with other local solutions, or we cannot find a new component to + // add. This is where we perform backtracking as long as needed to + // try a new solution. + while stack.len() > 1 { + // Check if our parent has another solution we can try + let cur_index = stack.len() - 1; + let cur_entry = &stack[cur_index]; + + let parent_entry = &stack[cur_entry.parent_entry_index]; + let parent_component = &self.local[parent_entry.component_index]; + let parent_solution = &parent_component.solutions[parent_entry.solution_index]; + + let match_component = &parent_solution.matches[cur_entry.match_index_in_parent]; + debug_assert!(match_component.target_index == cur_entry.component_index); + let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1; + + if new_solution_index_in_parent < match_component.match_indices.len() { + // We can still try a new one + let new_solution_index = match_component.match_indices[new_solution_index_in_parent]; + let cur_entry = &mut stack[cur_index]; + cur_entry.solution_index_in_parent = new_solution_index_in_parent; + cur_entry.solution_index = new_solution_index; + continue 'check_last_stack; + } else { + // We're out of options here. So pop an entry, then in + // the next iteration of this backtracking loop we try + // to increment that solution + stack.pop(); + } } - check_from = new_check_from; + // Stack length is 1, hence we're back at our initial solution. + // Since that doesn't yield a global solution, we simply: + return None; } - // Because of our earlier checking if we can have a solution at - // all (all components have their peers), and the exit condition of the - // while loop: if we're here, then we have a global solution - debug_assert_eq!(check_stack.len(), self.local.len()); - let mut final_branches = Vec::with_capacity(check_stack.len()); - for (component_index, solution_index) in check_stack.iter().copied() { - let component = &self.local[component_index]; - let solution = &component.solutions[solution_index]; + // Constructing the representation of the global solution + debug_assert_eq!(stack.len(), self.local.len()); + let mut final_branches = Vec::with_capacity(stack.len()); + for entry in &stack { + let component = &self.local[entry.component_index]; + let solution = &component.solutions[entry.solution_index]; final_branches.push((component.component, solution.final_branch_id)); } // Just debugging here, TODO: @remove let mut total_num_channels = 0; - for (component_index, _) in check_stack.iter().copied() { - let component = &self.local[component_index]; + for entry in &stack { + let component = &self.local[entry.component_index]; total_num_channels += component.solutions[0].channel_mapping.len(); } @@ -857,9 +964,9 @@ impl SolutionCombiner { let mut final_mapping = Vec::with_capacity(total_num_channels); let mut total_num_checked = 0; - for (component_index, solution_index) in check_stack.iter().copied() { - let component = &self.local[component_index]; - let solution = &component.solutions[solution_index]; + for entry in &stack { + let component = &self.local[entry.component_index]; + let solution = &component.solutions[entry.solution_index]; for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) { diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 808fe06a90cb43fb4205329a7104e634a10e41b5..f1175db78a17bb428cdee71c3bf0cc6c8d43210a 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -22,6 +22,7 @@ pub(crate) struct PortAnnotation { pub(crate) struct SyncHeader { pub sending_component_id: ConnectorId, pub highest_component_id: ConnectorId, + pub sync_round: u32, } /// The header added to data messages diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 2298f124edc741798ada60832c0f541ee13f3594..a5c2aac79bf56dedc089cee2118064d8f49f87c7 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -3,9 +3,9 @@ use crate::{PortId, ProtocolDescription}; use crate::common::Id; use crate::protocol::eval::*; -const NUM_THREADS: u32 = 1; // number of threads in runtime -const NUM_INSTANCES: u32 = 1; // number of test instances constructed -const NUM_LOOPS: u32 = 1500; // number of loops within a single test (not used by all tests) +const NUM_THREADS: u32 = 3; // number of threads in runtime +const NUM_INSTANCES: u32 = 5; // number of test instances constructed +const NUM_LOOPS: u32 = 5; // number of loops within a single test (not used by all tests) fn create_runtime(pdl: &str) -> Runtime { let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl"); @@ -108,7 +108,7 @@ fn test_star_shaped_request() { auto num_edges = length(requests); while (loop_index < loops) { - print(\"starting loop\"); + // print(\"starting loop\"); synchronous { u32 edge_index = 0; u32 sum = 0; @@ -121,7 +121,7 @@ fn test_star_shaped_request() { assert(sum == num_edges * (num_edges - 1)); } - print(\"ending loop\"); + // print(\"ending loop\"); loop_index += 1; } } @@ -152,4 +152,81 @@ fn test_star_shaped_request() { Value::UInt32(NUM_LOOPS), ])); }); +} + +#[test] +fn test_conga_line_request() { + const CODE: &'static str = " + primitive start(out req, in resp, u32 num_nodes, u32 num_loops) { + u32 loop_index = 0; + u32 initial_value = 1337; + while (loop_index < num_loops) { + synchronous { + put(req, initial_value); + auto result = get(resp); + assert(result == initial_value + num_nodes * 2); + } + loop_index += 1; + } + } + + primitive middle( + in req_in, out req_forward, + in resp_in, out resp_forward, + u32 num_loops + ) { + u32 loop_index = 0; + while (loop_index < num_loops) { + synchronous { + auto req = get(req_in); + put(req_forward, req + 1); + auto resp = get(resp_in); + put(resp_forward, resp + 1); + } + loop_index += 1; + } + } + + primitive end(in req_in, out resp_out, u32 num_loops) { + u32 loop_index = 0; + while (loop_index < num_loops) { + synchronous { + auto req = get(req_in); + put(resp_out, req); + } + loop_index += 1; + } + } + + composite constructor(u32 num_nodes, u32 num_loops) { + channel initial_req -> req_in; + channel resp_out -> final_resp; + new start(initial_req, final_resp, num_nodes, num_loops); + + in last_req_in = req_in; + out last_resp_out = resp_out; + + u32 node = 0; + while (node < num_nodes) { + channel new_req_fw -> new_req_in; + channel new_resp_out -> new_resp_in; + new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops); + + last_req_in = new_req_in; + last_resp_out = new_resp_out; + + node += 1; + } + + new end(last_req_in, last_resp_out, num_loops); + } + "; + + let thing = TestTimer::new("conga_line_request"); + run_test_in_runtime(CODE, |api| { + api.create_connector("", "constructor", ValueGroup::new_stack(vec![ + Value::UInt32(5), + Value::UInt32(NUM_LOOPS) + ])); + }); } \ No newline at end of file