diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 110d934e910921dd947b8b2fd8a6e042dc285ace..69ca8e689d202ab1aae53a6991cc15503c604bc7 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,4 +1,4 @@ - +use std::path::Component; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; @@ -20,8 +20,13 @@ struct BranchAnnotation { // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups pub(crate) struct Consensus { + // Local component's state highest_connector_id: ConnectorId, branch_annotations: Vec, + last_finished_handled: Option, + // Gathered state (in case we are currently the leader of the distributed + // consensus protocol) + // Workspaces workspace_ports: Vec, } @@ -36,6 +41,8 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), + last_finished_handled: None, + workspace_ports: Vec::new(), } } @@ -46,6 +53,13 @@ impl Consensus { return !self.branch_annotations.is_empty(); } + /// TODO: Remove this once multi-fire is in place + pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation { + let branch = &self.branch_annotations[branch_id.index as usize]; + let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap(); + return port; + } + /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. @@ -81,8 +95,9 @@ impl Consensus { /// Notifies the consensus algorithm that a branch has reached the end of /// the sync block. A final check for consistency will be performed that the - /// caller has to handle + /// caller has to handle. Note that pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { + debug_assert!(self.is_in_sync()); let branch = &self.branch_annotations[branch_id.index as usize]; for mapping in &branch.port_mapping { match mapping.expected_firing { @@ -103,6 +118,7 @@ impl Consensus { /// Notifies the consensus algorithm that a particular branch has assumed /// a speculative value for its port mapping. pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency { + debug_assert!(self.is_in_sync()); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == port_id { @@ -127,8 +143,31 @@ impl Consensus { unreachable!("notify_of_speculative_mapping called with unowned port"); } + /// Generates sync messages for any branches that are at the end of the + /// sync block. To find these branches, they should've been put in the + /// "finished" queue in the execution tree. + pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) { + debug_assert!(self.is_in_sync()); + + let mut last_branch_id = self.last_finished_handled; + for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { + // Turn the port mapping into a local solution + + last_branch_id = Some(branch.id); + } + + self.last_finished_handled = last_branch_id; + } + pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { - todo!("write"); + debug_assert!(self.is_in_sync()); + + // TODO: Handle sending and receiving ports + final_ports.clear(); + let branch = &self.branch_annotations[branch_id.index as usize]; + for port in &branch.port_mapping { + final_ports.push(port.port_id); + } } // --- Handling messages @@ -191,7 +230,7 @@ impl Consensus { /// This function is generally called for freshly received messages that /// should be matched against previously halted branches. pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { - for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage) { + for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { // Found a branch awaiting the message, but we need to make sure // the mapping is correct