# Review notes. Text before the first `diff --git` header is skipped as
# leading garbage by `git apply` and GNU `patch`, so these notes do not
# change what the patch does.
#
# NOTE(review): this patch was recovered from a copy whose newlines and
# indentation had been collapsed into single spaces. Line breaks were
# restored from the diff markers (` `, `+`, `-`, `@@`); the leading
# indentation of code lines is a best guess -- context lines must match the
# target files exactly, so confirm against the real files before applying.
# NOTE(review): angle-bracket generics appear to have been stripped in a few
# places (e.g. `peers: Vec,`, `solutions: Vec,`, `-> Option {`). They are
# kept verbatim below -- TODO restore them from the original files.
#
# What the patch does:
# - consensus.rs: `LocalSolution` gains `sync_round_number` (set from
#   `self.sync_round`, or from `ComponentLocalSolutions::sync_round` when
#   re-emitted), and each `GlobalSolution::component_branches` entry becomes
#   `(ConnectorId, BranchId, u32)`, the `u32` being that component's
#   sync round.
# - consensus.rs: while sending a global solution to each other component,
#   the sender now updates `self.peers`: a known peer gets
#   `expected_sync_round = sync_round`, an unknown one is pushed with
#   `encountered_this_round: true`. The added comment says the following
#   `end_sync` call advances round numbers; the hunk at old line 324 shows a
#   loop doing `peer.expected_sync_round += 1` (presumably that `end_sync`).
# - consensus.rs: `} else if let Some(owner_b)` becomes a separate
#   `if let Some(owner_b)`, so the owner_b branch no longer depends on the
#   preceding `if` not having been taken.
# - scheduler.rs: rerouted messages that are not `Message::Control` are
#   inserted directly into the target's private inbox
#   (`component.ctx.inbox.insert_new`); control messages still go through
#   `self.runtime.send_message`.
# - tests/mod.rs: switches the shared test constants to NUM_THREADS = 6,
#   NUM_INSTANCES = 1, NUM_LOOPS = 1 and comments out the stress values
#   (8 / 1500, now noted as 750 / 10).
# - tests/sync_failure.rs: renames constructor_a/_b to
#   constructor_pair_a/_b (and their tests), adds the PDL composite
#   `constructor_ring` plus `test_shared_sync_failure_ring_variant_a`.
#
# Review findings (not applied, since they would change the patch):
# - The added `println!("DEBUG: ***** Peers post round are: ...")` runs
#   unconditionally after the peer-reset loop and prints all of `self.peers`.
#   It looks like leftover debugging; consider removing it. The new
#   `#[derive(Debug)]` on `Peer` appears to exist only to support it.
# - `unsafe { ConnectorKey::from_id(other_component_id) }` has no
#   `// SAFETY:` comment. Its soundness rests on the claim in the new comment
#   that a component with a reroute entry "can not yet be running" -- TODO
#   confirm that invariant and document it at the `unsafe` block.
# - The tests/mod.rs constants look like local debugging settings (a single
#   instance and a single loop). The stress values should probably be
#   restored before merging.
# - `constructor_ring` evaluates `ring_size - 2` and `ring_size - 1` on `u32`.
#   For `ring_size < 2` this underflows (or wraps, depending on PDL integer
#   semantics -- TODO confirm). The new test only uses RING_SIZE = 4.
# - The ring test passes fail_b = RING_SIZE, so the second failure location
#   never triggers (as its inline comment says); only index RING_SIZE / 2 fails.
# - The positional `u32` in the `component_branches` tuples is easy to
#   misread; a small named struct would make the sync round explicit.
#
diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index 4d6f13a148bb75ed3b0566df24120e4c00d69826..b96ff7ac38f93de7a38a605dd8f715a2f711d899 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -23,12 +23,13 @@ struct BranchAnnotation {
 pub(crate) struct LocalSolution {
     component: ConnectorId,
     final_branch_id: BranchId,
+    sync_round_number: u32,
     port_mapping: Vec<(ChannelId, BranchMarker)>,
 }
 
 #[derive(Debug, Clone)]
 pub(crate) struct GlobalSolution {
-    component_branches: Vec<(ConnectorId, BranchId)>,
+    component_branches: Vec<(ConnectorId, BranchId, u32)>,
     channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 }
 
@@ -42,6 +43,7 @@ pub enum RoundConclusion {
 // Consensus
 // -----------------------------------------------------------------------------
 
+#[derive(Debug)]
 struct Peer {
     id: ConnectorId,
     encountered_this_round: bool,
@@ -290,6 +292,7 @@ impl Consensus {
 
         let local_solution = LocalSolution{
             component: ctx.id,
+            sync_round_number: self.sync_round,
             final_branch_id: branch_id,
             port_mapping: target_mapping,
         };
@@ -324,6 +327,8 @@ impl Consensus {
             peer.encountered_this_round = false;
             peer.expected_sync_round += 1;
         }
+
+        println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers)
     }
 
     // --- Handling messages
@@ -428,8 +433,8 @@ impl Consensus {
             SyncCompContent::GlobalSolution(solution) => {
                 // Found a global solution
                 debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
-                let (_, branch_id) = solution.component_branches.iter()
-                    .find(|(component_id, _)| *component_id == ctx.id)
+                let (_, branch_id, _) = solution.component_branches.iter()
+                    .find(|(component_id, _, _)| *component_id == ctx.id)
                     .unwrap();
                 return Some(RoundConclusion::Success(*branch_id));
             },
@@ -739,19 +744,35 @@ impl Consensus {
 
         // Handle the global solution
         let mut my_final_branch_id = BranchId::new_invalid();
-        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
+        for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() {
             if connector_id == ctx.id {
                 // This is our solution branch
                 my_final_branch_id = branch_id;
                 continue;
             }
 
+            // Send solution message
             let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: connector_id,
                 content: SyncCompContent::GlobalSolution(global_solution.clone()),
             };
             ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
+
+            // Update peers as leader. Subsequent call to `end_sync` will update
+            // the round numbers
+            match self.peers.iter_mut().find(|v| v.id == connector_id) {
+                Some(peer) => {
+                    peer.expected_sync_round = sync_round;
+                },
+                None => {
+                    self.peers.push(Peer{
+                        id: connector_id,
+                        expected_sync_round: sync_round,
+                        encountered_this_round: true,
+                    });
+                }
+            }
         }
 
         debug_assert!(my_final_branch_id.is_valid());
@@ -780,7 +801,9 @@ impl Consensus {
                     };
                     ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
                 }
-            } else if let Some(owner_b) = presence.owner_b {
+            }
+
+            if let Some(owner_b) = presence.owner_b {
                 if owner_b != ctx.id {
                     if encountered.push(owner_b) {
                         let message = SyncCompMessage{
@@ -907,6 +930,7 @@ struct ComponentPeer {
 #[derive(Debug, Clone)]
 struct ComponentLocalSolutions {
     component: ConnectorId,
+    sync_round: u32,
     peers: Vec,
     solutions: Vec,
     all_peers_present: bool,
@@ -979,6 +1003,7 @@ impl SolutionCombiner {
     /// of peer connectors.
     fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option {
         let component_id = solution.component;
+        let sync_round = solution.sync_round_number;
         let solution = MatchedLocalSolution{
             final_branch_id: solution.final_branch_id,
             channel_mapping: solution.port_mapping,
@@ -1002,6 +1027,7 @@ impl SolutionCombiner {
             let component_index = self.local.len();
             self.local.push(ComponentLocalSolutions{
                 component: component_id,
+                sync_round,
                 peers: Vec::new(),
                 solutions: vec![solution],
                 all_peers_present: false,
@@ -1349,7 +1375,7 @@ impl SolutionCombiner {
         for entry in &stack {
             let component = &self.local[entry.component_index];
             let solution = &component.solutions[entry.solution_index];
-            final_branches.push((component.component, solution.final_branch_id));
+            final_branches.push((component.component, solution.final_branch_id, component.sync_round));
         }
 
         // Just debugging here, TODO: @remove
@@ -1442,6 +1468,7 @@ impl SolutionCombiner {
         for matched in local.solutions {
             let local_solution = LocalSolution{
                 component: local.component,
+                sync_round_number: local.sync_round,
                 final_branch_id: matched.final_branch_id,
                 port_mapping: matched.channel_mapping,
             };
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index e7b6bc05c059714590c04c83b649bd68f9845c18..4f7c83bbe328312359205cdaa20633ea9b30cca9 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -148,7 +148,17 @@ impl Scheduler {
                 if let Some(target_port) = message.target_port() {
                     if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
                         self.debug_conn(connector_id, " ... Rerouting the message");
-                        self.runtime.send_message(other_component_id, message);
+
+                        // We insert directly into the private inbox. Since we have
+                        // a reroute entry the component can not yet be running.
+                        if let Message::Control(_) = &message {
+                            self.runtime.send_message(other_component_id, message);
+                        } else {
+                            let key = unsafe { ConnectorKey::from_id(other_component_id) };
+                            let component = self.runtime.get_component_private(&key);
+                            component.ctx.inbox.insert_new(message);
+                        }
+
                         continue;
                     }
 
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 31d3f161c6d982dd33fee742b5f57224dd756947..3de2cced47acc7d12adcecc814e2290966161505 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -11,13 +11,13 @@ use crate::protocol::eval::*;
 use crate::runtime2::native::{ApplicationSyncAction};
 
 // Generic testing constants, use when appropriate to simplify stress-testing
-pub(crate) const NUM_THREADS: u32 = 8; // number of threads in runtime
-pub(crate) const NUM_INSTANCES: u32 = 1500; // number of test instances constructed
-pub(crate) const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests)
+// pub(crate) const NUM_THREADS: u32 = 8; // number of threads in runtime
+// pub(crate) const NUM_INSTANCES: u32 = 750; // number of test instances constructed
+// pub(crate) const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests)
 
-// pub(crate) const NUM_THREADS: u32 = 6;
-// pub(crate) const NUM_INSTANCES: u32 = 1;
-// pub(crate) const NUM_LOOPS: u32 = 15;
+pub(crate) const NUM_THREADS: u32 = 6;
+pub(crate) const NUM_INSTANCES: u32 = 1;
+pub(crate) const NUM_LOOPS: u32 = 1;
 
 
 fn create_runtime(pdl: &str) -> Runtime {
diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime2/tests/sync_failure.rs
index ae6becc1f9226fcd4a2a1b3247d8f3c6027eedf0..2960280107bae342e742ad3988d1f629f30e6a54 100644
--- a/src/runtime2/tests/sync_failure.rs
+++ b/src/runtime2/tests/sync_failure.rs
@@ -50,28 +50,56 @@ primitive failing_at_location(in input, out output, Location loc) {
     }
 }
 
-composite constructor_a(Location loc) {
+composite constructor_pair_a(Location loc) {
     channel output_a -> input_a;
     channel output_b -> input_b;
     new failing_at_location(input_b, output_a, loc);
     new failing_at_location(input_a, output_b, Location::Never);
 }
 
-composite constructor_b(Location loc) {
+composite constructor_pair_b(Location loc) {
     channel output_a -> input_a;
     channel output_b -> input_b;
     new failing_at_location(input_b, output_a, Location::Never);
     new failing_at_location(input_a, output_b, loc);
-}";
+}
+
+composite constructor_ring(u32 ring_size, u32 fail_a, Location loc_a, u32 fail_b, Location loc_b) {
+    channel output_first -> input_old;
+    channel output_cur -> input_new;
+
+    u32 ring_index = 0;
+    while (ring_index < ring_size) {
+        auto cur_loc = Location::Never;
+        if (ring_index == fail_a) cur_loc = loc_a;
+        if (ring_index == fail_b) cur_loc = loc_b;
+
+        new failing_at_location(input_old, output_cur, cur_loc);
+
+        if (ring_index == ring_size - 2) {
+            // Don't create a new channel, join up the last one
+            output_cur = output_first;
+            input_old = input_new;
+        } else if (ring_index != ring_size - 1) {
+            channel output_fresh -> input_fresh;
+            input_old = input_new;
+            output_cur = output_fresh;
+            input_new = input_fresh;
+        }
+
+        ring_index += 1;
+    }
+}
+";
 
 #[test]
-fn test_shared_sync_failure_variant_a() {
+fn test_shared_sync_failure_pair_variant_a() {
     // One fails, the other one should somehow detect it and fail as well. This
     // variant constructs the failing component first.
     run_test_in_runtime(SHARED_SYNC_CODE, |api| {
         for variant in 0..4 { // all `Location` enum variants, except `Never`.
             // Create the channels
-            api.create_connector("", "constructor_a", ValueGroup::new_stack(vec![
+            api.create_connector("", "constructor_pair_a", ValueGroup::new_stack(vec![
                 Value::Enum(variant)
             ])).expect("create connector");
         }
@@ -79,14 +107,29 @@ fn test_shared_sync_failure_variant_a() {
 }
 
 #[test]
-fn test_shared_sync_failure_variant_b() {
+fn test_shared_sync_failure_pair_variant_b() {
     // One fails, the other one should somehow detect it and fail as well. This
     // variant constructs the successful component first.
     run_test_in_runtime(SHARED_SYNC_CODE, |api| {
         for variant in 0..4 {
-            api.create_connector("", "constructor_b", ValueGroup::new_stack(vec![
+            api.create_connector("", "constructor_pair_b", ValueGroup::new_stack(vec![
                 Value::Enum(variant)
             ])).expect("create connector");
         }
     })
+}
+
+#[test]
+fn test_shared_sync_failure_ring_variant_a() {
+    // Only one component in the ring should fail
+    const RING_SIZE: u32 = 4;
+    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
+        for variant in 0..4 {
+            api.create_connector("", "constructor_ring", ValueGroup::new_stack(vec![
+                Value::UInt32(RING_SIZE),
+                Value::UInt32(RING_SIZE / 2), Value::Enum(variant), // fail "halfway" the ring
+                Value::UInt32(RING_SIZE), Value::Enum(0), // never occurs, index is equal to ring size
+            ])).expect("create connector");
+        }
+    })
 }
\ No newline at end of file