use crate::protocol::eval::ValueGroup;
use crate::runtime2::scheduler::*;
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;

use super::component_context::*;

pub struct PortAnnotation {
    self_comp_id: CompId,
    self_port_id: PortId,
    peer_comp_id: CompId,   // only valid for getter ports
    peer_port_id: PortId,   // only valid for getter ports
    peer_discovered: bool,  // only valid for getter ports
    mapping: Option<u32>,
    kind: PortKind,
}

impl PortAnnotation {
    fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self {
        return Self{
            self_comp_id: comp_id,
            self_port_id: port_id,
            peer_comp_id: CompId::new_invalid(),
            peer_port_id: PortId::new_invalid(),
            peer_discovered: false,
            mapping: None,
            kind,
        }
    }
}

#[derive(Debug, Eq, PartialEq)]
enum Mode {
    NonSync,
    SyncBusy,
    SyncAwaitingSolution,
    SelectBusy,
    SelectWait,
}

struct SolutionCombiner {
    solution: SyncPartialSolution,
    matched_channels: usize,
}

impl SolutionCombiner {
    fn new() -> Self {
        return Self{
            solution: SyncPartialSolution::default(),
            matched_channels: 0,
        }
    }

    #[inline]
    fn has_contributions(&self) -> bool {
        return !self.solution.channel_mapping.is_empty();
    }

    /// Returns a decision for the current round. If there is no decision
    /// (yet) then `SyncRoundDecision::None` is returned.
    fn get_decision(&self) -> SyncRoundDecision {
        if self.matched_channels == self.solution.channel_mapping.len() {
            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
            return self.solution.decision;
        }

        return SyncRoundDecision::None; // even in case of failure: wait for everyone
    }

    fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);

        if partial.decision == SyncRoundDecision::Failure {
            self.solution.decision = SyncRoundDecision::Failure;
        }

        for entry in partial.channel_mapping {
            let channel_index = if entry.getter.is_some() && entry.putter.is_some() {
                let channel_index = self.solution.channel_mapping.len();
                self.solution.channel_mapping.push(entry);
                channel_index
            } else if let Some(putter) = entry.putter {
                self.combine_with_putter_port(putter)
            } else if let Some(getter) = entry.getter {
                self.combine_with_getter_port(getter)
            } else {
                unreachable!(); // both putter and getter are None
            };

            let channel = &self.solution.channel_mapping[channel_index];
            if let Some(consistent) = Self::channel_is_consistent(channel) {
                if !consistent {
                    self.solution.decision = SyncRoundDecision::Failure;
                }
                self.matched_channels += 1;
            }
        }

        self.update_solution();
    }
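
    // A minimal sketch of how a channel becomes "matched" (illustrative
    // only, not from the original source; `comp_a`, `comp_b`, `port_p` and
    // `port_g` are hypothetical IDs):
    //
    //     let mut combiner = SolutionCombiner::new();
    //     // Putter endpoint arrives first: channel 0 exists but is unmatched
    //     combiner.combine_with_local_solution(comp_a, vec![
    //         SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
    //             self_comp_id: comp_a, self_port_id: port_p, mapping,
    //         }),
    //     ]);
    //     // Getter endpoint with peer (comp_a, port_p) completes channel 0.
    //     // If its mapping equals the putter's, the channel is consistent
    //     // and, all channels now being matched, the decision is Solution.
    //     combiner.combine_with_local_solution(comp_b, vec![
    //         SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
    //             self_comp_id: comp_b, self_port_id: port_g,
    //             peer_comp_id: comp_a, peer_port_id: port_p, mapping,
    //         }),
    //     ]);
    //     assert_eq!(combiner.get_decision(), SyncRoundDecision::Solution);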
    /// Combines the currently stored global solution (if any) with the newly
    /// provided local solution. Make sure to check `get_decision`
    /// afterwards.
    fn combine_with_local_solution(&mut self, _comp_id: CompId, solution: SyncLocalSolution) {
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);

        // Combine the partial solution with the local solution entries
        for entry in solution {
            // Match the current entry up with its peer endpoint, or add a
            // new entry.
            let channel_index = match entry {
                SyncLocalSolutionEntry::Putter(putter) => {
                    self.combine_with_putter_port(putter)
                },
                SyncLocalSolutionEntry::Getter(getter) => {
                    self.combine_with_getter_port(getter)
                }
            };

            // Check if the channel is now consistent
            let channel = &self.solution.channel_mapping[channel_index];
            if let Some(consistent) = Self::channel_is_consistent(channel) {
                if !consistent {
                    self.solution.decision = SyncRoundDecision::Failure;
                }
                self.matched_channels += 1;
            }
        }

        self.update_solution();
    }

    /// Takes whatever partial solution is present in the solution combiner
    /// and returns it. The solution combiner's solution will end up being
    /// empty. This is used when a new leader is found and we need to pass
    /// along our partial results.
    fn take_partial_solution(&mut self) -> SyncPartialSolution {
        let mut partial_solution = SyncPartialSolution::default();
        std::mem::swap(&mut partial_solution, &mut self.solution);
        self.clear();
        return partial_solution;
    }

    fn clear(&mut self) {
        self.solution.channel_mapping.clear();
        self.solution.decision = SyncRoundDecision::None;
        self.matched_channels = 0;
    }

    // --- Small utilities for combining solutions

    fn combine_with_putter_port(&mut self, putter: SyncSolutionPutterPort) -> usize {
        let channel_index = self.get_channel_index_for_putter(putter.self_comp_id, putter.self_port_id);
        if let Some(channel_index) = channel_index {
            let channel = &mut self.solution.channel_mapping[channel_index];
            debug_assert!(channel.putter.is_none());
            channel.putter = Some(putter);
            return channel_index;
        } else {
            let channel_index = self.solution.channel_mapping.len();
            self.solution.channel_mapping.push(SyncSolutionChannel{
                putter: Some(putter),
                getter: None,
            });
            return channel_index;
        }
    }

    fn combine_with_getter_port(&mut self, getter: SyncSolutionGetterPort) -> usize {
        let channel_index = self.get_channel_index_for_getter(getter.peer_comp_id, getter.peer_port_id);
        if let Some(channel_index) = channel_index {
            let channel = &mut self.solution.channel_mapping[channel_index];
            debug_assert!(channel.getter.is_none());
            channel.getter = Some(getter);
            return channel_index;
        } else {
            let channel_index = self.solution.channel_mapping.len();
            self.solution.channel_mapping.push(SyncSolutionChannel{
                putter: None,
                getter: Some(getter),
            });
            return channel_index;
        }
    }

    /// Retrieves the index of the channel containing a getter port that has
    /// received from the specified putter port.
    fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option<usize> {
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
            if let Some(getter) = &channel.getter {
                if getter.peer_comp_id == putter_comp_id && getter.peer_port_id == putter_port_id {
                    return Some(channel_index);
                }
            }
        }

        return None;
    }
    /// Retrieves the index of the channel for a getter port. To find this
    /// channel the **peer** component/port IDs of the getter port are used.
    fn get_channel_index_for_getter(&self, peer_comp_id: CompId, peer_port_id: PortId) -> Option<usize> {
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
            if let Some(putter) = &channel.putter {
                if putter.self_comp_id == peer_comp_id && putter.self_port_id == peer_port_id {
                    return Some(channel_index);
                }
            }
        }

        return None;
    }

    fn channel_is_consistent(channel: &SyncSolutionChannel) -> Option<bool> {
        if channel.putter.is_none() || channel.getter.is_none() {
            return None;
        }

        let putter = channel.putter.as_ref().unwrap();
        let getter = channel.getter.as_ref().unwrap();
        return Some(putter.mapping == getter.mapping);
    }

    /// Determines the global solution if all components have contributed
    /// their local solutions.
    fn update_solution(&mut self) {
        if self.matched_channels == self.solution.channel_mapping.len() {
            if self.solution.decision != SyncRoundDecision::Failure {
                self.solution.decision = SyncRoundDecision::Solution;
            }
        }
    }
}

/// Tracks consensus state
pub struct Consensus {
    // General state of the consensus manager
    mapping_counter: u32,
    mode: Mode,
    // State associated with the sync round
    round_index: u32,
    highest_id: CompId,
    ports: Vec<PortAnnotation>,
    // State associated with arriving at a solution and being a (temporary)
    // leader in the consensus round
    solution: SolutionCombiner,
}

impl Consensus {
    pub(crate) fn new() -> Self {
        return Self{
            round_index: 0,
            highest_id: CompId::new_invalid(),
            ports: Vec::new(),
            mapping_counter: 0,
            mode: Mode::NonSync,
            solution: SolutionCombiner::new(),
        }
    }

    // -------------------------------------------------------------------------
    // Managing sync state
    // -------------------------------------------------------------------------

    /// Notifies the consensus management that the PDL code has reached the
    /// start of a sync block.
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
        debug_assert_eq!(self.mode, Mode::NonSync);
        self.highest_id = comp_ctx.id;
        self.mapping_counter = 0;
        self.mode = Mode::SyncBusy;

        // Make the internally stored port annotation array consistent with
        // the ports that the component currently owns. They should match by
        // index (i.e. the annotation at index `i` corresponds to port `i` in
        // `comp_ctx`).
        let mut needs_setting_ports = false;
        if comp_ctx.num_ports() != self.ports.len() {
            needs_setting_ports = true;
        } else {
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
                let comp_port_id = port.self_id;
                let cons_port_id = self.ports[idx].self_port_id;
                if comp_port_id != cons_port_id {
                    needs_setting_ports = true;
                    break;
                }
            }
        }

        if needs_setting_ports {
            // Reset all ports
            self.ports.clear();
            self.ports.reserve(comp_ctx.num_ports());
            for port in comp_ctx.iter_ports() {
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind));
            }
        } else {
            // Make sure that we consider all peers as undiscovered again
            for annotation in self.ports.iter_mut() {
                annotation.peer_discovered = false;
            }
        }
    }
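
    // Illustrative note (not from the original source): the index alignment
    // established above matters because ports may move between rounds. For
    // example, if the component handed a port to a newly created component
    // after the previous round, `comp_ctx.num_ports()` no longer matches the
    // annotation array, the remaining annotations no longer line up by
    // index, and the whole array is rebuilt from scratch.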
    /// Notifies the consensus management that the PDL code has reached the
    /// end of a sync block. A local solution will be submitted, after which
    /// we wait until the participants in the round (hopefully) reach a
    /// conclusion.
    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
        debug_assert_eq!(self.mode, Mode::SyncBusy);
        self.mode = Mode::SyncAwaitingSolution;

        // Submit our port mapping as a local solution
        let mut local_solution = Vec::with_capacity(self.ports.len());
        for port in &self.ports {
            if let Some(mapping) = port.mapping {
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
                let port_info = comp_ctx.get_port(port_handle);
                let new_entry = match port_info.kind {
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
                        self_comp_id: comp_ctx.id,
                        self_port_id: port_info.self_id,
                        mapping,
                    }),
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
                        self_comp_id: comp_ctx.id,
                        self_port_id: port_info.self_id,
                        peer_comp_id: port.peer_comp_id,
                        peer_port_id: port.peer_port_id,
                        mapping,
                    }),
                };
                local_solution.push(new_entry);
            }
        }

        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
        return decision;
    }

    /// Notifies that a decision has been reached. Note that the caller
    /// should still take the appropriate actions based on the decision it is
    /// supplying to the consensus layer.
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
        // Reset everything for the next round
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
        self.mode = Mode::NonSync;
        self.round_index = self.round_index.wrapping_add(1);
        for port in self.ports.iter_mut() {
            port.mapping = None;
        }

        self.solution.clear();
    }

    // -------------------------------------------------------------------------
    // Handling inbound and outbound messages
    // -------------------------------------------------------------------------

    /// Prepares a set of values to be sent over a channel.
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
        let data_header = self.create_data_header_and_update_mapping(port_info);
        let sync_header = self.create_sync_header(comp_ctx);

        return DataMessage{ data_header, sync_header, content };
    }

    /// Handles the arrival of a new data message (needs to be called for
    /// every new data message, even though it might not end up being
    /// received). This is used to determine the peers of `get`ter ports.
    // TODO: The use of this function is rather ugly. Find a more robust
    //  scheme about owners of `get`ter ports not knowing about their peers.
    //  (also, figure out why this was written again, I forgot).
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
        let target_index = comp_ctx.get_port_index(target_handle);
        let annotation = &mut self.ports[target_index];
        debug_assert!(
            !annotation.peer_discovered || (
                annotation.peer_comp_id == message.sync_header.sending_id &&
                annotation.peer_port_id == message.data_header.source_port
            )
        );
        annotation.peer_comp_id = message.sync_header.sending_id;
        annotation.peer_port_id = message.data_header.source_port;
        annotation.peer_discovered = true;
    }
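
    // A sketch of the peer-discovery flow above (illustrative only, not from
    // the original source): a getter port does not statically know which
    // component owns the peer putter. The first data message that arrives
    // for the port in a round carries the pair
    // `(sync_header.sending_id, data_header.source_port)`, and
    // `handle_incoming_data_message` copies it into the annotation:
    //
    //     // before: annotation.peer_discovered == false
    //     consensus.handle_incoming_data_message(&comp_ctx, &message);
    //     // after: annotation.peer_comp_id/peer_port_id identify the
    //     // sender, and later messages in the round may only repeat the
    //     // same pair (enforced by the debug_assert above)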
    /// Checks if the data message can be received (due to port annotations).
    /// If it can then `true` is returned and the caller is responsible for
    /// handing the message off to the PDL code. Otherwise the message cannot
    /// be received.
    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
        debug_assert_eq!(self.mode, Mode::SyncBusy);
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));

        // Make sure the expected mapping matches the currently stored mapping
        for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping {
            // Determine our own annotation. In order to do so we need to
            // find the port matching the peer's port.
            let mut self_annotation = None;
            let mut self_annotation_found = false;
            match peer_port_kind {
                PortAnnotationKind::Putter(peer_port) => {
                    for self_port in &self.ports {
                        if self_port.kind == PortKind::Getter &&
                            self_port.peer_discovered &&
                            self_port.peer_comp_id == peer_port.self_comp_id &&
                            self_port.peer_port_id == peer_port.self_port_id
                        {
                            self_annotation = self_port.mapping;
                            self_annotation_found = true;
                            break;
                        }
                    }
                },
                PortAnnotationKind::Getter(peer_port) => {
                    if peer_port.peer_comp_id == comp_ctx.id {
                        // Peer indicates that we talked to it
                        let self_port_handle = comp_ctx.get_port_handle(peer_port.peer_port_id);
                        let self_port_index = comp_ctx.get_port_index(self_port_handle);
                        self_annotation = self.ports[self_port_index].mapping;
                        self_annotation_found = true;
                    }
                }
            }

            if !self_annotation_found {
                continue;
            }

            if self_annotation != *expected_annotation {
                return false;
            }
        }

        // The expected mapping matches the current mapping, so we will
        // receive the message
        self.set_annotation(message.sync_header.sending_id, &message.data_header);

        // Handle the sync header embedded within the data message
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);

        return true;
    }

    /// Receives the sync message and updates the consensus state appropriately.
    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
        // Whatever happens: handle the sync header (possibly changing the
        // currently registered leader)
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);

        match message.content {
            SyncMessageContent::NotificationOfLeader => {
                return SyncRoundDecision::None;
            },
            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
            },
            SyncMessageContent::PartialSolution(partial_solution) => {
                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
            },
            SyncMessageContent::GlobalSolution => {
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // the leader can only find a global solution if we submitted a local solution
                return SyncRoundDecision::Solution;
            },
            SyncMessageContent::GlobalFailure => {
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
                return SyncRoundDecision::Failure;
            }
        }
    }
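
    // Illustrative example of the expected-mapping check in
    // `try_receive_data_message` (not from the original source): suppose the
    // sender's header claims its putter port last observed our shared
    // channel with mapping `Some(0)`, but we have since received a different
    // message on our getter port and annotated it with `Some(1)`. Then
    // `self_annotation != *expected_annotation` holds and the function
    // returns `false`: the message belongs to a speculative branch that is
    // inconsistent with what this component already committed to in this
    // round, so the caller must not hand it to the PDL code.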
    fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
        if header.highest_id.0 > self.highest_id.0 {
            // Sender knows of someone with a higher ID. So store the highest
            // ID, notify all peers, and forward our local solutions
            self.highest_id = header.highest_id;
            for peer in comp_ctx.iter_peers() {
                if peer.id == header.sending_id {
                    continue; // do not send to the sender: it has the higher ID
                }

                // Also: only send if we received a message in this round
                let mut performed_communication = false; // TODO: Revise, temporary fix
                for port in self.ports.iter() {
                    if port.peer_comp_id == peer.id && port.mapping.is_some() {
                        performed_communication = true;
                        break;
                    }
                }

                if !performed_communication {
                    continue;
                }

                let message = SyncMessage{
                    sync_header: self.create_sync_header(comp_ctx),
                    content: SyncMessageContent::NotificationOfLeader,
                };
                peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
            }

            self.forward_partial_solution(sched_ctx, comp_ctx);
        } else if header.highest_id.0 < self.highest_id.0 {
            // Sender has a lower ID, so notify it of our higher one
            let message = SyncMessage{
                sync_header: self.create_sync_header(comp_ctx),
                content: SyncMessageContent::NotificationOfLeader,
            };
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
            let peer_info = comp_ctx.get_peer(peer_handle);
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
        } // else: exactly equal, nothing to do
    }

    fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
        for annotation in self.ports.iter_mut() {
            if annotation.self_port_id == data_header.target_port {
                // The message should have already passed through
                // `handle_incoming_data_message`, so we should have already
                // annotated the peer of the port.
                debug_assert!(
                    annotation.peer_discovered &&
                    annotation.peer_comp_id == source_comp_id &&
                    annotation.peer_port_id == data_header.source_port
                );
                annotation.mapping = Some(data_header.new_mapping);
            }
        }
    }

    // -------------------------------------------------------------------------
    // Leader-related methods
    // -------------------------------------------------------------------------

    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader

        // Make sure that we have something to send
        if !self.solution.has_contributions() {
            return;
        }

        // Swap out the container with the partial solution, then send it
        // along to the leader
        let partial_solution = self.solution.take_partial_solution();
        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
            sync_header: self.create_sync_header(comp_ctx),
            content: SyncMessageContent::PartialSolution(partial_solution),
        }));
    }

    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
        if self.highest_id == comp_ctx.id {
            // We are the leader
            self.solution.combine_with_local_solution(solution_sender_id, solution);
            let round_decision = self.solution.get_decision();
            if round_decision != SyncRoundDecision::None {
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
            }

            return round_decision;
        } else {
            // Forward the solution to the leader
            let message = SyncMessage{
                sync_header: self.create_sync_header(comp_ctx),
                content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
            };
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
            return SyncRoundDecision::None;
        }
    }
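
    // A worked trace of the leader election implemented by
    // `handle_sync_header` (illustrative only, not from the original
    // source): components with IDs 3, 5 and 7 join a round, each initially
    // considering itself the leader. When 3 receives a message from 5 it
    // adopts 5 as leader and forwards its partial solution; when 5 later
    // hears from 7 it adopts 7, forwards whatever it has combined so far,
    // and notifies the peers it communicated with (eventually including 3).
    // Because every component adopts the numerically highest CompId it has
    // seen, all participants converge on 7 as the single leader that
    // combines the local solutions and broadcasts the decision.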
    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
        if self.highest_id == comp_ctx.id {
            // We are the leader, combine the existing and the new solution
            self.solution.combine_with_partial_solution(solution);
            let round_decision = self.solution.get_decision();
            if round_decision != SyncRoundDecision::None {
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
            }

            return round_decision;
        } else {
            // Forward the partial solution to the leader
            let message = SyncMessage{
                sync_header: self.create_sync_header(comp_ctx),
                content: SyncMessageContent::PartialSolution(solution),
            };
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
            return SyncRoundDecision::None;
        }
    }

    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
        debug_assert_eq!(self.highest_id, comp_ctx.id); // we are the leader
        let is_success = match decision {
            SyncRoundDecision::None => unreachable!(),
            SyncRoundDecision::Solution => true,
            SyncRoundDecision::Failure => false,
        };

        let mut peers = Vec::with_capacity(self.solution.solution.channel_mapping.len()); // TODO: @Performance
        for channel in self.solution.solution.channel_mapping.iter() {
            let getter = channel.getter.as_ref().unwrap();
            if getter.self_comp_id != comp_ctx.id && !peers.contains(&getter.self_comp_id) {
                peers.push(getter.self_comp_id);
            }
            if getter.peer_comp_id != comp_ctx.id && !peers.contains(&getter.peer_comp_id) {
                peers.push(getter.peer_comp_id);
            }
        }

        for peer in peers {
            let mut handle = sched_ctx.runtime.get_component_public(peer);
            let message = Message::Sync(SyncMessage{
                sync_header: self.create_sync_header(comp_ctx),
                content: if is_success {
                    SyncMessageContent::GlobalSolution
                } else {
                    SyncMessageContent::GlobalFailure
                },
            });
            handle.send_message(&sched_ctx.runtime, message, true);
            let _should_remove = handle.decrement_users();
            debug_assert!(_should_remove.is_none());
        }
    }

    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
        leader_info.send_message(&sched_ctx.runtime, message, true);
        let should_remove = leader_info.decrement_users();
        if let Some(key) = should_remove {
            sched_ctx.runtime.destroy_component(key);
        }
    }

    // -------------------------------------------------------------------------
    // Creating message headers
    // -------------------------------------------------------------------------

    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
        let mut port_index = usize::MAX;
        for (index, port) in self.ports.iter().enumerate() {
            if port.self_port_id == port_info.self_id {
                port_index = index; // remember for updating the mapping later
            }

            // Add annotations for all of our ports (getter ports only if
            // their peer has been discovered)
            let annotation_kind = match port.kind {
                PortKind::Putter => {
                    PortAnnotationKind::Putter(PortAnnotationPutter{
                        self_comp_id: port.self_comp_id,
                        self_port_id: port.self_port_id,
                    })
                },
                PortKind::Getter => {
                    if !port.peer_discovered {
                        continue;
                    }

                    PortAnnotationKind::Getter(PortAnnotationGetter{
                        self_comp_id: port.self_comp_id,
                        self_port_id: port.self_port_id,
                        peer_comp_id: port.peer_comp_id,
                        peer_port_id: port.peer_port_id,
                    })
                }
            };
            expected_mapping.push((annotation_kind, port.mapping));
        }

        let new_mapping = self.take_mapping();
        self.ports[port_index].mapping = Some(new_mapping);
        debug_assert_eq!(port_info.kind, PortKind::Putter);

        return MessageDataHeader{
            expected_mapping,
            new_mapping,
            source_port: port_info.self_id,
            target_port: port_info.peer_port_id,
        };
    }
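
    // Illustrative note on the mapping counter (not from the original
    // source): `take_mapping` below hands out 0, 1, 2, ... within a single
    // sync round (the counter is reset in `notify_sync_start`). So the first
    // `put` in a round stamps `new_mapping == 0` onto the outgoing message,
    // the second stamps 1, and so on. The receiver stores the value in its
    // own annotation, and at the end of the round `channel_is_consistent`
    // requires the putter's and getter's recorded values to agree, which is
    // what ties both endpoints of a channel to the same send attempt.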
    #[inline]
    fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
        return MessageSyncHeader{
            sync_round: self.round_index,
            sending_id: comp_ctx.id,
            highest_id: self.highest_id,
        };
    }

    #[inline]
    fn take_mapping(&mut self) -> u32 {
        let mapping = self.mapping_counter;
        self.mapping_counter = self.mapping_counter.wrapping_add(1);
        return mapping;
    }
}
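
// A hedged end-to-end sketch of one sync round between two components. It is
// written as a comment rather than a #[cfg(test)] module because it needs
// scaffolding (`a`, `b`, their scheduler contexts, component contexts and
// ports) whose constructors are not visible in this file; treat every name
// below as a hypothetical stand-in, not real API:
//
//     // Both components enter the sync block
//     a.consensus.notify_sync_start(&a.comp_ctx);
//     b.consensus.notify_sync_start(&b.comp_ctx);
//
//     // A puts a value: the data header carries new_mapping == 0 plus A's
//     // current expected mapping, the sync header carries A's leader guess
//     let msg = a.consensus.annotate_data_message(&a.comp_ctx, &a_port, values);
//
//     // B registers the message (peer discovery), then accepts it; on a
//     // fresh round the expected-mapping check trivially passes
//     b.consensus.handle_incoming_data_message(&b.comp_ctx, &msg);
//     assert!(b.consensus.try_receive_data_message(&b.sched_ctx, &mut b.comp_ctx, &msg));
//
//     // Both components end the sync block and submit local solutions; the
//     // one with the highest CompId acts as leader, matches the two
//     // endpoints of the channel, and broadcasts the decision to the other
//     let _ = a.consensus.notify_sync_end(&a.sched_ctx, &a.comp_ctx);
//     let _ = b.consensus.notify_sync_end(&b.sched_ctx, &b.comp_ctx);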