diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index b65ea87882def537dd7a4ad334e7c65a9a87f79c..1affea56d580a3dcf628f13a2f71142a82e4525c 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -14,13 +14,14 @@ struct BranchAnnotation { port_mapping: Vec, } +#[derive(Debug)] pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, port_mapping: Vec<(ChannelId, BranchId)>, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) struct GlobalSolution { component_branches: Vec<(ConnectorId, BranchId)>, channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info @@ -53,7 +54,7 @@ pub(crate) struct Consensus { workspace_ports: Vec, } -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum Consistency { Valid, Inconsistent, @@ -88,7 +89,7 @@ impl Consensus { /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) { + pub fn start_sync(&mut self, ctx: &ComponentCtxFancy) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); @@ -98,7 +99,7 @@ impl Consensus { // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. self.branch_annotations.push(BranchAnnotation{ - port_mapping: ports.iter() + port_mapping: ctx.get_ports().iter() .map(|v| PortAnnotation{ port_id: v.self_id, registered_id: None, @@ -237,6 +238,8 @@ impl Consensus { /// sending the message is consistent with the speculative state. pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { debug_assert!(self.is_in_sync()); + let sync_header = self.create_sync_header(ctx); + let branch = &mut self.branch_annotations[branch_id.index as usize]; if cfg!(debug_assertions) { @@ -246,7 +249,7 @@ impl Consensus { debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); } - // Check for ports that are begin sent + // Check for ports that are being sent debug_assert!(self.workspace_ports.is_empty()); find_ports_in_value_group(content, &mut self.workspace_ports); if !self.workspace_ports.is_empty() { @@ -257,7 +260,6 @@ impl Consensus { // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); - let sync_header = self.create_sync_header(ctx); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), @@ -309,7 +311,7 @@ impl Consensus { SyncContent::GlobalSolution(solution) => { // Take branch of interest and return it. let (_, branch_id) = solution.component_branches.iter() - .find(|(connector_id, _)| connector_id == ctx.id) + .find(|(connector_id, _)| *connector_id == ctx.id) .unwrap(); return Some(*branch_id); } @@ -389,14 +391,14 @@ impl Consensus { // messages. We should also let all of our peers know self.highest_connector_id = sync_header.highest_component_id; for encountered_id in self.encountered_peers.iter() { - if encountered_id == sync_header.sending_component_id { + if *encountered_id == sync_header.sending_component_id { // Don't need to send it to this one continue } let message = SyncMessageFancy{ sync_header: self.create_sync_header(ctx), - target_component_id: encountered_id, + target_component_id: *encountered_id, content: SyncContent::Notification, }; ctx.submit_message(MessageFancy::Sync(message)); @@ -587,7 +589,7 @@ impl SolutionCombiner { } let mut num_ports_in_peers = 0; - for peer in component_peers { + for peer in &component_peers { num_ports_in_peers += peer.involved_channels.len(); } @@ -823,7 +825,7 @@ impl SolutionCombiner { for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) { Some((_, encountered_branch_id)) => { - debug_assert_eq!(encountered_branch_id, branch_id); + debug_assert_eq!(*encountered_branch_id, branch_id); total_num_checked += 1; }, None => {