Changeset - 6e3f85de2a0a
[Not reviewed]
0 1 0
MH - 4 years ago 2021-11-08 20:45:47
contact@maxhenger.nl
initial version of new consensus
1 file changed with 100 insertions and 54 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use std::str::pattern::Pattern;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox::SyncMessage;
 
use crate::runtime2::port::{ChannelId, Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct GlobalSolution {
 
    branches: Vec<(ConnectorId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
pub(crate) struct Consensus {
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
@@ -183,53 +185,56 @@ impl Consensus {
 
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
                target_mapping.push((
 
                    port_desc.channel_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 

	
 
            let local_solution = LocalSolution{
 
                component: ctx.id,
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 
            self.send_or_store_local_solution(local_solution, ctx);
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        final_ports.clear();
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for port in &branch.port_mapping {
 
            final_ports.push(port.port_id);
 
        }
 

	
 
        // Clear out internal storage
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
            let port = branch.port_mapping.iter()
 
                .find(|v| v.port_id == source_port_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are begin sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 
@@ -276,48 +281,96 @@ impl Consensus {
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 
        match message.content {
 
            SyncContent::Notification => {
 
                // We were just interested in the header
 
                return None;
 
            },
 
            SyncContent::LocalSolution(solution) => {
 
                // We might be the leader, or earlier messages caused us to not
 
                // be the leader anymore.
 
                self.send_or_store_local_solution(solution, ctx);
 
                return None;
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 

	
 
        self.encountered_peers.push(sync_header.sending_component_id);
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
@@ -328,100 +381,60 @@ impl Consensus {
 
                }
 

	
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: encountered_id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_solutions(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
                }
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 

	
 
                for (connector_id, _) in global_solution.branches.iter().copied() {
 
                    let message = SyncMessageFancy{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: connector_id,
 
                        content: SyncContent::GlobalSolution(global_solution.clone()),
 
                    };
 
                    ctx.submit_message(MessageFancy::Sync(message));
 
                }
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
        }
 
    }
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        for local_solution in self.solution_combiner.drain() {
 
@@ -458,49 +471,49 @@ struct ComponentPeer {
 
}
 

	
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<Vec<(ConnectorId, BranchId)>> {
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
 
        let component_id = solution.component;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            port_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index, new_component) = match component_exists {
 
            Some((component_index, storage)) => {
 
                // Entry for component exists, so add to solutions
 
                let solution_index = storage.solutions.len();
 
                storage.solutions.push(solution);
 

	
 
                (component_index, solution_index, false)
 
            }
 
            None => {
 
                // Entry for component does not exist yet
 
                let component_index = self.local.len();
 
                self.local.push(ComponentLocalSolutions{
 
                    component: component_id,
 
@@ -655,49 +668,49 @@ impl SolutionCombiner {
 

	
 
            let cur_component = &mut self.local[component_index];
 
            let cur_solution = &mut cur_component.solutions[solution_index];
 

	
 
            match cur_solution.matches.iter_mut()
 
                .find(|v| v.target_id == new_component_match.target_id)
 
            {
 
                Some(other_match) => {
 
                    // Already have an entry
 
                    debug_assert_eq!(other_match.target_index, new_component_match.target_index);
 
                    other_match.match_indices.extend(&new_component_match.match_indices);
 
                },
 
                None => {
 
                    // Create a new entry
 
                    cur_solution.matches.push(new_component_match);
 
                }
 
            }
 
        }
 

	
 
        return self.check_new_solution(component_index, solution_index);
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<Vec<(ConnectorId, BranchId)>> {
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<GlobalSolution> {
 
        if !self.can_have_solution() {
 
            return None;
 
        }
 

	
 
        // By now we're certain that all peers are present. So once our
 
        // backtracking solution stack is as long as the number of components,
 
        // then we have found a global solution.
 
        let mut check_stack = Vec::new();
 
        let mut check_from = 0;
 
        check_stack.push((component_index, solution_index));
 
        'checking_loop: while check_from < check_stack.len() {
 
            // Prepare for next iteration
 
            let new_check_from = check_stack.len();
 

	
 
            // Go through all entries on the checking stack. Each entry
 
            // corresponds to a component's solution. We check that one against
 
            // previously added ones on the stack, and if they're not already
 
            // added we push them onto the check stack.
 
            for check_idx in check_from..new_check_from {
 
                // Take the current solution
 
                let (component_index, solution_index) = check_stack[check_idx];
 
                debug_assert!(!self.local[component_index].solutions.is_empty());
 
                let cur_solution = &self.local[component_index].solutions[solution_index];
 

	
 
@@ -734,59 +747,92 @@ impl SolutionCombiner {
 
                            }
 

	
 
                            // Try the next one
 
                            let last_component = &self.local[last_component_index];
 
                            let new_solution_index = last_solution_index + 1;
 
                            if new_solution_index >= last_component.solutions.len() {
 
                                // No more things to try, again: no valid
 
                                // solution
 
                                return None;
 
                            }
 

	
 
                            check_stack.push((last_component_index, new_solution_index));
 
                            continue 'checking_loop;
 
                        } // else: we're fine, the solution is agreeable
 
                    } else {
 
                        check_stack.push((cur_match.target_index, 0))
 
                    }
 
                }
 
            }
 

	
 
            check_from = new_check_from;
 
        }
 

	
 
        // Because of our earlier checking if we can have a solution at
 
        // all (all components have a peer), and the exit condition of the while
 
        // loop: if we're here, then we have a global solution
 
        // all (all components have their peers), and the exit condition of the
 
        // while loop: if we're here, then we have a global solution
 
        debug_assert_eq!(check_stack.len(), self.local.len());
 
        let mut global_solution = Vec::with_capacity(check_stack.len());
 
        for (component_index, solution_index) in check_stack {
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
            global_solution.push((component.component, solution.final_branch_id));
 
        }
 

	
 
        return Some(global_solution);
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_ports = 0;
 
        for (component_index, _) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            total_num_ports += component.solutions[0].port_mapping.len();
 
        }
 

	
 
        total_num_ports /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_ports);
 
        let mut total_num_checked = 0;
 

	
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 

	
 
            for (channel_id, branch_id) in solution.port_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
 
                    },
 
                    None => {
 
                        final_mapping.push((channel_id, branch_id));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        debug_assert_eq!(total_num_checked, total_num_ports);
 

	
 
        return Some(GlobalSolution{
 
            branches: global_solution,
 
            port_mapping: final_mapping,
 
        });
 
    }
 

	
 
    /// Simple test if a solution is at all possible. If this returns true it
 
    /// does not mean there actually is a solution.
 
    fn can_have_solution(&self) -> bool {
 
        for component in &self.local {
 
            if !component.all_peers_present {
 
                return false;
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    /// Turns the entire (partially resolved) global solution back into local
 
    /// solutions to ship to another component.
 
    // TODO: Don't do this, kind of wasteful since a lot of processing has
 
    //  already been performed.
 
    fn drain(&mut self) -> Vec<LocalSolution> {
 
        let mut reserve_len = 0;
 
        for component in &self.local {
 
            reserve_len += component.solutions.len();
 
        }
 

	
0 comments (0 inline, 0 general)