diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index d989a63febc246bd6ed3d643c8cd6fa08b15d9d3..d30f5874a04efb8df60b09bb39b3ad8f0c4f08e7 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -2,7 +2,6 @@ use crate::protocol::eval::ValueGroup;
 use crate::runtime2::scheduler::*;
 use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
-use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::component_pdl::*;
 
@@ -17,7 +16,7 @@ impl PortAnnotation {
     }
 }
 
-#[derive(Eq, PartialEq)]
+#[derive(Debug, Eq, PartialEq)]
 enum Mode {
     NonSync,
     SyncBusy,
@@ -32,24 +31,25 @@ struct SolutionCombiner {
 impl SolutionCombiner {
     fn new() -> Self {
         return Self {
-            solution: SyncPartialSolution{
-                submissions_by: Vec::new(),
-                channel_mapping: Vec::new(),
-                decision: RoundDecision::None,
-            },
+            solution: SyncPartialSolution::default(),
             all_present: false,
         }
     }
 
+    #[inline]
+    fn has_contributions(&self) -> bool {
+        return !self.solution.submissions_by.is_empty();
+    }
+
     /// Returns a decision for the current round. If there is no decision (yet)
     /// then `RoundDecision::None` is returned.
-    fn get_decision(&self) -> RoundDecision {
+    fn get_decision(&self) -> SyncRoundDecision {
         if self.all_present {
-            debug_assert_ne!(self.solution.decision, RoundDecision::None);
+            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
             return self.solution.decision;
         }
 
-        return RoundDecision::None; // even if failure: wait for everyone.
+        return SyncRoundDecision::None; // even in case of failure: wait for everyone.
     }
 
     fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
@@ -58,8 +58,8 @@ impl SolutionCombiner {
             self.mark_single_component_submission(comp_id, present);
         }
 
-        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
-        debug_assert_ne!(partial.decision, RoundDecision::Solution);
+        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
+        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 
         // Combine our partial solution with the provided partial solution.
         // This algorithm *could* allow overlap in the partial solutions, but
@@ -125,18 +125,18 @@ impl SolutionCombiner {
             // Make sure the new entry is consistent
             let channel = &self.solution.channel_mapping[channel_index];
             if !Self::channel_is_consistent(channel) {
-                self.solution.decision = RoundDecision::Failure;
+                self.solution.decision = SyncRoundDecision::Failure;
             }
         }
 
         // Check to see if we have a global solution already
        self.update_all_present();
-        if self.all_present && self.solution.decision != RoundDecision::Failure {
-            debug_assert_eq!(self.solution.decision, RoundDecision::None);
+        if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
+            debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
             dbg_code!(for entry in &self.solution.channel_mapping {
                 debug_assert!(entry.putter.is_some() && entry.getter.is_some());
             });
-            self.solution.decision = RoundDecision::Solution;
+            self.solution.decision = SyncRoundDecision::Solution;
         }
     }
 
@@ -151,19 +151,20 @@ impl SolutionCombiner {
             self.mark_single_component_submission(entry.peer_comp_id, false);
         }
 
-        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
+        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
         // Go through all entries and check if the submitted local solution is
         // consistent with our partial solution
         let mut had_new_entry = false;
         for entry in solution.iter() {
             let preexisting_index = self.find_channel_index_for_local_entry(comp_id, entry);
-            let new_port = SolutionPort{
+            let new_port = SyncSolutionPort{
                 self_comp_id: comp_id,
                 self_port_id: entry.self_port_id,
                 peer_comp_id: entry.peer_comp_id,
                 peer_port_id: entry.peer_port_id,
                 mapping: entry.mapping,
+                port_kind: entry.port_kind,
             };
 
             match preexisting_index {
@@ -172,26 +173,28 @@ impl SolutionCombiner {
                     // the global solution. We'll handle any mismatches along
                     // the way.
                     let channel = &mut self.solution.channel_mapping[entry_index];
-                    if entry.is_putter {
-                        // Getter should be present in existing entry
-                        debug_assert!(channel.getter.is_some() && channel.putter.is_none());
-                        channel.putter = Some(new_port);
-                    } else {
-                        // Putter should be present in existing entry
-                        debug_assert!(channel.putter.is_some() && channel.getter.is_none());
-                        channel.getter = Some(new_port);
-                    };
+                    match entry.port_kind {
+                        PortKind::Putter => {
+                            // Getter should be present in existing entry
+                            debug_assert!(channel.getter.is_some() && channel.putter.is_none());
+                            channel.putter = Some(new_port);
+                        },
+                        PortKind::Getter => {
+                            // Putter should be present in existing entry
+                            debug_assert!(channel.putter.is_some() && channel.getter.is_none());
+                            channel.getter = Some(new_port);
+                        }
+                    }
 
                     if !Self::channel_is_consistent(channel) {
-                        self.solution.decision = RoundDecision::Failure;
+                        self.solution.decision = SyncRoundDecision::Failure;
                     }
                 },
                 None => {
                     // No entry yet. So add it
-                    let new_solution = if entry.is_putter {
-                        SolutionChannel{ putter: Some(new_port), getter: None }
-                    } else {
-                        SolutionChannel{ putter: None, getter: Some(new_port) }
+                    let new_solution = match entry.port_kind {
+                        PortKind::Putter => SyncSolutionChannel{ putter: Some(new_port), getter: None },
+                        PortKind::Getter => SyncSolutionChannel{ putter: None, getter: Some(new_port) },
                     };
                     self.solution.channel_mapping.push(new_solution);
                     had_new_entry = true;
@@ -201,19 +204,39 @@ impl SolutionCombiner {
 
         if !had_new_entry {
             self.update_all_present();
-            if self.all_present && self.solution.decision != RoundDecision::Failure {
+            if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
                 // No new entries and every component is present. This implies that
                 // every component successfully added their local solutions to the
                 // global solution. Hence: we have a global solution
-                debug_assert_eq!(self.solution.decision, RoundDecision::None);
+                debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
                 dbg_code!(for entry in &self.solution.channel_mapping {
                     debug_assert!(entry.putter.is_some() && entry.getter.is_some());
                 });
-                self.solution.decision = RoundDecision::Solution;
+                self.solution.decision = SyncRoundDecision::Solution;
             }
         }
     }
 
+    /// Takes whatever partial solution is present in the solution combiner and
+    /// returns it. The solution combiner's solution will end up being empty.
+    /// This is used when a new leader is found and we need to pass along our
+    /// partial results.
+    fn take_partial_solution(&mut self) -> SyncPartialSolution {
+        let mut partial_solution = SyncPartialSolution::default();
+        std::mem::swap(&mut partial_solution, &mut self.solution);
+        self.all_present = false;
+
+        return partial_solution;
+    }
+
+    fn clear(&mut self) {
+        self.solution.submissions_by.clear();
+        self.solution.channel_mapping.clear();
+        self.solution.decision = SyncRoundDecision::None;
+    }
+
+    // --- Small utilities for combining solutions
+
     fn mark_single_component_submission(&mut self, comp_id: CompId, will_contribute: bool) {
         debug_assert!(!will_contribute || !self.solution.submissions_by.iter().any(|(id, val)| *id == comp_id && *val)); // if submitting a solution, then we do not expect an existing entry
         for (entry, has_contributed) in self.solution.submissions_by.iter_mut() {
@@ -290,17 +313,20 @@ impl SolutionCombiner {
             // port B is part of its channel, but port B may consider port A not
             // to be part of its channel. Before merging the entries (outside of
             // this function) we'll make sure this is not the case.
-            if new_entry.is_putter {
-                // Expect getter to be present
-                if let Some(cur_entry) = &cur_entry.getter {
-                    if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
-                        return Some(entry_index);
+            match new_entry.port_kind {
+                PortKind::Putter => {
+                    // Expect getter to be present
+                    if let Some(cur_entry) = &cur_entry.getter {
+                        if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
+                            return Some(entry_index);
+                        }
                     }
-                }
-            } else {
-                if let Some(cur_entry) = &cur_entry.putter {
-                    if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
-                        return Some(entry_index);
+                },
+                PortKind::Getter => {
+                    if let Some(cur_entry) = &cur_entry.putter {
+                        if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
+                            return Some(entry_index);
+                        }
                     }
                 }
             }
@@ -362,6 +388,8 @@ impl Consensus {
     // Managing sync state
     // -------------------------------------------------------------------------
 
+    /// Notifies the consensus management that the PDL code has reached the
+    /// start of a sync block.
     pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
         debug_assert_eq!(self.mode, Mode::NonSync);
         self.highest_id = comp_ctx.id;
@@ -370,7 +398,10 @@ impl Consensus {
         self.make_ports_consistent_with_ctx(comp_ctx);
     }
 
-    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> RoundDecision {
+    /// Notifies the consensus management that the PDL code has reached the end
+    /// of a sync block. A local solution will be submitted, after which we wait
+    /// until the participants in the round (hopefully) reach a conclusion.
+    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
         self.mode = Mode::SyncAwaitingSolution;
 
@@ -393,6 +424,22 @@ impl Consensus {
         return decision;
     }
 
+    /// Notifies that a decision has been reached. Note that the caller should
+    /// still take the appropriate actions based on the decision it is supplying
+    /// to the consensus layer.
+    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
+        // Reset everything for the next round
+        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+        self.mode = Mode::NonSync;
+        self.round_index = self.round_index.wrapping_add(1);
+
+        for port in self.ports.iter_mut() {
+            port.mapping = None;
+        }
+
+        self.solution.clear();
+    }
+
     fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
         let mut needs_setting_ports = false;
         if comp_ctx.ports.len() != self.ports.len() {
@@ -423,7 +470,7 @@ impl Consensus {
 
     pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
         debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
-        debug_assert!(self.round.ports.iter().any(|v| v.id == port_info.self_id));
+        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
 
         let data_header = self.create_data_header_and_update_mapping(port_info);
         let sync_header = self.create_sync_header(comp_ctx);
@@ -436,12 +483,12 @@ impl Consensus {
     /// received.
     pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
-        debug_assert!(self.round.ports.iter().any(|v| v.id == message.data_header.target_port));
+        debug_assert!(self.ports.iter().any(|v| v.id == message.data_header.target_port));
 
         // Make sure the expected mapping matches the currently stored mapping
         for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
             let got_annotation = self.get_annotation(*expected_id);
-            if got_annotation != expected_annotation {
+            if got_annotation != *expected_annotation {
                 return false;
             }
         }
@@ -456,35 +503,38 @@ impl Consensus {
     }
 
     /// Receives the sync message and updates the consensus state appropriately.
-    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> RoundDecision {
+    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
         // Whatever happens: handle the sync header (possibly changing the
         // currently registered leader)
         self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 
         match message.content {
             SyncMessageContent::NotificationOfLeader => {
-                return RoundDecision::None;
+                return SyncRoundDecision::None;
             },
             SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
                 return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
             },
             SyncMessageContent::PartialSolution(partial_solution) => {
                 return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
-            }
+            },
             SyncMessageContent::GlobalSolution => {
-                // Global solution has been found
                 debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
                 todo!("clear port mapping or something");
-                return RoundDecision::Solution;
+                return SyncRoundDecision::Solution;
             },
+            SyncMessageContent::GlobalFailure => {
+                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+                return SyncRoundDecision::Failure;
+            }
         }
     }
 
     fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
-        if header.highest_id > self.round.highest_id {
+        if header.highest_id.0 > self.highest_id.0 {
             // Sender knows of someone with a higher ID. So store highest ID,
             // notify all peers, and forward local solutions
-            self.round.highest_id = header.highest_id;
+            self.highest_id = header.highest_id;
             for peer in &comp_ctx.peers {
                 if peer.id == header.sending_id {
                     continue;
@@ -494,20 +544,18 @@ impl Consensus {
                     sync_header: self.create_sync_header(comp_ctx),
                     content: SyncMessageContent::NotificationOfLeader,
                 };
-                peer.handle.inbox.push(Message::Sync(message));
-                wake_up_if_sleeping(sched_ctx, peer.id, &peer.handle);
+                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
             }
 
-            self.forward_local_solutions(sched_ctx, comp_ctx);
-        } else if header.highest_id < self.round.highest_id {
+            self.forward_partial_solution(sched_ctx, comp_ctx);
+        } else if header.highest_id.0 < self.highest_id.0 {
             // Sender has a lower ID, so notify it of our higher one
             let message = SyncMessage{
                 sync_header: self.create_sync_header(comp_ctx),
                 content: SyncMessageContent::NotificationOfLeader,
             };
 
             let peer_info = comp_ctx.get_peer(header.sending_id);
-            peer_info.handle.inbox.push(Message::Sync(message));
-            wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
         } // else: exactly equal
     }
@@ -534,47 +582,30 @@ impl Consensus {
     // Leader-related methods
     // -------------------------------------------------------------------------
 
-    fn forward_local_solutions(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        todo!("implement")
+    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        debug_assert_ne!(self.highest_id, comp_ctx.id); // not leader
+
+        // Make sure that we have something to send
+        if !self.solution.has_contributions() {
+            return;
+        }
+
+        // Swap the container with the partial solution and then send it along
+        let partial_solution = self.solution.take_partial_solution();
+        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
+            sync_header: self.create_sync_header(comp_ctx),
+            content: SyncMessageContent::PartialSolution(partial_solution),
+        }));
     }
 
-    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> RoundDecision {
+    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
         if self.highest_id == comp_ctx.id {
             // We are the leader
             self.solution.combine_with_local_solution(solution_sender_id, solution);
             let round_decision = self.solution.get_decision();
-            let decision_is_solution = match round_decision {
-                RoundDecision::None => {
-                    // No solution yet
-                    return RoundDecision::None;
-                },
-                RoundDecision::Solution => true,
-                RoundDecision::Failure => false,
-            };
-
-            // If here then we've reached a decision, broadcast it
-            for (peer_id, _is_present) in self.solution.solution.submissions_by.iter().copied() {
-                debug_assert!(_is_present);
-                if peer_id == comp_ctx.id {
-                    // Do not send the result to ourselves
-                    continue;
-                }
-
-                let mut handle = sched_ctx.runtime.get_component_public(peer_id);
-                handle.inbox.push(Message::Sync(SyncMessage{
-                    sync_header: self.create_sync_header(comp_ctx),
-                    content: if decision_is_solution {
-                        SyncMessageContent::GlobalSolution
-                    } else {
-                        SyncMessageContent::GlobalFailure
-                    },
-                }));
-                wake_up_if_sleeping(sched_ctx, peer_id, &handle);
-
-                let _should_remove = handle.decrement_users();
-                debug_assert!(!_should_remove);
+            if round_decision != SyncRoundDecision::None {
+                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
             }
-
             return round_decision;
         } else {
             // Forward the solution
@@ -583,18 +614,19 @@ impl Consensus {
                 content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
             };
             self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
-            return RoundDecision::None;
+            return SyncRoundDecision::None;
         }
     }
 
-    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> RoundDecision {
+    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
        if self.highest_id == comp_ctx.id {
             // We are the leader, combine existing and new solution
             self.solution.combine_with_partial_solution(solution);
             let round_decision = self.solution.get_decision();
-
-
-            return RoundDecision::None;
+            if round_decision != SyncRoundDecision::None {
+                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
+            }
+            return round_decision;
         } else {
             // Forward the partial solution
             let message = SyncMessage{
@@ -602,15 +634,40 @@ impl Consensus {
                 content: SyncMessageContent::PartialSolution(solution),
             };
             self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
-            return RoundDecision::None;
+            return SyncRoundDecision::None;
+        }
+    }
+
+    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
+        debug_assert_eq!(self.highest_id, comp_ctx.id);
+
+        let is_success = match decision {
+            SyncRoundDecision::None => unreachable!(),
+            SyncRoundDecision::Solution => true,
+            SyncRoundDecision::Failure => false,
+        };
+
+        for (peer, _) in self.solution.solution.submissions_by.iter().copied() {
+            if peer == comp_ctx.id {
+                // Do not send to ourselves
+                continue;
+            }
+
+            let mut handle = sched_ctx.runtime.get_component_public(peer);
+            let message = Message::Sync(SyncMessage{
+                sync_header: self.create_sync_header(comp_ctx),
+                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
+            });
+            handle.send_message(sched_ctx, message, true);
+            let _should_remove = handle.decrement_users();
+            debug_assert!(!_should_remove);
         }
     }
 
     fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
         debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
         let leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
-        leader_info.inbox.push(message);
-        wake_up_if_sleeping(sched_ctx, self.highest_id, &leader_info);
+        leader_info.send_message(sched_ctx, message, true);
     }
 
     // -------------------------------------------------------------------------
@@ -620,15 +677,15 @@ impl Consensus {
     fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
         let mut expected_mapping = Vec::with_capacity(self.ports.len());
         let mut port_index = usize::MAX;
-        for (index, port) in self.round.ports.iter().enumerate() {
+        for (index, port) in self.ports.iter().enumerate() {
            if port.id == port_info.self_id {
                 port_index = index;
             }
-            expected_mapping.push((port.id, Some(mapping)));
+            expected_mapping.push((port.id, port.mapping));
         }
 
         let new_mapping = self.take_mapping();
-        self.round.ports[port_index].mapping = Some(new_mapping);
+        self.ports[port_index].mapping = Some(new_mapping);
         debug_assert_eq!(port_info.kind, PortKind::Putter);
         return MessageDataHeader{
             expected_mapping,
@@ -638,14 +695,16 @@ impl Consensus {
         };
     }
 
+    #[inline]
     fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
         return MessageSyncHeader{
-            sync_round: self.round.index,
+            sync_round: self.round_index,
             sending_id: comp_ctx.id,
            highest_id: self.highest_id,
         };
     }
 
+    #[inline]
     fn take_mapping(&mut self) -> u32 {
         let mapping = self.mapping_counter;
         self.mapping_counter = self.mapping_counter.wrapping_add(1);