Changeset - 6e3f85de2a0a
[Not reviewed]
0 1 0
MH - 4 years ago 2021-11-08 20:45:47
contact@maxhenger.nl
initial version of new consensus
1 file changed with 100 insertions and 54 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use std::str::pattern::Pattern;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox::SyncMessage;
 
use crate::runtime2::port::{ChannelId, Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct GlobalSolution {
 
    branches: Vec<(ConnectorId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
@@ -195,29 +197,32 @@ impl Consensus {
 
            self.send_or_store_local_solution(local_solution, ctx);
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        final_ports.clear();
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for port in &branch.port_mapping {
 
            final_ports.push(port.port_id);
 
        }
 

	
 
        // Clear out internal storage
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
            let port = branch.port_mapping.iter()
 
@@ -288,24 +293,72 @@ impl Consensus {
 
                return None;
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
@@ -340,76 +393,36 @@ impl Consensus {
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 

	
 
                for (connector_id, _) in global_solution.branches.iter().copied() {
 
                    let message = SyncMessageFancy{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: connector_id,
 
                        content: SyncContent::GlobalSolution(global_solution.clone()),
 
                    };
 
                    ctx.submit_message(MessageFancy::Sync(message));
 
                }
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 

	
 
@@ -470,25 +483,25 @@ pub(crate) struct SolutionCombiner {
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<Vec<(ConnectorId, BranchId)>> {
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
 
        let component_id = solution.component;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            port_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index, new_component) = match component_exists {
 
@@ -667,25 +680,25 @@ impl SolutionCombiner {
 
                None => {
 
                    // Create a new entry
 
                    cur_solution.matches.push(new_component_match);
 
                }
 
            }
 
        }
 

	
 
        return self.check_new_solution(component_index, solution_index);
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<Vec<(ConnectorId, BranchId)>> {
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<GlobalSolution> {
 
        if !self.can_have_solution() {
 
            return None;
 
        }
 

	
 
        // By now we're certain that all peers are present. So once our
 
        // backtracking solution stack is as long as the number of components,
 
        // then we have found a global solution.
 
        let mut check_stack = Vec::new();
 
        let mut check_from = 0;
 
        check_stack.push((component_index, solution_index));
 
        'checking_loop: while check_from < check_stack.len() {
 
            // Prepare for next iteration
 
@@ -746,35 +759,68 @@ impl SolutionCombiner {
 
                            continue 'checking_loop;
 
                        } // else: we're fine, the solution is agreeable
 
                    } else {
 
                        check_stack.push((cur_match.target_index, 0))
 
                    }
 
                }
 
            }
 

	
 
            check_from = new_check_from;
 
        }
 

	
 
        // Because of our earlier checking if we can have a solution at
 
        // all (all components have a peer), and the exit condition of the while
 
        // loop: if we're here, then we have a global solution
 
        // all (all components have their peers), and the exit condition of the
 
        // while loop: if we're here, then we have a global solution
 
        debug_assert_eq!(check_stack.len(), self.local.len());
 
        let mut global_solution = Vec::with_capacity(check_stack.len());
 
        for (component_index, solution_index) in check_stack {
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
            global_solution.push((component.component, solution.final_branch_id));
 
        }
 

	
 
        return Some(global_solution);
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_ports = 0;
 
        for (component_index, _) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            total_num_ports += component.solutions[0].port_mapping.len();
 
        }
 

	
 
        total_num_ports /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_ports);
 
        let mut total_num_checked = 0;
 

	
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 

	
 
            for (channel_id, branch_id) in solution.port_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
 
                    },
 
                    None => {
 
                        final_mapping.push((channel_id, branch_id));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        debug_assert_eq!(total_num_checked, total_num_ports);
 

	
 
        return Some(GlobalSolution{
 
            branches: global_solution,
 
            port_mapping: final_mapping,
 
        });
 
    }
 

	
 
    /// Simple test if a solution is at all possible. If this returns true it
 
    /// does not mean there actually is a solution.
 
    fn can_have_solution(&self) -> bool {
 
        for component in &self.local {
 
            if !component.all_peers_present {
 
                return false;
 
            }
 
        }
 

	
 
        return true;
0 comments (0 inline, 0 general)