use crate::collections::VecSet;
use crate::protocol::eval::ValueGroup;

use super::ConnectorId;
use super::branch::BranchId;
use super::port::{ChannelId, PortIdLocal, PortState};
use super::inbox::{
    Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker,
    DataMessage, SyncCompMessage, SyncCompContent, SyncPortMessage,
    SyncPortContent, SyncControlMessage, SyncControlContent
};
use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket};

struct BranchAnnotation {
    channel_mapping: Vec<ChannelAnnotation>,
    cur_marker: BranchMarker,
}

#[derive(Debug)]
pub(crate) struct LocalSolution {
    component: ConnectorId,
    final_branch_id: BranchId,
    sync_round_number: u32,
    port_mapping: Vec<(ChannelId, BranchMarker)>,
}

#[derive(Debug, Clone)]
pub(crate) struct GlobalSolution {
    component_branches: Vec<(ConnectorId, BranchId, u32)>,
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
}

#[derive(Debug, PartialEq, Eq)]
pub enum RoundConclusion {
    Failure,
    Success(BranchId),
}
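// For orientation: the shape of a local solution as it is shipped to the
// leader. The concrete values below are purely illustrative.
//
//     LocalSolution {
//         component: ConnectorId(3),          // who finished a branch
//         final_branch_id: BranchId(7),       // which branch finished
//         sync_round_number: 12,              // the round it belongs to
//         port_mapping: vec![
//             (ChannelId(0), BranchMarker(4)),             // channel fired
//             (ChannelId(1), BranchMarker::new_invalid()), // channel silent
//         ],
//     }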
// -----------------------------------------------------------------------------
// Consensus
// -----------------------------------------------------------------------------

#[derive(Debug)]
struct Peer {
    id: ConnectorId,
    encountered_this_round: bool,
    expected_sync_round: u32,
}

/// The consensus algorithm. Currently only implemented to find the component
/// with the highest ID within the sync region and letting it handle all the
/// local solutions.
///
/// The type itself serves as an experiment to see how code should be organized.
// TODO: Flatten all datastructures
// TODO: Have a "branch+port position hint" in case multiple operations are
//  performed on the same port to prevent repeated lookups
// TODO: A lot of stuff should be batched. Like checking all the sync headers
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
//  bit.
// TODO: Needs a refactor. Firstly we have cases where we don't have a branch ID
//  but we do want to enumerate all current ports. So put that somewhere in a
//  central place. Secondly: error handling and regular message handling are
//  becoming a mess.
pub(crate) struct Consensus {
    // --- State that is cleared after each round
    // Local component's state
    highest_connector_id: ConnectorId,
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
    // Gathered state from communication
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages
    solution_combiner: SolutionCombiner,
    handled_wave: bool, // encountered notification wave in this round
    conclusion: Option<RoundConclusion>,
    ack_remaining: u32,
    // --- Persistent state
    peers: Vec<Peer>,
    sync_round: u32,
    // --- Workspaces
    workspace_ports: Vec<PortIdLocal>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Consistency {
    Valid,
    Inconsistent,
}

#[derive(Debug, PartialEq, Eq)]
pub(crate) enum MessageOrigin {
    Past,
    Present,
    Future,
}

impl Consensus {
    pub fn new() -> Self {
        return Self {
            highest_connector_id: ConnectorId::new_invalid(),
            branch_annotations: Vec::new(),
            branch_markers: Vec::new(),
            encountered_ports: VecSet::new(),
            solution_combiner: SolutionCombiner::new(),
            handled_wave: false,
            conclusion: None,
            ack_remaining: 0,
            peers: Vec::new(),
            sync_round: 0,
            workspace_ports: Vec::new(),
        }
    }

    // --- Controlling sync round and branches

    /// Returns whether the consensus algorithm is running in sync mode.
    pub fn is_in_sync(&self) -> bool {
        return !self.branch_annotations.is_empty();
    }

    #[deprecated]
    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
        let branch = &self.branch_annotations[branch_id.index as usize];
        let port = branch.channel_mapping.iter()
            .find(|v| v.channel_id.index == channel_id.index)
            .unwrap();
        return port;
    }

    /// Sets up the consensus algorithm for a new synchronous round. The
    /// provided ports should be the ports the component owns at the start of
    /// the sync round.
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
        debug_assert!(!self.highest_connector_id.is_valid());
        debug_assert!(self.branch_annotations.is_empty());
        debug_assert!(self.solution_combiner.local.is_empty());

        // We'll use the first "branch" (the non-sync one) to store our ports,
        // this allows cloning if we created a new branch.
        self.branch_annotations.push(BranchAnnotation{
            channel_mapping: ctx.get_ports().iter()
                .map(|v| ChannelAnnotation {
                    channel_id: v.channel_id,
                    registered_id: None,
                    expected_firing: None,
                })
                .collect(),
            cur_marker: BranchMarker::new_invalid(),
        });
        self.branch_markers.push(BranchId::new_invalid());
        self.highest_connector_id = ctx.id;
    }

    /// Notifies the consensus algorithm that a new branch has appeared. Must be
    /// called for each forked branch in the execution tree.
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
        // If called correctly, then each time we are notified the new branch's
        // index is the length of `branch_annotations`.
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
        let new_branch_annotations = BranchAnnotation{
            channel_mapping: parent_branch_annotations.channel_mapping.clone(),
            cur_marker: new_marker,
        };
        self.branch_annotations.push(new_branch_annotations);
        self.branch_markers.push(new_branch_id);
    }
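    // Sketch of the per-round driver sequence as the scheduler might invoke it
    // (hypothetical call sites; the names are the methods of this type):
    //
    //     consensus.start_sync(ctx);
    //     consensus.notify_of_new_branch(parent_id, forked_id); // on each fork
    //     // ... speculative execution: sends, receives, firing assumptions ...
    //     if let Some(RoundConclusion::Success(branch)) = conclusion {
    //         consensus.end_sync(branch, &mut final_ports);
    //     }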
    /// Notifies the consensus algorithm that a particular branch has
    /// encountered an unrecoverable error.
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        debug_assert!(self.is_in_sync());

        // Check for the trivial case, where the branch has not yet
        // communicated within the consensus algorithm.
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
        if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) {
            println!("DEBUG: Failure everything silent");
            return Some(RoundConclusion::Failure);
        }

        // We're not in the trivial case: since we've communicated we need to
        // let everyone know that this round is probably not going to end well.
        return self.initiate_sync_failure(ctx);
    }

    /// Notifies the consensus algorithm that a branch has reached the end of
    /// the sync block. A final check for consistency will be performed that
    /// the caller has to handle. Note that a `Valid` result only means the
    /// branch is locally consistent; global agreement is still up to the
    /// leader.
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
        debug_assert!(self.is_in_sync());
        let branch = &self.branch_annotations[branch_id.index as usize];
        for mapping in &branch.channel_mapping {
            match mapping.expected_firing {
                Some(expected) => {
                    if expected != mapping.registered_id.is_some() {
                        // Inconsistent speculative state and actual state
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
                        return Consistency::Inconsistent;
                    }
                },
                None => {},
            }
        }

        return Consistency::Valid;
    }

    /// Notifies the consensus algorithm that a particular branch has assumed
    /// a speculative value for its port mapping.
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
        debug_assert!(self.is_in_sync());
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
        let channel_id = port_desc.channel_id;

        let branch = &mut self.branch_annotations[branch_id.index as usize];
        for mapping in &mut branch.channel_mapping {
            if mapping.channel_id == channel_id {
                match mapping.expected_firing {
                    None => {
                        // Not yet mapped, perform speculative mapping
                        mapping.expected_firing = Some(does_fire);
                        return Consistency::Valid;
                    },
                    Some(current) => {
                        // Already mapped
                        if current == does_fire {
                            return Consistency::Valid;
                        } else {
                            return Consistency::Inconsistent;
                        }
                    }
                }
            }
        }

        unreachable!("notify_of_speculative_mapping called with unowned port");
    }
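    // Per-channel verdict at the end of a branch. The table just restates
    // `notify_of_finished_branch` above for quick reference:
    //
    //   expected_firing | registered_id | verdict
    //   ----------------+---------------+------------------------------------
    //   None            | any           | Valid (no speculation was made)
    //   Some(true)      | Some(marker)  | Valid
    //   Some(true)      | None          | Inconsistent (assumed fire, silent)
    //   Some(false)     | None          | Valid
    //   Some(false)     | Some(marker)  | caught earlier, during send/receive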
    /// Generates a new local solution from a finished branch. If the component
    /// is not the leader of the sync region then it will be sent to the
    /// appropriate component. If it is the leader then there is a chance that
    /// this solution completes a global solution. In that case the solution
    /// branch ID will be returned.
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        // Turn the port mapping into a local solution
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
        let mut target_mapping = Vec::with_capacity(source_mapping.len());

        for port in source_mapping {
            // Note: if the port is silent, and we've never communicated
            // over the port, then we need to do so now, to let the peer
            // component know about our sync leader state.
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
            let self_port_id = port_desc.self_id;
            let peer_port_id = port_desc.peer_id;
            let channel_id = port_desc.channel_id;

            if !self.encountered_ports.contains(&self_port_id) {
                let message = SyncPortMessage {
                    sync_header: SyncHeader{
                        sending_component_id: ctx.id,
                        highest_component_id: self.highest_connector_id,
                        sync_round: self.sync_round,
                    },
                    source_port: self_port_id,
                    target_port: peer_port_id,
                    content: SyncPortContent::SilentPortNotification,
                };
                match ctx.submit_message(Message::SyncPort(message)) {
                    Ok(_) => {
                        self.encountered_ports.push(self_port_id);
                    },
                    Err(_) => {
                        // Seems like we were done with this branch, but one of
                        // the silent ports (in scope) is actually closed
                        return self.notify_of_fatal_branch(branch_id, ctx);
                    }
                }
            }

            target_mapping.push((
                channel_id,
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
            ));
        }

        let local_solution = LocalSolution{
            component: ctx.id,
            sync_round_number: self.sync_round,
            final_branch_id: branch_id,
            port_mapping: target_mapping,
        };
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
        return maybe_conclusion;
    }

    /// Notifies the consensus algorithm about the chosen branch to commit to
    /// memory (may be the invalid "start" branch).
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
        debug_assert!(self.is_in_sync());

        // TODO: Handle sending and receiving ports
        // Set final ports
        let branch = &self.branch_annotations[branch_id.index as usize];

        // Clear out internal storage to defaults
        println!("DEBUG: ***** Incrementing sync round stuff");
        self.highest_connector_id = ConnectorId::new_invalid();
        self.branch_annotations.clear();
        self.branch_markers.clear();
        self.encountered_ports.clear();
        self.solution_combiner.clear();
        self.handled_wave = false;
        self.conclusion = None;
        self.ack_remaining = 0;

        // And modify persistent storage
        self.sync_round += 1;
        for peer in self.peers.iter_mut() {
            peer.encountered_this_round = false;
            peer.expected_sync_round += 1;
        }
        println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers)
    }
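    // Note on the round-number bookkeeping above: by bumping every peer's
    // `expected_sync_round` together with our own `sync_round`, any message
    // that still arrives for the round that just ended is classified as
    // `MessageOrigin::Past` by `handle_peer` and dropped.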
    // --- Handling messages

    /// Prepares a message for sending. The caller should have made sure that
    /// sending the message is consistent with the speculative state.
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
        debug_assert!(self.is_in_sync());
        let branch = &mut self.branch_annotations[branch_id.index as usize];
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();

        if cfg!(debug_assertions) {
            // Check for consistent mapping
            let port = branch.channel_mapping.iter()
                .find(|v| v.channel_id == port_info.channel_id)
                .unwrap();
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
        }

        // Check for ports that are being sent
        debug_assert!(self.workspace_ports.is_empty());
        find_ports_in_value_group(content, &mut self.workspace_ports);
        if !self.workspace_ports.is_empty() {
            todo!("handle sending ports");
            self.workspace_ports.clear();
        }

        // Construct data header
        let data_header = DataHeader{
            expected_mapping: branch.channel_mapping.iter()
                .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id)
                .copied()
                .collect(),
            sending_port: port_info.self_id,
            target_port: port_info.peer_id,
            new_mapping: branch.cur_marker,
        };

        // Update port mapping
        for mapping in &mut branch.channel_mapping {
            if mapping.channel_id == port_info.channel_id {
                mapping.expected_firing = Some(true);
                mapping.registered_id = Some(branch.cur_marker);
            }
        }

        // Update branch marker
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
        branch.cur_marker = new_marker;
        self.branch_markers.push(branch_id);

        self.encountered_ports.push(source_port_id);

        return (self.create_sync_header(ctx), data_header);
    }
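    // Worked example of the header construction above (illustrative markers):
    // a branch with `cur_marker` m5 sends on channel c2, having fired c1 with
    // marker m3 earlier and nothing on c2 yet. The header then carries
    //
    //     expected_mapping = [(c1, Some(m3)), (c2, None)],  new_mapping = m5
    //
    // and after sending, the branch maps c2 -> m5 and advances to a fresh
    // marker. A receiver may only accept if its own mapping agrees on c1/c2.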
    /// Handles a new data message by handling the sync header. The caller is
    /// responsible for checking for branches that might be able to receive
    /// the message.
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool {
        let message = ctx.read_message_using_ticket(ticket).as_data();
        let target_port = message.data_header.target_port;
        match self.handle_received_sync_header(message.sync_header, ctx) {
            MessageOrigin::Past => return false,
            MessageOrigin::Present => {
                self.encountered_ports.push(target_port);
                return true;
            },
            MessageOrigin::Future => {
                let message = ctx.take_message_using_ticket(ticket);
                ctx.put_back_message(message);
                return false;
            }
        }
    }

    /// Handles a new sync message by handling the sync header and the contents
    /// of the message. Returns `Some` with the branch ID of the global
    /// solution if the sync solution has been found.
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        match self.handle_received_sync_header(message.sync_header, ctx) {
            MessageOrigin::Past => return None,
            MessageOrigin::Present => {},
            MessageOrigin::Future => {
                ctx.put_back_message(Message::SyncComp(message));
                return None;
            }
        }

        // And handle the contents
        debug_assert_eq!(message.target_component_id, ctx.id);
        match &message.content {
            SyncCompContent::LocalFailure |
            SyncCompContent::LocalSolution(_) |
            SyncCompContent::PartialSolution(_) |
            SyncCompContent::AckFailure |
            SyncCompContent::Presence(_) => {
                // Needs to be handled by the leader
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
            },
            SyncCompContent::GlobalSolution(solution) => {
                // Found a global solution
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
                let (_, branch_id, _) = solution.component_branches.iter()
                    .find(|(component_id, _, _)| *component_id == ctx.id)
                    .unwrap();
                return Some(RoundConclusion::Success(*branch_id));
            },
            SyncCompContent::GlobalFailure => {
                // Global failure of round, send Ack to leader
                println!("DEBUGERINO: Got GlobalFailure, sending Ack in response");
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
                debug_assert!(_result.is_none());
                return Some(RoundConclusion::Failure);
            },
            SyncCompContent::Notification => {
                // We were just interested in the sync header we handled above
                return None;
            }
        }
    }
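    // Routing summary for the match above: non-leaders forward LocalSolution,
    // LocalFailure, PartialSolution, AckFailure and Presence towards the
    // current leader; only the leader emits GlobalSolution/GlobalFailure back,
    // and Notification merely transports a sync header for leader discovery.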
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        match self.handle_received_sync_header(message.sync_header, ctx) {
            MessageOrigin::Past => return None,
            MessageOrigin::Present => {},
            MessageOrigin::Future => {
                ctx.put_back_message(Message::SyncPort(message));
                return None;
            }
        }

        debug_assert!(self.is_in_sync());
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());

        match message.content {
            SyncPortContent::SilentPortNotification => {
                // The point here is to let us become part of the sync round and
                // take note of the leader in case all of our ports are silent.
                self.encountered_ports.push(message.target_port);
                return None;
            }
            SyncPortContent::NotificationWave => {
                // Wave to discover everyone in the network; handling the sync
                // header takes care of leader discovery, here we need to make
                // sure we propagate the wave.
                if self.handled_wave {
                    return None;
                }
                self.handled_wave = true;

                // Propagate wave to all peers except the one that has sent us
                // the wave.
                for mapping in &self.branch_annotations[0].channel_mapping {
                    let channel_id = mapping.channel_id;
                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
                    if port_desc.self_id == message.target_port {
                        // Wave came from this port, no need to send one back
                        continue;
                    }

                    let message = SyncPortMessage{
                        sync_header: self.create_sync_header(ctx),
                        source_port: port_desc.self_id,
                        target_port: port_desc.peer_id,
                        content: SyncPortContent::NotificationWave,
                    };
                    // As with the other SyncPort messages where we throw away
                    // the result: we're dealing with an error here anyway.
                    let _unused = ctx.submit_message(Message::SyncPort(message));
                }

                // And let the leader know about our port state
                let annotations = &self.branch_annotations[0];
                let mut channels = Vec::with_capacity(annotations.channel_mapping.len());
                for mapping in &annotations.channel_mapping {
                    let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
                    channels.push(LocalChannelPresence{
                        channel_id: mapping.channel_id,
                        is_closed: port_info.state == PortState::Closed,
                    });
                }
                let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
                    component_id: ctx.id,
                    channels,
                }), ctx);
                return maybe_conclusion;
            }
        }
    }

    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        if message.in_response_to_sync_round < self.sync_round {
            // Old message
            return None;
        }

        // Because the message is always sent in response to a message
        // originating here, the sync round number can never be larger than the
        // currently stored one.
        debug_assert_eq!(message.in_response_to_sync_round, self.sync_round);
        match message.content {
            SyncControlContent::ChannelIsClosed(_) => {
                return self.initiate_sync_failure(ctx);
            }
        }
    }

    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
        debug_assert!(self.branch_can_receive(branch_id, message));
        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();

        let branch = &mut self.branch_annotations[branch_id.index as usize];
        for mapping in &mut branch.channel_mapping {
            if mapping.channel_id == target_port.channel_id {
                // Found the port in which the message should be inserted
                mapping.registered_id = Some(message.data_header.new_mapping);

                // Check for sent ports
                debug_assert!(self.workspace_ports.is_empty());
                find_ports_in_value_group(&message.content, &mut self.workspace_ports);
                if !self.workspace_ports.is_empty() {
                    todo!("handle received ports");
                    self.workspace_ports.clear();
                }

                return;
            }
        }

        // If here, then the branch didn't actually own the port? Means the
        // caller made a mistake.
        unreachable!("incorrect notify_of_received_message");
    }
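    // Receive-side counterpart of the header example near
    // `handle_message_to_send`: a branch may accept a message only when, for
    // every channel in the header's `expected_mapping` that the branch also
    // owns, the registered markers are identical; `branch_can_receive` below
    // implements exactly that comparison.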
    /// Matches the mapping between the branch and the data message. If they
    /// match, then the branch can receive the message.
    pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool {
        if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) {
            if message.sync_header.sync_round < peer.expected_sync_round {
                return false;
            }
        }

        let annotation = &self.branch_annotations[branch_id.index as usize];
        for expected in &message.data_header.expected_mapping {
            // If we own the port, then we have an entry in the annotation:
            // check if the current mapping matches.
            for current in &annotation.channel_mapping {
                if expected.channel_id == current.channel_id {
                    if expected.registered_id != current.registered_id {
                        // IDs do not match, we cannot receive the message in
                        // this branch
                        return false;
                    }
                }
            }
        }

        return true;
    }

    // --- Internal helpers

    fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin {
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
        let origin = self.handle_peer(&sync_header);
        println!(" ********************** GOT {:?}", origin);
        if origin != MessageOrigin::Present {
            // We do not have to handle it now
            return origin;
        }

        if sync_header.highest_component_id > self.highest_connector_id {
            // Sender has a higher component ID, so it should be the target of
            // our messages. We should also let all of our peers know.
            self.highest_connector_id = sync_header.highest_component_id;

            for peer in self.peers.iter() {
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
                    // Don't need to send it to this one
                    continue;
                }

                let message = SyncCompMessage {
                    sync_header: self.create_sync_header(ctx),
                    target_component_id: peer.id,
                    content: SyncCompContent::Notification,
                };
                ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
            }

            // But also send our locally combined solution
            self.forward_local_data_to_new_leader(ctx);
        } else if sync_header.highest_component_id < self.highest_connector_id {
            // Sender has a lower leader ID, so it should know about our higher
            // one.
            let message = SyncCompMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: sync_header.sending_component_id,
                content: SyncCompContent::Notification,
            };
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
        } // else: exactly equal, so do nothing

        return MessageOrigin::Present;
    }
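    // Leader election in a nutshell: every sync header carries the highest
    // component ID its sender knows of. Whoever sees a higher ID adopts it as
    // leader, notifies its other peers, and forwards any partially combined
    // solution data, so the region converges on the single highest ID.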
    /// Handles a (potentially new) peer. Returns the origin of the message
    /// relative to the sync round we expect from that peer.
    fn handle_peer(&mut self, sync_header: &SyncHeader) -> MessageOrigin {
        let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id);
        match position {
            Some(index) => {
                let entry = &mut self.peers[index];
                if entry.encountered_this_round {
                    // Already encountered this round
                    if sync_header.sync_round < entry.expected_sync_round {
                        return MessageOrigin::Past;
                    } else if sync_header.sync_round == entry.expected_sync_round {
                        return MessageOrigin::Present;
                    } else {
                        return MessageOrigin::Future;
                    }
                } else {
                    // TODO: Proper handling of potential overflow
                    entry.encountered_this_round = true;
                    if sync_header.sync_round >= entry.expected_sync_round {
                        entry.expected_sync_round = sync_header.sync_round;
                        return MessageOrigin::Present;
                    } else {
                        return MessageOrigin::Past;
                    }
                }
            },
            None => {
                self.peers.push(Peer{
                    id: sync_header.sending_component_id,
                    encountered_this_round: true,
                    expected_sync_round: sync_header.sync_round,
                });
                return MessageOrigin::Present;
            }
        }
    }
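    // Origin classification per peer, restating the branches above:
    //
    //   first message this round:  round >= expected  -> Present (round adopted)
    //                              round <  expected  -> Past
    //   later messages this round: round <  expected  -> Past
    //                              round == expected  -> Present
    //                              round >  expected  -> Future (message is requeued)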
    /// Sends a message towards the leader. If we are already the leader then
    /// the message will be handled immediately.
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        if self.highest_connector_id == ctx.id {
            // We are the leader
            match content {
                SyncCompContent::LocalFailure => {
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
                        return self.handle_global_failure_as_leader(ctx);
                    }
                },
                SyncCompContent::LocalSolution(local_solution) => {
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
                        return self.handle_global_solution_as_leader(global_solution, ctx);
                    }
                },
                SyncCompContent::PartialSolution(partial_solution) => {
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
                        match conclusion {
                            LeaderConclusion::Solution(global_solution) => {
                                return self.handle_global_solution_as_leader(global_solution, ctx);
                            },
                            LeaderConclusion::Failure => {
                                return self.handle_global_failure_as_leader(ctx);
                            }
                        }
                    }
                },
                SyncCompContent::Presence(component_presence) => {
                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_presence.component_id, &component_presence.channels) {
                        return self.handle_global_failure_as_leader(ctx);
                    }
                },
                SyncCompContent::AckFailure => {
                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
                    debug_assert!(self.ack_remaining > 0);
                    self.ack_remaining -= 1;
                    if self.ack_remaining == 0 {
                        return Some(RoundConclusion::Failure);
                    }
                }
                SyncCompContent::Notification |
                SyncCompContent::GlobalSolution(_) |
                SyncCompContent::GlobalFailure => {
                    unreachable!("unexpected message content for leader");
                },
            }
        } else {
            // Someone else is the leader
            let message = SyncCompMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: self.highest_connector_id,
                content,
            };
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
        }

        return None;
    }

    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        if self.conclusion.is_some() {
            return None;
        }

        // Handle the global solution
        let mut my_final_branch_id = BranchId::new_invalid();
        for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() {
            if connector_id == ctx.id {
                // This is our solution branch
                my_final_branch_id = branch_id;
                continue;
            }

            // Send solution message
            let message = SyncCompMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: connector_id,
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
            };
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel

            // Update peers as leader. A subsequent call to `end_sync` will
            // update the round numbers.
            match self.peers.iter_mut().find(|v| v.id == connector_id) {
                Some(peer) => {
                    peer.expected_sync_round = sync_round;
                },
                None => {
                    self.peers.push(Peer{
                        id: connector_id,
                        expected_sync_round: sync_round,
                        encountered_this_round: true,
                    });
                }
            }
        }

        debug_assert!(my_final_branch_id.is_valid());
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
        return Some(RoundConclusion::Success(my_final_branch_id));
    }

    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
        if self.conclusion.is_some() {
            // Already sent out a failure
            return None;
        }

        // TODO: Performance
        let mut encountered = VecSet::new();
        for presence in &self.solution_combiner.presence {
            if presence.owner_a != ctx.id {
                // Did not add it ourselves
                if encountered.push(presence.owner_a) {
                    // Not yet sent a message
                    let message = SyncCompMessage{
                        sync_header: self.create_sync_header(ctx),
                        target_component_id: presence.owner_a,
                        content: SyncCompContent::GlobalFailure,
                    };
                    ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
                }
            }

            if let Some(owner_b) = presence.owner_b {
                if owner_b != ctx.id {
                    if encountered.push(owner_b) {
                        let message = SyncCompMessage{
                            sync_header: self.create_sync_header(ctx),
                            target_component_id: owner_b,
                            content: SyncCompContent::GlobalFailure,
                        };
                        ctx.submit_message(Message::SyncComp(message)).unwrap();
                    }
                }
            }
        }

        println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.index).collect::<Vec<_>>());
        self.conclusion = Some(RoundConclusion::Failure);
        if encountered.is_empty() {
            // We don't have to wait on Acks
            return Some(RoundConclusion::Failure);
        } else {
            self.ack_remaining = encountered.len() as u32;
            return None;
        }
    }
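    // Failure protocol sketch: the leader broadcasts GlobalFailure to every
    // component recorded in the presence table (each channel knows up to two
    // owners), counts the recipients, and only concludes the round once that
    // many AckFailure replies have come back (see the AckFailure arm above).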
    fn initiate_sync_failure(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
        debug_assert!(self.is_in_sync());

        // Notify the leader of our channels and the fact that we just failed
        let channel_mapping = &self.branch_annotations[0].channel_mapping;
        let mut channel_presence = Vec::with_capacity(channel_mapping.len());
        for mapping in channel_mapping {
            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
            channel_presence.push(LocalChannelPresence{
                channel_id: mapping.channel_id,
                is_closed: port.state == PortState::Closed,
            });
        }

        let maybe_already = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
            component_id: ctx.id,
            channels: channel_presence,
        }), ctx);
        if self.handled_wave {
            // Someone (or us) has already initiated a sync failure.
            return maybe_already;
        }

        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
        debug_assert!(if maybe_already.is_some() { maybe_conclusion.is_some() } else { true });
        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);

        // Initiate a discovery wave so peers can do the same
        self.handled_wave = true;
        for mapping in &self.branch_annotations[0].channel_mapping {
            let channel_id = mapping.channel_id;
            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
            let message = SyncPortMessage{
                sync_header: self.create_sync_header(ctx),
                source_port: port_info.self_id,
                target_port: port_info.peer_id,
                content: SyncPortContent::NotificationWave,
            };
            // Note: submitting the message might fail. But we're attempting to
            // handle the error anyway.
            // TODO: Think about this a second time: how do we make sure the
            //  entire network will fail if we reach this condition
            let _unused = ctx.submit_message(Message::SyncPort(message));
        }

        return maybe_conclusion;
    }

    #[inline]
    fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
        return SyncHeader{
            sending_component_id: ctx.id,
            highest_component_id: self.highest_connector_id,
            sync_round: self.sync_round,
        }
    }

    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
        debug_assert_ne!(self.highest_connector_id, ctx.id);
        if let Some(partial_solution) = self.solution_combiner.drain() {
            let message = SyncCompMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: self.highest_connector_id,
                content: SyncCompContent::PartialSolution(partial_solution),
            };
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
        }
    }
}

// -----------------------------------------------------------------------------
// Solution storage and algorithms
// -----------------------------------------------------------------------------

// TODO: Remove all debug derives
#[derive(Debug, Clone)]
struct MatchedLocalSolution {
    final_branch_id: BranchId,
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
    matches: Vec<ComponentMatches>,
}

#[derive(Debug, Clone)]
struct ComponentMatches {
    target_id: ConnectorId,
    target_index: usize,
    match_indices: Vec<usize>, // of local solution in connector
}

#[derive(Debug, Clone)]
struct ComponentPeer {
    target_id: ConnectorId,
    target_index: usize, // in array of global solution components
    involved_channels: Vec<ChannelId>,
}

#[derive(Debug, Clone)]
struct ComponentLocalSolutions {
    component: ConnectorId,
    sync_round: u32,
    peers: Vec<ComponentPeer>,
    solutions: Vec<MatchedLocalSolution>,
    all_peers_present: bool,
}

#[derive(Debug, Clone)]
pub(crate) struct ComponentPresence {
    component_id: ConnectorId,
    channels: Vec<LocalChannelPresence>,
}

#[derive(Debug, Clone)]
pub(crate) struct LocalChannelPresence {
    channel_id: ChannelId,
    is_closed: bool,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PresenceState {
    OnePresent,  // one component reported the channel being open
    BothPresent, // two components reported the channel being open
    Closed,      // one component reported the channel being closed
}

/// Record to hold channel state during the error-resolving mode of the leader.
/// This is used to determine when the sync region has grown to its largest
/// size. The structure is eventually consistent in the sense that a component
/// might initially presume a channel is open, only to figure out later it is
/// actually closed.
#[derive(Debug, Clone)]
struct ChannelPresence {
    owner_a: ConnectorId,
    owner_b: Option<ConnectorId>,
    id: ChannelId,
    state: PresenceState,
}

// TODO: Flatten? Flatten. Flatten everything.
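// PresenceState transitions as reports come in (restating the update rules in
// `add_presence_and_check_for_global_failure` and `combine`):
//
//   (no entry) --open report-->                       OnePresent
//   OnePresent --open report from the other owner-->  BothPresent
//   any state  --closed report-->                     Closed (sticky)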
#[derive(Debug)]
pub(crate) struct SolutionCombiner {
    local: Vec<ComponentLocalSolutions>, // used for finding a solution
    presence: Vec<ChannelPresence>, // used to detect all channels present in case of failure
    failure_reported: bool,
}

struct CheckEntry {
    component_index: usize,         // component index in combiner's vector
    solution_index: usize,          // solution entry in the above component entry
    parent_entry_index: usize,      // parent that caused the creation of this checking entry
    match_index_in_parent: usize,   // index in the matches array of the parent
    solution_index_in_parent: usize,// index in the solution array of the match entry in the parent
}

enum LeaderConclusion {
    Solution(GlobalSolution),
    Failure,
}

impl SolutionCombiner {
    fn new() -> Self {
        return Self{
            local: Vec::new(),
            presence: Vec::new(),
            failure_reported: false,
        };
    }
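    // Matching model: local solutions are grouped per component. Two solutions
    // of neighbouring components "match" when they agree on the BranchMarker
    // of every channel they share. A global solution is a pick of exactly one
    // local solution per component such that every neighbouring pair matches.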
    /// Adds a new local solution to the global solution storage. Will check
    /// the new local solution for matches against already stored local
    /// solutions of peer connectors.
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
        let component_id = solution.component;
        let sync_round = solution.sync_round_number;
        let solution = MatchedLocalSolution{
            final_branch_id: solution.final_branch_id,
            channel_mapping: solution.port_mapping,
            matches: Vec::new(),
        };

        // Create an entry for the solution for the particular component
        let component_exists = self.local.iter_mut()
            .enumerate()
            .find(|(_, v)| v.component == component_id);

        let (component_index, solution_index, new_component) = match component_exists {
            Some((component_index, storage)) => {
                // Entry for component exists, so add to solutions
                let solution_index = storage.solutions.len();
                storage.solutions.push(solution);
                (component_index, solution_index, false)
            }
            None => {
                // Entry for component does not exist yet
                let component_index = self.local.len();
                self.local.push(ComponentLocalSolutions{
                    component: component_id,
                    sync_round,
                    peers: Vec::new(),
                    solutions: vec![solution],
                    all_peers_present: false,
                });
                (component_index, 0, true)
            }
        };

        // If this is a solution of a component that is new to us, then we
        // check in the stored solutions which other components are peers of
        // the new one.
        if new_component {
            let cur_ports = &self.local[component_index].solutions[0].channel_mapping;
            let mut component_peers = Vec::new();

            // Find the matching components
            for (other_index, other_component) in self.local.iter().enumerate() {
                if other_index == component_index {
                    // Don't match against ourselves
                    continue;
                }

                let mut matching_channels = Vec::new();
                for (cur_channel_id, _) in cur_ports {
                    for (other_channel_id, _) in &other_component.solutions[0].channel_mapping {
                        if cur_channel_id == other_channel_id {
                            // We have a shared channel
                            matching_channels.push(*cur_channel_id);
                        }
                    }
                }

                if !matching_channels.is_empty() {
                    // We share some channels
                    component_peers.push(ComponentPeer{
                        target_id: other_component.component,
                        target_index: other_index,
                        involved_channels: matching_channels,
                    });
                }
            }

            let mut num_ports_in_peers = 0;
            for peer in &component_peers {
                num_ports_in_peers += peer.involved_channels.len();
            }
            if num_ports_in_peers == cur_ports.len() {
                // Newly added component has all required peers present
                self.local[component_index].all_peers_present = true;
            }

            // Add the found component pairing entries to the solution entries
            // for the two involved components
            for component_match in component_peers {
                // Check the other component for having all peers present
                let mut num_ports_in_peers = component_match.involved_channels.len();
                let other_component = &mut self.local[component_match.target_index];
                for existing_peer in &other_component.peers {
                    num_ports_in_peers += existing_peer.involved_channels.len();
                }
                if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() {
                    other_component.all_peers_present = true;
                }

                other_component.peers.push(ComponentPeer{
                    target_id: component_id,
                    target_index: component_index,
                    involved_channels: component_match.involved_channels.clone(),
                });

                let new_component = &mut self.local[component_index];
                new_component.peers.push(component_match);
            }
        }

        // We're now sure that we know which other components the currently
        // considered component is linked up to. Now we need to check those
        // entries (if any) to see if any pair of local solutions match.
        let mut new_component_matches = Vec::new();
        let cur_component = &self.local[component_index];
        let cur_solution = &cur_component.solutions[solution_index];
        for peer in &cur_component.peers {
            let mut new_solution_matches = Vec::new();
            let other_component = &self.local[peer.target_index];
            for (other_solution_index, other_solution) in other_component.solutions.iter().enumerate() {
                // Check the port mappings between the pair of solutions.
                let mut all_matched = true;
                'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping {
                    for (other_port, other_branch) in &other_solution.channel_mapping {
                        if cur_port == other_port {
                            if cur_branch == other_branch {
                                // Same port mapping, go to next port
                                break;
                            } else {
                                // Different port mapping, not a match
                                all_matched = false;
                                break 'mapping_check_loop;
                            }
                        }
                    }
                }

                if !all_matched {
                    continue;
                }

                // Port mapping between the component pair is the same, so they
                // have agreeable local solutions.
                new_solution_matches.push(other_solution_index);
            }

            new_component_matches.push(ComponentMatches{
                target_id: peer.target_id,
                target_index: peer.target_index,
                match_indices: new_solution_matches,
            });
        }

        // And now that we have the new solution-to-solution matches, we need
        // to add those in the appropriate storage.
        for new_component_match in new_component_matches {
            let other_component = &mut self.local[new_component_match.target_index];
            for other_solution_index in new_component_match.match_indices.iter().copied() {
                let other_solution = &mut other_component.solutions[other_solution_index];

                // Add a completely new entry for the component, or add it to
                // the existing component entry's matches
                match other_solution.matches.iter_mut()
                    .find(|v| v.target_id == component_id)
                {
                    Some(other_match) => {
                        other_match.match_indices.push(solution_index);
                    },
                    None => {
                        other_solution.matches.push(ComponentMatches{
                            target_id: component_id,
                            target_index: component_index,
                            match_indices: vec![solution_index],
                        })
                    }
                }
            }

            let cur_component = &mut self.local[component_index];
            let cur_solution = &mut cur_component.solutions[solution_index];
            match cur_solution.matches.iter_mut()
                .find(|v| v.target_id == new_component_match.target_id)
            {
                Some(other_match) => {
                    // Already have an entry
                    debug_assert_eq!(other_match.target_index, new_component_match.target_index);
                    other_match.match_indices.extend(&new_component_match.match_indices);
                },
                None => {
                    // Create a new entry
                    cur_solution.matches.push(new_component_match);
                }
            }
        }

        return self.check_for_global_solution(component_index, solution_index);
    }

    fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool {
        for entry in channels {
            let mut found = false;

            for existing in &mut self.presence {
                if existing.id == entry.channel_id {
                    // Same entry. We only update if the second component that
                    // owns an end of the channel reports in, or if a component
                    // is telling us that the channel is (now) closed.
                    if entry.is_closed {
                        existing.state = PresenceState::Closed;
                    } else if component_id != existing.owner_a && existing.state != PresenceState::Closed {
                        existing.state = PresenceState::BothPresent;
                    }
                    if existing.owner_a != component_id {
                        existing.owner_b = Some(component_id);
                    }

                    found = true;
                    break;
                }
            }

            if !found {
                self.presence.push(ChannelPresence{
                    owner_a: component_id,
                    owner_b: None,
                    id: entry.channel_id,
                    state: if entry.is_closed { PresenceState::Closed } else { PresenceState::OnePresent },
                });
            }
        }

        println!("DEBUGGERINO Presence is now:\n{:#?}", self.presence);
        return self.check_for_global_failure();
    }

    fn mark_failure_and_check_for_global_failure(&mut self) -> bool {
        self.failure_reported = true;
        return self.check_for_global_failure();
    }
    /// Checks if, starting at the provided local solution, a global solution
    /// can be formed.
    // TODO: At some point, check if divide and conquer is faster?
    fn check_for_global_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
        // Small trivial test, necessary (but not sufficient) for a global
        // solution
        for component in &self.local {
            if !component.all_peers_present {
                return None;
            }
        }

        // Construct initial entry on stack
        let mut stack = Vec::with_capacity(self.local.len());
        stack.push(CheckEntry{
            component_index: initial_component_index,
            solution_index: initial_solution_index,
            parent_entry_index: 0,
            match_index_in_parent: 0,
            solution_index_in_parent: 0,
        });

        'check_last_stack: loop {
            let cur_index = stack.len() - 1;
            let cur_entry = &stack[cur_index];

            // Check if the current component is matching with all other
            // entries
            let mut all_match = true;
            'check_against_existing: for prev_index in 0..cur_index {
                let prev_entry = &stack[prev_index];
                let prev_component = &self.local[prev_entry.component_index];
                let prev_solution = &prev_component.solutions[prev_entry.solution_index];
                for prev_matching_component in &prev_solution.matches {
                    if prev_matching_component.target_index == cur_entry.component_index {
                        // Previous entry has shared ports with the current
                        // entry, so see if we have a composable pair of
                        // solutions.
                        if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) {
                            all_match = false;
                            break 'check_against_existing;
                        }
                    }
                }
            }

            if all_match {
                // All components matched until now.
                if stack.len() == self.local.len() {
                    // We have found a global solution
                    break 'check_last_stack;
                }

                // Not all components found yet, look for a new one that has
                // not been added yet.
                for (parent_index, parent_entry) in stack.iter().enumerate() {
                    let parent_component = &self.local[parent_entry.component_index];
                    let parent_solution = &parent_component.solutions[parent_entry.solution_index];

                    for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() {
                        if peer_component.match_indices.is_empty() {
                            continue;
                        }

                        let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index);
                        if !already_added {
                            // New component to try
                            stack.push(CheckEntry{
                                component_index: peer_component.target_index,
                                solution_index: peer_component.match_indices[0],
                                parent_entry_index: parent_index,
                                match_index_in_parent: peer_index,
                                solution_index_in_parent: 0,
                            });
                            continue 'check_last_stack;
                        }
                    }
                }

                // Cannot find a peer to add. This is possible if, for example,
                // we have a component A which has the only connection to
                // component B, and B has sent a local solution saying it is
                // finished, but the last data message has not yet arrived at
                // A. In any case, we just fall through and treat not being
                // able to find a new connector as being forced to try a new
                // permutation of the possible local solutions.
            }

            // Either the currently considered local solution is inconsistent
            // with other local solutions, or we cannot find a new component to
            // add. This is where we perform backtracking, as long as needed,
            // to try a new solution.
            while stack.len() > 1 {
                // Check if our parent has another solution we can try
                let cur_index = stack.len() - 1;
                let cur_entry = &stack[cur_index];
                let parent_entry = &stack[cur_entry.parent_entry_index];
                let parent_component = &self.local[parent_entry.component_index];
                let parent_solution = &parent_component.solutions[parent_entry.solution_index];
                let match_component = &parent_solution.matches[cur_entry.match_index_in_parent];
                debug_assert!(match_component.target_index == cur_entry.component_index);

                let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1;
                if new_solution_index_in_parent < match_component.match_indices.len() {
                    // We can still try a new one
                    let new_solution_index = match_component.match_indices[new_solution_index_in_parent];
                    let cur_entry = &mut stack[cur_index];
                    cur_entry.solution_index_in_parent = new_solution_index_in_parent;
                    cur_entry.solution_index = new_solution_index;
                    continue 'check_last_stack;
                } else {
                    // We're out of options here. So pop an entry, then in the
                    // next iteration of this backtracking loop we try to
                    // increment that solution.
                    stack.pop();
                }
            }

            // Stack length is 1, hence we're back at our initial solution.
            // Since that doesn't yield a global solution, we simply:
            return None;
        }

        // Constructing the representation of the global solution
        debug_assert_eq!(stack.len(), self.local.len());
        let mut final_branches = Vec::with_capacity(stack.len());
        for entry in &stack {
            let component = &self.local[entry.component_index];
            let solution = &component.solutions[entry.solution_index];
            final_branches.push((component.component, solution.final_branch_id, component.sync_round));
        }

        // Just debugging here, TODO: @remove
        let mut total_num_channels = 0;
        for entry in &stack {
            let component = &self.local[entry.component_index];
            total_num_channels += component.solutions[0].channel_mapping.len();
        }
        total_num_channels /= 2;

        let mut final_mapping = Vec::with_capacity(total_num_channels);
        let mut total_num_checked = 0;
        for entry in &stack {
            let component = &self.local[entry.component_index];
            let solution = &component.solutions[entry.solution_index];
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
                    Some((_, encountered_branch_id)) => {
                        debug_assert_eq!(*encountered_branch_id, branch_id);
                        total_num_checked += 1;
                    },
                    None => {
                        final_mapping.push((channel_id, branch_id));
                    }
                }
            }
        }
        debug_assert_eq!(total_num_checked, total_num_channels);

        return Some(GlobalSolution{
            component_branches: final_branches,
            channel_mapping: final_mapping,
        });
    }

    /// Checks if all preconditions for global sync failure have been met
    fn check_for_global_failure(&self) -> bool {
        if !self.failure_reported {
            return false;
        }

        // Failure is reported; if all components are present then we may emit
        // the global failure broadcast.
        // Check if all are present and we're preparing to fail this round
        let mut all_present = true;
        for presence in &self.presence {
            if presence.state == PresenceState::OnePresent {
                all_present = false;
                break;
            }
        }

        return all_present; // && failure_reported, which is checked above
    }
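    // Search sketch for `check_for_global_solution` above (hypothetical data):
    // with components A, B, C where A has solutions {a1, a2}, B has {b1} and
    // C has {c1, c2}, the stack explores (a1) -> (a1, b1) -> (a1, b1, c1) and,
    // on a mismatch, backtracks to try (a1, b1, c2) before popping further and
    // moving on to a2.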
    /// Turns the entire (partially resolved) global solution into a structure
    /// that can be forwarded to a new parent. The new parent may then merge
    /// the already obtained information.
    fn drain(&mut self) -> Option<SolutionCombiner> {
        if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported {
            return None;
        }

        let result = SolutionCombiner{
            local: self.local.clone(),
            presence: self.presence.clone(),
            failure_reported: self.failure_reported,
        };
        self.local.clear();
        self.presence.clear();
        self.failure_reported = false;

        return Some(result);
    }

    // TODO: Entire routine is quite wasteful. Combine instead of doing all
    //  the work again.
    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
        self.failure_reported = self.failure_reported || combiner.failure_reported;

        // Handle local solutions
        if self.local.is_empty() {
            // Trivial case
            self.local = combiner.local;
        } else {
            for local in combiner.local {
                for matched in local.solutions {
                    let local_solution = LocalSolution{
                        component: local.component,
                        sync_round_number: local.sync_round,
                        final_branch_id: matched.final_branch_id,
                        port_mapping: matched.channel_mapping,
                    };
                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
                    if let Some(global_solution) = maybe_solution {
                        return Some(LeaderConclusion::Solution(global_solution));
                    }
                }
            }
        }

        // Handle channel presence
        println!("DEBUGERINO: Presence before joining is {:#?}", &self.presence);
        if self.presence.is_empty() {
            // Trivial case
            self.presence = combiner.presence;
            println!("DEBUGERINO: Trivial merging")
        } else {
            for presence in combiner.presence {
                match self.presence.iter_mut().find(|v| v.id == presence.id) {
                    Some(entry) => {
                        // Combine entries. Take the first that has Closed,
                        // then check the first that has both, then check if
                        // they are combinable.
                        if entry.state == PresenceState::Closed {
                            // Do nothing
                        } else if presence.state == PresenceState::Closed {
                            entry.owner_a = presence.owner_a;
                            entry.owner_b = presence.owner_b;
                            entry.state = PresenceState::Closed;
                        } else if entry.state == PresenceState::BothPresent {
                            // Again: do nothing
                        } else if presence.state == PresenceState::BothPresent {
                            entry.owner_a = presence.owner_a;
                            entry.owner_b = presence.owner_b;
                            entry.state = PresenceState::BothPresent;
                        } else {
                            // Both have one presence, combine into both
                            // present
                            debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent);
                            entry.owner_b = Some(presence.owner_a);
                            entry.state = PresenceState::BothPresent;
                        }
                    },
                    None => {
                        self.presence.push(presence);
                    }
                }
            }
            println!("DEBUGERINO: Presence after joining is {:#?}", &self.presence);

            // After adding everything we might have immediately found a
            // solution
            if self.check_for_global_failure() {
                println!("DEBUG: Returning immediate failure?");
                return Some(LeaderConclusion::Failure);
            }
        }

        return None;
    }

    fn clear(&mut self) {
        self.local.clear();
        self.presence.clear();
        self.failure_reported = false;
    }
}

// -----------------------------------------------------------------------------
// Generic Helpers
// -----------------------------------------------------------------------------

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
    // Helper to check a value for a port and recurse if needed.
    use crate::protocol::eval::Value;

    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortIdLocal::new(port_id.id);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }

                ports.push(cur_port);
            },
            Value::Array(heap_pos) |
            Value::Message(heap_pos) |
            Value::String(heap_pos) |
            Value::Struct(heap_pos) |
            Value::Union(_, heap_pos) => {
                // Reference to some dynamic thing which might contain ports,
                // so recurse
                let heap_region = &group.regions[*heap_pos as usize];
                for embedded_value in heap_region {
                    find_port_in_value(group, embedded_value, ports);
                }
            },
            _ => {}, // values we don't care about
        }
    }

    // Clear the ports, then scan all the available values
    ports.clear();
    for value in &value_group.values {
        find_port_in_value(value_group, value, ports);
    }
}
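// Illustrative scan (hypothetical values): for a group holding
// `[Output(p2), Array[Input(p5), Output(p2)]]` the function yields
// `ports == [p2, p5]`; the second occurrence of p2 is skipped as a duplicate.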