diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index f69f1392977a41b4413618fefe4e1b13441230ea..91764ee546a1731534007775d44156f10b8958f7 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,10 +1,9 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::BranchQueueIter; -use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; +use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ Message, PortAnnotation, @@ -49,13 +48,13 @@ struct Peer { // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups // TODO: A lot of stuff should be batched. Like checking all the sync headers -// and sending "I have a higher ID" messages. +// and sending "I have a higher ID" messages. Should reduce locking by quite a +// bit. pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, branch_annotations: Vec, - last_finished_handled: Option, // Gathered state from communication encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, @@ -77,7 +76,6 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), - last_finished_handled: None, encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), peers: Vec::new(), @@ -106,7 +104,6 @@ impl Consensus { pub fn start_sync(&mut self, ctx: &ComponentCtx) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); - debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, @@ -188,68 +185,59 @@ impl Consensus { unreachable!("notify_of_speculative_mapping called with unowned port"); } - /// Generates sync messages for any branches that are at the end of the - /// sync block. To find these branches, they should've been put in the - /// "finished" queue in the execution tree. - pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option { - debug_assert!(self.is_in_sync()); - - let mut last_branch_id = self.last_finished_handled; - for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { - // Turn the port mapping into a local solution - let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping; - let mut target_mapping = Vec::with_capacity(source_mapping.len()); - - for port in source_mapping { - // Note: if the port is silent, and we've never communicated - // over the port, then we need to do so now, to let the peer - // component know about our sync leader state. - let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); - let peer_port_id = port_desc.peer_id; - let channel_id = port_desc.channel_id; - - if !self.encountered_ports.contains(&port.port_id) { - ctx.submit_message(Message::Data(DataMessage { - sync_header: SyncHeader{ - sending_component_id: ctx.id, - highest_component_id: self.highest_connector_id, - sync_round: self.sync_round - }, - data_header: DataHeader{ - expected_mapping: source_mapping.clone(), - sending_port: port.port_id, - target_port: peer_port_id, - new_mapping: BranchId::new_invalid(), - }, - content: DataContent::SilentPortNotification, - })); - self.encountered_ports.push(port.port_id); - } - - target_mapping.push(( - channel_id, - port.registered_id.unwrap_or(BranchId::new_invalid()) - )); - } - - let local_solution = LocalSolution{ - component: ctx.id, - final_branch_id: branch.id, - port_mapping: target_mapping, - }; - let solution_branch = self.send_or_store_local_solution(local_solution, ctx); - if solution_branch.is_some() { - // No need to continue iterating, we've found the solution - return solution_branch; + /// Generates a new local solution from a finished branch. If the component + /// is not the leader of the sync region then it will be sent to the + /// appropriate component. If it is the leader then there is a chance that + /// this solution completes a global solution. In that case the solution + /// branch ID will be returned. + pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { + // Turn the port mapping into a local solution + let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping; + let mut target_mapping = Vec::with_capacity(source_mapping.len()); + + for port in source_mapping { + // Note: if the port is silent, and we've never communicated + // over the port, then we need to do so now, to let the peer + // component know about our sync leader state. + let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); + let peer_port_id = port_desc.peer_id; + let channel_id = port_desc.channel_id; + + if !self.encountered_ports.contains(&port.port_id) { + ctx.submit_message(Message::Data(DataMessage { + sync_header: SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + sync_round: self.sync_round + }, + data_header: DataHeader{ + expected_mapping: source_mapping.clone(), + sending_port: port.port_id, + target_port: peer_port_id, + new_mapping: BranchId::new_invalid(), + }, + content: DataContent::SilentPortNotification, + })); + self.encountered_ports.push(port.port_id); } - last_branch_id = Some(branch.id); + target_mapping.push(( + channel_id, + port.registered_id.unwrap_or(BranchId::new_invalid()) + )); } - self.last_finished_handled = last_branch_id; - return None; + let local_solution = LocalSolution{ + component: ctx.id, + final_branch_id: branch_id, + port_mapping: target_mapping, + }; + let solution_branch = self.send_or_store_local_solution(local_solution, ctx); + return solution_branch; } + /// Notifies the consensus algorithm about the chosen branch to commit to + /// memory. pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { debug_assert!(self.is_in_sync()); @@ -264,7 +252,6 @@ impl Consensus { // Clear out internal storage to defaults self.highest_connector_id = ConnectorId::new_invalid(); self.branch_annotations.clear(); - self.last_finished_handled = None; self.encountered_ports.clear(); self.solution_combiner.clear(); @@ -325,15 +312,10 @@ impl Consensus { return (self.create_sync_header(ctx), data_header); } - /// Handles a new data message by handling the data and sync header, and - /// checking which *existing* branches *can* receive the message. So two - /// cautionary notes: - /// 1. A future branch might also be able to receive this message, see the - /// `branch_can_receive` function. - /// 2. We return the branches that *can* receive the message, you still - /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, potential_receivers: BranchQueueIter<'_, >, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { - self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids); + /// Handles a new data message by handling the sync header. The caller is + /// responsible for checking for branches that might be able to receive + /// the message. + pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool { return self.handle_received_sync_header(&message.sync_header, ctx) } @@ -367,18 +349,18 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) { - debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content)); + pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) { + debug_assert!(self.branch_can_receive(branch_id, message)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { - if mapping.port_id == data_header.target_port { + if mapping.port_id == message.data_header.target_port { // Found the port in which the message should be inserted - mapping.registered_id = Some(data_header.new_mapping); + mapping.registered_id = Some(message.data_header.new_mapping); // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports); + find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); @@ -395,20 +377,20 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool { - if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) { - if sync_header.sync_round < peer.expected_sync_round { + pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool { + if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) { + if message.sync_header.sync_round < peer.expected_sync_round { return false; } } - if let DataContent::SilentPortNotification = content { + if let DataContent::SilentPortNotification = message.content { // No port can receive a "silent" notification. return false; } let annotation = &self.branch_annotations[branch_id.index as usize]; - for expected in &data_header.expected_mapping { + for expected in &message.data_header.expected_mapping { // If we own the port, then we have an entry in the // annotation, check if the current mapping matches for current in &annotation.port_mapping { @@ -427,21 +409,6 @@ impl Consensus { // --- Internal helpers - /// Checks data header and consults the stored port mapping and the - /// execution tree to see which branches may receive the data message's - /// contents. - fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { - for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { - if branch.awaiting_port == data_header.target_port { - // Found a branch awaiting the message, but we need to make sure - // the mapping is correct - if self.branch_can_receive(branch.id, sync_header, data_header, content) { - target_ids.push(branch.id); - } - } - } - } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves if !self.handle_peer(sync_header) {