diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index 7f84183fdc9885c463fad673a24142500abeceb7..f879550f7307f7dc5a0b96d908adab1299f2175e 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -79,30 +79,37 @@ pub struct SyncMessage { } #[derive(Debug)] -pub struct SyncLocalSolutionEntry { - pub self_port_id: PortId, - pub peer_comp_id: CompId, - pub peer_port_id: PortId, - pub mapping: u32, - pub port_kind: PortKind, +pub enum SyncLocalSolutionEntry { + Putter(SyncSolutionPutterPort), + Getter(SyncSolutionGetterPort), } pub type SyncLocalSolution = Vec; +/// Getter port in a solution. Upon receiving a message it is certain about who +/// its peer is. #[derive(Debug)] -pub struct SyncSolutionPort { +pub struct SyncSolutionGetterPort { pub self_comp_id: CompId, pub self_port_id: PortId, pub peer_comp_id: CompId, pub peer_port_id: PortId, pub mapping: u32, - pub port_kind: PortKind, +} + +/// Putter port in a solution. A putter may not be certain about who its peer +/// component/port is. +#[derive(Debug)] +pub struct SyncSolutionPutterPort { + pub self_comp_id: CompId, + pub self_port_id: PortId, + pub mapping: u32, } #[derive(Debug)] pub struct SyncSolutionChannel { - pub putter: Option, - pub getter: Option, + pub putter: Option, + pub getter: Option, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -114,16 +121,16 @@ pub enum SyncRoundDecision { #[derive(Debug)] pub struct SyncPartialSolution { - pub submissions_by: Vec<(CompId, bool)>, pub channel_mapping: Vec, + pub matched_channels: usize, pub decision: SyncRoundDecision, } impl Default for SyncPartialSolution { fn default() -> Self { return Self{ - submissions_by: Vec::new(), channel_mapping: Vec::new(), + matched_channels: 0, decision: SyncRoundDecision::None, } } diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index a0544a3d7f80b0db21a56e09d5c096e28ba89732..f8db817c114619e1361b8edca40ca69049cbd55c 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -38,7 +38,7 @@ impl SolutionCombiner { #[inline] fn has_contributions(&self) -> bool { - return !self.solution.submissions_by.is_empty(); + return !self.solution.channel_mapping.is_empty(); } /// Returns a decision for the current round. If there is no decision (yet) @@ -53,11 +53,6 @@ impl SolutionCombiner { } fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) { - // Combine the submission tracking - for (comp_id, present) in partial.submissions_by { - self.mark_single_component_submission(comp_id, present); - } - debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution); debug_assert_ne!(partial.decision, SyncRoundDecision::Solution); @@ -144,60 +139,16 @@ impl SolutionCombiner { /// provided local solution. Make sure to check the `has_decision` return /// value afterwards. fn combine_with_local_solution(&mut self, comp_id: CompId, solution: SyncLocalSolution) { - // Mark the contributions of the component and detect components whose - // submissions we do not yet have - self.mark_single_component_submission(comp_id, true); - for entry in solution.iter() { - self.mark_single_component_submission(entry.peer_comp_id, false); - } - debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution); - // Go through all entries and check if the submitted local solution is - // consistent with our partial solution - let mut had_new_entry = false; - for entry in solution.iter() { - let preexisting_index = self.find_channel_index_for_local_entry(comp_id, entry); - let new_port = SyncSolutionPort{ - self_comp_id: comp_id, - self_port_id: entry.self_port_id, - peer_comp_id: entry.peer_comp_id, - peer_port_id: entry.peer_port_id, - mapping: entry.mapping, - port_kind: entry.port_kind, - }; + // Combine partial solution with the local solution entries + for entry in solution { + match entry { + SyncLocalSolutionEntry::Putter(putter) => { - match preexisting_index { - Some(entry_index) => { - // Add the local solution's entry to the existing entry in - // the global solution. We'll handle any mismatches along - // the way. - let channel = &mut self.solution.channel_mapping[entry_index]; - match entry.port_kind { - PortKind::Putter => { - // Getter should be present in existing entry - debug_assert!(channel.getter.is_some() && channel.putter.is_none()); - channel.putter = Some(new_port); - }, - PortKind::Getter => { - // Putter should be present in existing entry - debug_assert!(channel.putter.is_some() && channel.getter.is_none()); - channel.getter = Some(new_port); - } - } - - if !Self::channel_is_consistent(channel) { - self.solution.decision = SyncRoundDecision::Failure; - } }, - None => { - // No entry yet. So add it - let new_solution = match entry.port_kind { - PortKind::Putter => SyncSolutionChannel{ putter: Some(new_port), getter: None }, - PortKind::Getter => SyncSolutionChannel{ putter: None, getter: Some(new_port) }, - }; - self.solution.channel_mapping.push(new_solution); - had_new_entry = true; + SyncLocalSolutionEntry::Getter(getter) => { + } } } @@ -237,18 +188,6 @@ impl SolutionCombiner { // --- Small utilities for combining solutions - fn mark_single_component_submission(&mut self, comp_id: CompId, will_contribute: bool) { - debug_assert!(!will_contribute || !self.solution.submissions_by.iter().any(|(id, val)| *id == comp_id && *val)); // if submitting a solution, then we do not expect an existing entry - for (entry, has_contributed) in self.solution.submissions_by.iter_mut() { - if *entry == comp_id { - *has_contributed = *has_contributed || will_contribute; - return; - } - } - - self.solution.submissions_by.push((comp_id, will_contribute)); - } - fn update_all_present(&mut self) { debug_assert!(!self.all_present); // upheld by caller for (_, present) in self.solution.submissions_by.iter() { @@ -260,101 +199,11 @@ impl SolutionCombiner { self.all_present = true; } - /// Given the partial solution entry of a channel's port, check if there is - /// an entry for the other port. If there is we return its index, and we - /// return `None` otherwise. - fn find_channel_index_for_partial_entry(&self, new_entry: &SyncSolutionPort) -> Option { - fn might_belong_to_same_channel(cur_entry: &SyncSolutionPort, new_entry: &SyncSolutionPort) -> bool { - ( - cur_entry.peer_comp_id == new_entry.self_comp_id && - cur_entry.peer_port_id == new_entry.self_port_id - ) || ( - cur_entry.self_comp_id == new_entry.peer_comp_id && - cur_entry.self_port_id == new_entry.peer_port_id - ) - } - - for (entry_index, cur_entry) in self.solution.channel_mapping.iter().enumerate() { - if new_entry.port_kind == PortKind::Putter { - if let Some(cur_entry) = &cur_entry.getter { - if might_belong_to_same_channel(cur_entry, new_entry) { - return Some(entry_index); - } - } - } else { - if let Some(cur_entry) = &cur_entry.putter { - if might_belong_to_same_channel(cur_entry, new_entry) { - return Some(entry_index); - } - } - } - } - - return None; - } - - /// Given the local solution entry for one end of a channel, check if there - /// is an entry for the other end of the channel such that they can be - /// paired up. - fn find_channel_index_for_local_entry(&self, comp_id: CompId, new_entry: &SyncLocalSolutionEntry) -> Option { - fn might_belong_to_same_channel(cur_entry: &SyncSolutionPort, new_comp_id: CompId, new_entry: &SyncLocalSolutionEntry) -> bool { - ( - new_entry.peer_comp_id == cur_entry.self_comp_id && - new_entry.peer_port_id == cur_entry.self_port_id - ) || ( - new_comp_id == cur_entry.peer_comp_id && - new_entry.self_port_id == cur_entry.peer_port_id - ) - } + /// Retrieve channel index based on a putter port + fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option { + for entry in self.solution.channel_mapping.iter() { - for (entry_index, cur_entry) in self.solution.channel_mapping.iter().enumerate() { - // Note that the check that determines whether two ports belong to - // the same channel is one-sided. That is: port A may decide that - // port B is part of its channel, but port B may consider port A not - // to be part of its channel. Before merging the entries (outside of - // this function) we'll make sure this is not the case. - match new_entry.port_kind { - PortKind::Putter => { - // Expect getter to be present - if let Some(cur_entry) = &cur_entry.getter { - if might_belong_to_same_channel(cur_entry, comp_id, new_entry) { - return Some(entry_index); - } - } - }, - PortKind::Getter => { - if let Some(cur_entry) = &cur_entry.putter { - if might_belong_to_same_channel(cur_entry, comp_id, new_entry) { - return Some(entry_index); - } - } - } - } - } - - return None; - } - - // Makes sure that two ports agree that they are each other's peers - fn ports_belong_to_same_channel(a: &SyncSolutionPort, b: &SyncSolutionPort) -> bool { - return - a.self_comp_id == b.peer_comp_id && a.self_port_id == b.peer_port_id && - a.peer_comp_id == b.self_comp_id && a.peer_port_id == b.self_port_id - } - - // Makes sure channel is consistently mapped (or not yet fully specified) - fn channel_is_consistent(channel: &SyncSolutionChannel) -> bool { - debug_assert!(channel.putter.is_some() || channel.getter.is_some()); - if channel.putter.is_none() || channel.getter.is_none() { - // Not yet fully specified - return false; } - - let putter = channel.putter.as_ref().unwrap(); - let getter = channel.getter.as_ref().unwrap(); - return - Self::ports_belong_to_same_channel(putter, getter) && - putter.mapping == getter.mapping; } }