Changeset - 0bc3b84bc033
[Not reviewed]
0 1 0
MH - 4 years ago 2021-11-08 10:18:53
contact@maxhenger.nl
WIP on new consensus approach
1 file changed with 78 insertions and 4 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
@@ -10,27 +10,27 @@ use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
}
 

	
 
pub(crate) struct GlobalSolution {
 

	
 
}
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
pub(crate) struct Consensus {
 
@@ -265,25 +265,25 @@ impl Consensus {
 
                    continue
 
                }
 

	
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: encountered_id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_solutions();
 
            self.forward_local_solutions(ctx);
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
@@ -391,24 +391,98 @@ impl Consensus {
 
            for local_solution in self.local_solutions.drain() {
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: self.highest_connector_id,
 
                    content: SyncContent::LocalSolution(local_solution),
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>,
 
    involved_ports: Vec<PortIdLocal>,
 
}
 

	
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    solutions: Vec<MatchedLocalSolution>,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
pub(crate) struct GlobalSolution {
 
    local: Vec<ComponentLocalSolutions>
 
}
 

	
 
impl GlobalSolution {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
        };
 
    }
 

	
 
    fn add_solution(&mut self, solution: LocalSolution) {
 
        let component_id = solution.component;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            port_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index) = match component_exists {
 
            Some((component_index, storage)) => {
 
                // Entry for component exists, so add to solutions
 
                let solution_index = storage.solutions.len();
 
                storage.solutions.push(solution);
 

	
 
                (component_index, solution_index)
 
            }
 
            None => {
 
                // Entry for component does not exist yet
 
                let component_index = self.local.len();
 
                self.local.push(ComponentLocalSolutions{
 
                    component: component_id,
 
                    solutions: vec![solution],
 
                });
 
                (component_index, 0)
 
            }
 
        };
 

	
 
        // Compare this new solution to other solutions of different components
 
        // to see if we get a closed global solution.
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic Helpers
 
// -----------------------------------------------------------------------------
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
0 comments (0 inline, 0 general)