Changeset - 166fbf798871
[Not reviewed]
0 2 0
mh - 3 years ago 2022-01-25 18:52:09
contact@maxhenger.nl
WIP: Updating consensus algorithm
2 files changed with 29 insertions and 173 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -79,30 +79,37 @@ pub struct SyncMessage {
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncLocalSolutionEntry {
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
    pub mapping: u32,
 
    pub port_kind: PortKind,
 
pub enum SyncLocalSolutionEntry {
 
    Putter(SyncSolutionPutterPort),
 
    Getter(SyncSolutionGetterPort),
 
}
 

	
 
pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
 

	
 
/// Getter port in a solution. Upon receiving a message it is certain about who
 
/// its peer is.
 
#[derive(Debug)]
 
pub struct SyncSolutionPort {
 
pub struct SyncSolutionGetterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
    pub mapping: u32,
 
    pub port_kind: PortKind,
 
}
 

	
 
/// Putter port in a solution. A putter may not be certain about who its peer
 
/// component/port is.
 
#[derive(Debug)]
 
pub struct SyncSolutionPutterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub mapping: u32,
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncSolutionChannel {
 
    pub putter: Option<SyncSolutionPort>,
 
    pub getter: Option<SyncSolutionPort>,
 
    pub putter: Option<SyncSolutionPutterPort>,
 
    pub getter: Option<SyncSolutionGetterPort>,
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
@@ -114,16 +121,16 @@ pub enum SyncRoundDecision {
 

	
 
#[derive(Debug)]
 
pub struct SyncPartialSolution {
 
    pub submissions_by: Vec<(CompId, bool)>,
 
    pub channel_mapping: Vec<SyncSolutionChannel>,
 
    pub matched_channels: usize,
 
    pub decision: SyncRoundDecision,
 
}
 

	
 
impl Default for SyncPartialSolution {
 
    fn default() -> Self {
 
        return Self{
 
            submissions_by: Vec::new(),
 
            channel_mapping: Vec::new(),
 
            matched_channels: 0,
 
            decision: SyncRoundDecision::None,
 
        }
 
    }
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -38,7 +38,7 @@ impl SolutionCombiner {
 

	
 
    #[inline]
 
    fn has_contributions(&self) -> bool {
 
        return !self.solution.submissions_by.is_empty();
 
        return !self.solution.channel_mapping.is_empty();
 
    }
 

	
 
    /// Returns a decision for the current round. If there is no decision (yet)
 
@@ -53,11 +53,6 @@ impl SolutionCombiner {
 
    }
 

	
 
    fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
 
        // Combine the submission tracking
 
        for (comp_id, present) in partial.submissions_by {
 
            self.mark_single_component_submission(comp_id, present);
 
        }
 

	
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 

	
 
@@ -144,60 +139,16 @@ impl SolutionCombiner {
 
    /// provided local solution. Make sure to check the `has_decision` return
 
    /// value afterwards.
 
    fn combine_with_local_solution(&mut self, comp_id: CompId, solution: SyncLocalSolution) {
 
        // Mark the contributions of the component and detect components whose
 
        // submissions we do not yet have
 
        self.mark_single_component_submission(comp_id, true);
 
        for entry in solution.iter() {
 
            self.mark_single_component_submission(entry.peer_comp_id, false);
 
        }
 

	
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 

	
 
        // Go through all entries and check if the submitted local solution is
 
        // consistent with our partial solution
 
        let mut had_new_entry = false;
 
        for entry in solution.iter() {
 
            let preexisting_index = self.find_channel_index_for_local_entry(comp_id, entry);
 
            let new_port = SyncSolutionPort{
 
                self_comp_id: comp_id,
 
                self_port_id: entry.self_port_id,
 
                peer_comp_id: entry.peer_comp_id,
 
                peer_port_id: entry.peer_port_id,
 
                mapping: entry.mapping,
 
                port_kind: entry.port_kind,
 
            };
 
        // Combine partial solution with the local solution entries
 
        for entry in solution {
 
            match entry {
 
                SyncLocalSolutionEntry::Putter(putter) => {
 

	
 
            match preexisting_index {
 
                Some(entry_index) => {
 
                    // Add the local solution's entry to the existing entry in
 
                    // the global solution. We'll handle any mismatches along
 
                    // the way.
 
                    let channel = &mut self.solution.channel_mapping[entry_index];
 
                    match entry.port_kind {
 
                        PortKind::Putter => {
 
                            // Getter should be present in existing entry
 
                            debug_assert!(channel.getter.is_some() && channel.putter.is_none());
 
                            channel.putter = Some(new_port);
 
                        },
 
                        PortKind::Getter => {
 
                            // Putter should be present in existing entry
 
                            debug_assert!(channel.putter.is_some() && channel.getter.is_none());
 
                            channel.getter = Some(new_port);
 
                        }
 
                    }
 

	
 
                    if !Self::channel_is_consistent(channel) {
 
                        self.solution.decision = SyncRoundDecision::Failure;
 
                    }
 
                },
 
                None => {
 
                    // No entry yet. So add it
 
                    let new_solution = match entry.port_kind {
 
                        PortKind::Putter => SyncSolutionChannel{ putter: Some(new_port), getter: None },
 
                        PortKind::Getter => SyncSolutionChannel{ putter: None, getter: Some(new_port) },
 
                    };
 
                    self.solution.channel_mapping.push(new_solution);
 
                    had_new_entry = true;
 
                SyncLocalSolutionEntry::Getter(getter) => {
 

	
 
                }
 
            }
 
        }
 
@@ -237,18 +188,6 @@ impl SolutionCombiner {
 

	
 
    // --- Small utilities for combining solutions
 

	
 
    fn mark_single_component_submission(&mut self, comp_id: CompId, will_contribute: bool) {
 
        debug_assert!(!will_contribute || !self.solution.submissions_by.iter().any(|(id, val)| *id == comp_id && *val)); // if submitting a solution, then we do not expect an existing entry
 
        for (entry, has_contributed) in self.solution.submissions_by.iter_mut() {
 
            if *entry == comp_id {
 
                *has_contributed = *has_contributed || will_contribute;
 
                return;
 
            }
 
        }
 

	
 
        self.solution.submissions_by.push((comp_id, will_contribute));
 
    }
 

	
 
    fn update_all_present(&mut self) {
 
        debug_assert!(!self.all_present); // upheld by caller
 
        for (_, present) in self.solution.submissions_by.iter() {
 
@@ -260,101 +199,11 @@ impl SolutionCombiner {
 
        self.all_present = true;
 
    }
 

	
 
    /// Given the partial solution entry of a channel's port, check if there is
 
    /// an entry for the other port. If there is we return its index, and we
 
    /// return `None` otherwise.
 
    fn find_channel_index_for_partial_entry(&self, new_entry: &SyncSolutionPort) -> Option<usize> {
 
        fn might_belong_to_same_channel(cur_entry: &SyncSolutionPort, new_entry: &SyncSolutionPort) -> bool {
 
            (
 
                cur_entry.peer_comp_id == new_entry.self_comp_id &&
 
                cur_entry.peer_port_id == new_entry.self_port_id
 
            ) || (
 
                cur_entry.self_comp_id == new_entry.peer_comp_id &&
 
                cur_entry.self_port_id == new_entry.peer_port_id
 
            )
 
        }
 

	
 
        for (entry_index, cur_entry) in self.solution.channel_mapping.iter().enumerate() {
 
            if new_entry.port_kind == PortKind::Putter {
 
                if let Some(cur_entry) = &cur_entry.getter {
 
                    if might_belong_to_same_channel(cur_entry, new_entry) {
 
                        return Some(entry_index);
 
                    }
 
                }
 
            } else {
 
                if let Some(cur_entry) = &cur_entry.putter {
 
                    if might_belong_to_same_channel(cur_entry, new_entry) {
 
                        return Some(entry_index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Given the local solution entry for one end of a channel, check if there
 
    /// is an entry for the other end of the channel such that they can be
 
    /// paired up.
 
    fn find_channel_index_for_local_entry(&self, comp_id: CompId, new_entry: &SyncLocalSolutionEntry) -> Option<usize> {
 
        fn might_belong_to_same_channel(cur_entry: &SyncSolutionPort, new_comp_id: CompId, new_entry: &SyncLocalSolutionEntry) -> bool {
 
            (
 
                new_entry.peer_comp_id == cur_entry.self_comp_id &&
 
                new_entry.peer_port_id == cur_entry.self_port_id
 
            ) || (
 
                new_comp_id == cur_entry.peer_comp_id &&
 
                new_entry.self_port_id == cur_entry.peer_port_id
 
            )
 
        }
 
    /// Retrieve channel index based on a putter port
 
    fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option<usize> {
 
        for entry in self.solution.channel_mapping.iter() {
 

	
 
        for (entry_index, cur_entry) in self.solution.channel_mapping.iter().enumerate() {
 
            // Note that the check that determines whether two ports belong to
 
            // the same channel is one-sided. That is: port A may decide that
 
            // port B is part of its channel, but port B may consider port A not
 
            // to be part of its channel. Before merging the entries (outside of
 
            // this function) we'll make sure this is not the case.
 
            match new_entry.port_kind {
 
                PortKind::Putter => {
 
                    // Expect getter to be present
 
                    if let Some(cur_entry) = &cur_entry.getter {
 
                        if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
 
                            return Some(entry_index);
 
                        }
 
                    }
 
                },
 
                PortKind::Getter => {
 
                    if let Some(cur_entry) = &cur_entry.putter {
 
                        if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
 
                            return Some(entry_index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    // Makes sure that two ports agree that they are each other's peers
 
    fn ports_belong_to_same_channel(a: &SyncSolutionPort, b: &SyncSolutionPort) -> bool {
 
        return
 
            a.self_comp_id == b.peer_comp_id && a.self_port_id == b.peer_port_id &&
 
            a.peer_comp_id == b.self_comp_id && a.peer_port_id == b.self_port_id
 
    }
 

	
 
    // Makes sure channel is consistently mapped (or not yet fully specified)
 
    fn channel_is_consistent(channel: &SyncSolutionChannel) -> bool {
 
        debug_assert!(channel.putter.is_some() || channel.getter.is_some());
 
        if channel.putter.is_none() || channel.getter.is_none() {
 
            // Not yet fully specified
 
            return false;
 
        }
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return
 
            Self::ports_belong_to_same_channel(putter, getter) &&
 
                putter.mapping == getter.mapping;
 
    }
 
}
 

	
0 comments (0 inline, 0 general)