Changeset - b1299290279a
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-29 18:22:16
contact@maxhenger.nl
put sync rounds in messages to leader, new failing test
4 files changed with 100 insertions and 20 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
@@ -14,43 +14,45 @@ use super::inbox::{
 
};
 
use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket};
 

	
 
struct BranchAnnotation {
 
    channel_mapping: Vec<ChannelAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    sync_round_number: u32,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    component_branches: Vec<(ConnectorId, BranchId, u32)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum RoundConclusion {
 
    Failure,
 
    Success(BranchId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
@@ -281,24 +283,25 @@ impl Consensus {
 
                    }
 
                }
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            sync_round_number: self.sync_round,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
@@ -315,24 +318,26 @@ impl Consensus {
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 

	
 
        println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers)
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
@@ -419,26 +424,26 @@ impl Consensus {
 
        match &message.content {
 
            SyncCompContent::LocalFailure |
 
            SyncCompContent::LocalSolution(_) |
 
            SyncCompContent::PartialSolution(_) |
 
            SyncCompContent::AckFailure |
 
            SyncCompContent::Presence(_) => {
 
                // Needs to be handled by the leader
 
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
 
            },
 
            SyncCompContent::GlobalSolution(solution) => {
 
                // Found a global solution
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(component_id, _)| *component_id == ctx.id)
 
                let (_, branch_id, _) = solution.component_branches.iter()
 
                    .find(|(component_id, _, _)| *component_id == ctx.id)
 
                    .unwrap();
 
                return Some(RoundConclusion::Success(*branch_id));
 
            },
 
            SyncCompContent::GlobalFailure => {
 
                // Global failure of round, send Ack to leader
 
                println!("DEBUGERINO: Got GlobalFailure, sending Ack in response");
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
 
                debug_assert!(_result.is_none());
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
@@ -730,37 +735,53 @@ impl Consensus {
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // Handle the global solution
 
        let mut my_final_branch_id = BranchId::new_invalid();
 
        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
        for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() {
 
            if connector_id == ctx.id {
 
                // This is our solution branch
 
                my_final_branch_id = branch_id;
 
                continue;
 
            }
 

	
 
            // Send solution message
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 

	
 
            // Update peers as leader. Subsequent call to `end_sync` will update
 
            // the round numbers
 
            match self.peers.iter_mut().find(|v| v.id == connector_id) {
 
                Some(peer) => {
 
                    peer.expected_sync_round = sync_round;
 
                },
 
                None => {
 
                    self.peers.push(Peer{
 
                        id: connector_id,
 
                        expected_sync_round: sync_round,
 
                        encountered_this_round: true,
 
                    });
 
                }
 
            }
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
 
        return Some(RoundConclusion::Success(my_final_branch_id));
 
    }
 

	
 
    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
 
        if self.conclusion.is_some() {
 
            // Already sent out a failure
 
            return None;
 
@@ -771,25 +792,27 @@ impl Consensus {
 
        for presence in &self.solution_combiner.presence {
 
            if presence.owner_a != ctx.id {
 
                // Did not add it ourselves
 
                if encountered.push(presence.owner_a) {
 
                    // Not yet sent a message
 
                    let message = SyncCompMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: presence.owner_a,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
 
                    ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
                }
 
            } else if let Some(owner_b) = presence.owner_b {
 
            }
 

	
 
            if let Some(owner_b) = presence.owner_b {
 
                if owner_b != ctx.id {
 
                    if encountered.push(owner_b) {
 
                        let message = SyncCompMessage{
 
                            sync_header: self.create_sync_header(ctx),
 
                            target_component_id: owner_b,
 
                            content: SyncCompContent::GlobalFailure,
 
                        };
 
                        ctx.submit_message(Message::SyncComp(message)).unwrap();
 
                    }
 
                }
 
            }
 
        }
 
@@ -898,24 +921,25 @@ struct ComponentMatches {
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    sync_round: u32,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentPresence {
 
    component_id: ConnectorId,
 
    channels: Vec<LocalChannelPresence>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
@@ -970,47 +994,49 @@ impl SolutionCombiner {
 
        return Self{
 
            local: Vec::new(),
 
            presence: Vec::new(),
 
            failure_reported: false,
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
 
        let component_id = solution.component;
 
        let sync_round = solution.sync_round_number;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            channel_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index, new_component) = match component_exists {
 
            Some((component_index, storage)) => {
 
                // Entry for component exists, so add to solutions
 
                let solution_index = storage.solutions.len();
 
                storage.solutions.push(solution);
 

	
 
                (component_index, solution_index, false)
 
            }
 
            None => {
 
                // Entry for component does not exist yet
 
                let component_index = self.local.len();
 
                self.local.push(ComponentLocalSolutions{
 
                    component: component_id,
 
                    sync_round,
 
                    peers: Vec::new(),
 
                    solutions: vec![solution],
 
                    all_peers_present: false,
 
                });
 
                (component_index, 0, true)
 
            }
 
        };
 

	
 
        // If this is a solution of a component that is new to us, then we check
 
        // in the stored solutions which other components are peers of the new
 
        // one.
 
        if new_component {
 
@@ -1340,25 +1366,25 @@ impl SolutionCombiner {
 

	
 
            // Stack length is 1, hence we're back at our initial solution.
 
            // Since that doesn't yield a global solution, we simply:
 
            return None;
 
        }
 

	
 
        // Constructing the representation of the global solution
 
        debug_assert_eq!(stack.len(), self.local.len());
 
        let mut final_branches = Vec::with_capacity(stack.len());
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 
            final_branches.push((component.component, solution.final_branch_id));
 
            final_branches.push((component.component, solution.final_branch_id, component.sync_round));
 
        }
 

	
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_channels = 0;
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            total_num_channels += component.solutions[0].channel_mapping.len();
 
        }
 

	
 
        total_num_channels /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_channels);
 
        let mut total_num_checked = 0;
 
@@ -1433,24 +1459,25 @@ impl SolutionCombiner {
 
    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
 
        self.failure_reported = self.failure_reported || combiner.failure_reported;
 

	
 
        // Handle local solutions
 
        if self.local.is_empty() {
 
            // Trivial case
 
            self.local = combiner.local;
 
        } else {
 
            for local in combiner.local {
 
                for matched in local.solutions {
 
                    let local_solution = LocalSolution{
 
                        component: local.component,
 
                        sync_round_number: local.sync_round,
 
                        final_branch_id: matched.final_branch_id,
 
                        port_mapping: matched.channel_mapping,
 
                    };
 
                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
 
                    if let Some(global_solution) = maybe_solution {
 
                        return Some(LeaderConclusion::Solution(global_solution));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Handle channel presence
src/runtime2/scheduler.rs
Show inline comments
 
@@ -139,25 +139,35 @@ impl Scheduler {
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = message.target_port() {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 

	
 
                    // We insert directly into the private inbox. Since we have
 
                    // a reroute entry the component can not yet be running.
 
                    if let Message::Control(_) = &message {
 
                        self.runtime.send_message(other_component_id, message);
 
                    } else {
 
                        let key = unsafe { ConnectorKey::from_id(other_component_id) };
 
                        let component = self.runtime.get_component_private(&key);
 
                        component.ctx.inbox.insert_new(message);
 
                    }
 

	
 
                    continue;
 
                }
 

	
 
                match scheduled.ctx.get_port_by_id(target_port) {
 
                    Some(port_info) => {
 
                        if port_info.state == PortState::Closed {
 
                            // We're no longer supposed to receive messages
 
                            // (rerouted message arrived much later!)
 
                            continue
 
                        }
 
                    },
 
                    None => {
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -2,31 +2,31 @@ mod network_shapes;
 
mod api_component;
 
mod speculation;
 
mod data_transmission;
 
mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 1500; // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 
// pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 750;  // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 

	
 
// pub(crate) const NUM_THREADS: u32 = 6;
 
// pub(crate) const NUM_INSTANCES: u32 = 1;
 
// pub(crate) const NUM_LOOPS: u32 = 15;
 
pub(crate) const NUM_THREADS: u32 = 6;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
src/runtime2/tests/sync_failure.rs
Show inline comments
 
@@ -41,52 +41,95 @@ primitive failing_at_location(in<bool> input, out<bool> output, Location loc) {
 
        if (loc == Location::BeforeSync) failure_array[0];
 
        sync {
 
            put(output, true);
 
            if (loc == Location::AfterPut) failure_array[0];
 
            auto received = get(input);
 
            assert(received);
 
            if (loc == Location::AfterGet) failure_array[0];
 
        }
 
        if (loc == Location::AfterSync) failure_array[0];
 
    }
 
}
 

	
 
composite constructor_a(Location loc) {
 
composite constructor_pair_a(Location loc) {
 
    channel output_a -> input_a;
 
    channel output_b -> input_b;
 
    new failing_at_location(input_b, output_a, loc);
 
    new failing_at_location(input_a, output_b, Location::Never);
 
}
 

	
 
composite constructor_b(Location loc) {
 
composite constructor_pair_b(Location loc) {
 
    channel output_a -> input_a;
 
    channel output_b -> input_b;
 
    new failing_at_location(input_b, output_a, Location::Never);
 
    new failing_at_location(input_a, output_b, loc);
 
}";
 
}
 

	
 
composite constructor_ring(u32 ring_size, u32 fail_a, Location loc_a, u32 fail_b, Location loc_b) {
 
    channel output_first -> input_old;
 
    channel output_cur -> input_new;
 

	
 
    u32 ring_index = 0;
 
    while (ring_index < ring_size) {
 
        auto cur_loc = Location::Never;
 
        if (ring_index == fail_a) cur_loc = loc_a;
 
        if (ring_index == fail_b) cur_loc = loc_b;
 

	
 
        new failing_at_location(input_old, output_cur, cur_loc);
 

	
 
        if (ring_index == ring_size - 2) {
 
            // Don't create a new channel, join up the last one
 
            output_cur = output_first;
 
            input_old = input_new;
 
        } else if (ring_index != ring_size - 1) {
 
            channel output_fresh -> input_fresh;
 
            input_old = input_new;
 
            output_cur = output_fresh;
 
            input_new = input_fresh;
 
        }
 

	
 
        ring_index += 1;
 
    }
 
}
 
";
 

	
 
#[test]
 
fn test_shared_sync_failure_variant_a() {
 
fn test_shared_sync_failure_pair_variant_a() {
 
    // One fails, the other one should somehow detect it and fail as well. This
 
    // variant constructs the failing component first.
 
    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
 
        for variant in 0..4 { // all `Location` enum variants, except `Never`.
 
            // Create the channels
 
            api.create_connector("", "constructor_a", ValueGroup::new_stack(vec![
 
            api.create_connector("", "constructor_pair_a", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 

	
 
#[test]
 
fn test_shared_sync_failure_variant_b() {
 
fn test_shared_sync_failure_pair_variant_b() {
 
    // One fails, the other one should somehow detect it and fail as well. This
 
    // variant constructs the successful component first.
 
    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
 
        for variant in 0..4 {
 
            api.create_connector("", "constructor_b", ValueGroup::new_stack(vec![
 
            api.create_connector("", "constructor_pair_b", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 

	
 
#[test]
 
fn test_shared_sync_failure_ring_variant_a() {
 
    // Only one component in the ring should fail
 
    const RING_SIZE: u32 = 4;
 
    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
 
        for variant in 0..4 {
 
            api.create_connector("", "constructor_ring", ValueGroup::new_stack(vec![
 
                Value::UInt32(RING_SIZE),
 
                Value::UInt32(RING_SIZE / 2), Value::Enum(variant), // fail "halfway" the ring
 
                Value::UInt32(RING_SIZE), Value::Enum(0), // never occurs, index is equal to ring size
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)