diff --git a/src/runtime/consensus.rs b/src/runtime/consensus.rs new file mode 100644 index 0000000000000000000000000000000000000000..ddb3e086df412d7b43532c04f2cd6d41d2e4b22b --- /dev/null +++ b/src/runtime/consensus.rs @@ -0,0 +1,1583 @@ +use crate::collections::VecSet; + +use crate::protocol::eval::ValueGroup; + +use super::ConnectorId; +use super::branch::BranchId; +use super::port::{ChannelId, PortIdLocal, PortState}; +use super::inbox::{ + Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker, + DataMessage, + SyncCompMessage, SyncCompContent, + SyncPortMessage, SyncPortContent, + SyncControlMessage, SyncControlContent +}; +use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket}; + +struct BranchAnnotation { + channel_mapping: Vec, + cur_marker: BranchMarker, +} + +#[derive(Debug)] +pub(crate) struct LocalSolution { + component: ConnectorId, + final_branch_id: BranchId, + sync_round_number: u32, + port_mapping: Vec<(ChannelId, BranchMarker)>, +} + +#[derive(Debug, Clone)] +pub(crate) struct GlobalSolution { + component_branches: Vec<(ConnectorId, BranchId, u32)>, + channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info +} + +#[derive(Debug, PartialEq, Eq)] +pub enum RoundConclusion { + Failure, + Success(BranchId), +} + +// ----------------------------------------------------------------------------- +// Consensus +// ----------------------------------------------------------------------------- + +#[derive(Debug)] +struct Peer { + id: ConnectorId, + encountered_this_round: bool, + expected_sync_round: u32, +} + +/// The consensus algorithm. Currently only implemented to find the component +/// with the highest ID within the sync region and letting it handle all the +/// local solutions. +/// +/// The type itself serves as an experiment to see how code should be organized. +// TODO: Flatten all datastructures +// TODO: Have a "branch+port position hint" in case multiple operations are +// performed on the same port to prevent repeated lookups +// TODO: A lot of stuff should be batched. Like checking all the sync headers +// and sending "I have a higher ID" messages. Should reduce locking by quite a +// bit. +// TODO: Needs a refactor. Firstly we have cases where we don't have a branch ID +// but we do want to enumerate all current ports. So put that somewhere in a +// central place. Secondly. Error handling and regular message handling is +// becoming a mess. +pub(crate) struct Consensus { + // --- State that is cleared after each round + // Local component's state + highest_connector_id: ConnectorId, + branch_annotations: Vec, // index is branch ID + branch_markers: Vec, // index is branch marker, maps to branch + // Gathered state from communication + encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. + solution_combiner: SolutionCombiner, + handled_wave: bool, // encountered notification wave in this round + conclusion: Option, + ack_remaining: u32, + // --- Persistent state + peers: Vec, + sync_round: u32, + // --- Workspaces + workspace_ports: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Consistency { + Valid, + Inconsistent, +} + +#[derive(Debug, PartialEq, Eq)] +pub(crate) enum MessageOrigin { + Past, + Present, + Future +} + +impl Consensus { + pub fn new() -> Self { + return Self { + highest_connector_id: ConnectorId::new_invalid(), + branch_annotations: Vec::new(), + branch_markers: Vec::new(), + encountered_ports: VecSet::new(), + solution_combiner: SolutionCombiner::new(), + handled_wave: false, + conclusion: None, + ack_remaining: 0, + peers: Vec::new(), + sync_round: 0, + workspace_ports: Vec::new(), + } + } + + // --- Controlling sync round and branches + + /// Returns whether the consensus algorithm is running in sync mode + pub fn is_in_sync(&self) -> bool { + return !self.branch_annotations.is_empty(); + } + + #[deprecated] + pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation { + let branch = &self.branch_annotations[branch_id.index as usize]; + let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap(); + return port; + } + + /// Sets up the consensus algorithm for a new synchronous round. The + /// provided ports should be the ports the component owns at the start of + /// the sync round. + pub fn start_sync(&mut self, ctx: &ComponentCtx) { + debug_assert!(!self.highest_connector_id.is_valid()); + debug_assert!(self.branch_annotations.is_empty()); + debug_assert!(self.solution_combiner.local.is_empty()); + + // We'll use the first "branch" (the non-sync one) to store our ports, + // this allows cloning if we created a new branch. + self.branch_annotations.push(BranchAnnotation{ + channel_mapping: ctx.get_ports().iter() + .map(|v| ChannelAnnotation { + channel_id: v.channel_id, + registered_id: None, + expected_firing: None, + }) + .collect(), + cur_marker: BranchMarker::new_invalid(), + }); + self.branch_markers.push(BranchId::new_invalid()); + + self.highest_connector_id = ctx.id; + + } + + /// Notifies the consensus algorithm that a new branch has appeared. Must be + /// called for each forked branch in the execution tree. + pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) { + // If called correctly. Then each time we are notified the new branch's + // index is the length in `branch_annotations`. + debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); + let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); + let new_branch_annotations = BranchAnnotation{ + channel_mapping: parent_branch_annotations.channel_mapping.clone(), + cur_marker: new_marker, + }; + self.branch_annotations.push(new_branch_annotations); + self.branch_markers.push(new_branch_id); + } + + /// Notifies the consensus algorithm that a particular branch has + /// encountered an unrecoverable error. + pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { + debug_assert!(self.is_in_sync()); + + // Check for trivial case, where branch has not yet communicated within + // the consensus algorithm + let branch = &self.branch_annotations[failed_branch_id.index as usize]; + if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) { + println!("DEBUG: Failure everything silent"); + return Some(RoundConclusion::Failure); + } + + // We're not in the trivial case: since we've communicated we need to + // let everyone know that this round is probably not going to end well. + return self.initiate_sync_failure(ctx); + } + + /// Notifies the consensus algorithm that a branch has reached the end of + /// the sync block. A final check for consistency will be performed that the + /// caller has to handle. Note that + pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { + debug_assert!(self.is_in_sync()); + let branch = &self.branch_annotations[branch_id.index as usize]; + for mapping in &branch.channel_mapping { + match mapping.expected_firing { + Some(expected) => { + if expected != mapping.registered_id.is_some() { + // Inconsistent speculative state and actual state + debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier + return Consistency::Inconsistent; + } + }, + None => {}, + } + } + + return Consistency::Valid; + } + + /// Notifies the consensus algorithm that a particular branch has assumed + /// a speculative value for its port mapping. + pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency { + debug_assert!(self.is_in_sync()); + + let port_desc = ctx.get_port_by_id(port_id).unwrap(); + let channel_id = port_desc.channel_id; + let branch = &mut self.branch_annotations[branch_id.index as usize]; + for mapping in &mut branch.channel_mapping { + if mapping.channel_id == channel_id { + match mapping.expected_firing { + None => { + // Not yet mapped, perform speculative mapping + mapping.expected_firing = Some(does_fire); + return Consistency::Valid; + }, + Some(current) => { + // Already mapped + if current == does_fire { + return Consistency::Valid; + } else { + return Consistency::Inconsistent; + } + } + } + } + } + + unreachable!("notify_of_speculative_mapping called with unowned port"); + } + + /// Generates a new local solution from a finished branch. If the component + /// is not the leader of the sync region then it will be sent to the + /// appropriate component. If it is the leader then there is a chance that + /// this solution completes a global solution. In that case the solution + /// branch ID will be returned. + pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { + // Turn the port mapping into a local solution + let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping; + let mut target_mapping = Vec::with_capacity(source_mapping.len()); + + for port in source_mapping { + // Note: if the port is silent, and we've never communicated + // over the port, then we need to do so now, to let the peer + // component know about our sync leader state. + let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap(); + let self_port_id = port_desc.self_id; + let peer_port_id = port_desc.peer_id; + let channel_id = port_desc.channel_id; + + if !self.encountered_ports.contains(&self_port_id) { + let message = SyncPortMessage { + sync_header: SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + sync_round: self.sync_round + }, + source_port: self_port_id, + target_port: peer_port_id, + content: SyncPortContent::SilentPortNotification, + }; + match ctx.submit_message(Message::SyncPort(message)) { + Ok(_) => { + self.encountered_ports.push(self_port_id); + }, + Err(_) => { + // Seems like we were done with this branch, but one of + // the silent ports (in scope) is actually closed + return self.notify_of_fatal_branch(branch_id, ctx); + } + } + } + + target_mapping.push(( + channel_id, + port.registered_id.unwrap_or(BranchMarker::new_invalid()) + )); + } + + let local_solution = LocalSolution{ + component: ctx.id, + sync_round_number: self.sync_round, + final_branch_id: branch_id, + port_mapping: target_mapping, + }; + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx); + return maybe_conclusion; + } + + /// Notifies the consensus algorithm about the chosen branch to commit to + /// memory (may be the invalid "start" branch) + pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { + debug_assert!(self.is_in_sync()); + + // TODO: Handle sending and receiving ports + // Set final ports + let branch = &self.branch_annotations[branch_id.index as usize]; + + // Clear out internal storage to defaults + println!("DEBUG: ***** Incrementing sync round stuff"); + self.highest_connector_id = ConnectorId::new_invalid(); + self.branch_annotations.clear(); + self.branch_markers.clear(); + self.encountered_ports.clear(); + self.solution_combiner.clear(); + self.handled_wave = false; + self.conclusion = None; + self.ack_remaining = 0; + + // And modify persistent storage + self.sync_round += 1; + + for peer in self.peers.iter_mut() { + peer.encountered_this_round = false; + peer.expected_sync_round += 1; + } + + println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers) + } + + // --- Handling messages + + /// Prepares a message for sending. Caller should have made sure that + /// sending the message is consistent with the speculative state. + pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) { + debug_assert!(self.is_in_sync()); + let branch = &mut self.branch_annotations[branch_id.index as usize]; + let port_info = ctx.get_port_by_id(source_port_id).unwrap(); + + if cfg!(debug_assertions) { + // Check for consistent mapping + let port = branch.channel_mapping.iter() + .find(|v| v.channel_id == port_info.channel_id) + .unwrap(); + debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); + } + + // Check for ports that are being sent + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle sending ports"); + self.workspace_ports.clear(); + } + + // Construct data header + let data_header = DataHeader{ + expected_mapping: branch.channel_mapping.iter() + .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id) + .copied() + .collect(), + sending_port: port_info.self_id, + target_port: port_info.peer_id, + new_mapping: branch.cur_marker, + }; + + // Update port mapping + for mapping in &mut branch.channel_mapping { + if mapping.channel_id == port_info.channel_id { + mapping.expected_firing = Some(true); + mapping.registered_id = Some(branch.cur_marker); + } + } + + // Update branch marker + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); + branch.cur_marker = new_marker; + self.branch_markers.push(branch_id); + + self.encountered_ports.push(source_port_id); + + return (self.create_sync_header(ctx), data_header); + } + + /// Handles a new data message by handling the sync header. The caller is + /// responsible for checking for branches that might be able to receive + /// the message. + pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool { + let message = ctx.read_message_using_ticket(ticket).as_data(); + let target_port = message.data_header.target_port; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return false, + MessageOrigin::Present => { + self.encountered_ports.push(target_port); + return true; + }, + MessageOrigin::Future => { + let message = ctx.take_message_using_ticket(ticket); + ctx.put_back_message(message); + return false; + } + } + } + + /// Handles a new sync message by handling the sync header and the contents + /// of the message. Returns `Some` with the branch ID of the global solution + /// if the sync solution has been found. + pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option { + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return None, + MessageOrigin::Present => {}, + MessageOrigin::Future => { + ctx.put_back_message(Message::SyncComp(message)); + return None + } + } + + // And handle the contents + debug_assert_eq!(message.target_component_id, ctx.id); + + match &message.content { + SyncCompContent::LocalFailure | + SyncCompContent::LocalSolution(_) | + SyncCompContent::PartialSolution(_) | + SyncCompContent::AckFailure | + SyncCompContent::Presence(_) => { + // Needs to be handled by the leader + return self.send_to_leader_or_handle_as_leader(message.content, ctx); + }, + SyncCompContent::GlobalSolution(solution) => { + // Found a global solution + debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader + let (_, branch_id, _) = solution.component_branches.iter() + .find(|(component_id, _, _)| *component_id == ctx.id) + .unwrap(); + return Some(RoundConclusion::Success(*branch_id)); + }, + SyncCompContent::GlobalFailure => { + // Global failure of round, send Ack to leader + println!("DEBUGERINO: Got GlobalFailure, sending Ack in response"); + debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader + let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx); + debug_assert!(_result.is_none()); + return Some(RoundConclusion::Failure); + }, + SyncCompContent::Notification => { + // We were just interested in the sync header we handled above + return None; + } + } + } + + pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option { + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return None, + MessageOrigin::Present => {}, + MessageOrigin::Future => { + ctx.put_back_message(Message::SyncPort(message)); + return None; + } + } + + debug_assert!(self.is_in_sync()); + debug_assert!(ctx.get_port_by_id(message.target_port).is_some()); + match message.content { + SyncPortContent::SilentPortNotification => { + // The point here is to let us become part of the sync round and + // take note of the leader in case all of our ports are silent. + self.encountered_ports.push(message.target_port); + return None + } + SyncPortContent::NotificationWave => { + // Wave to discover everyone in the network, handling sync + // header takes care of leader discovery, here we need to make + // sure we propagate the wave + if self.handled_wave { + return None; + } + + self.handled_wave = true; + + // Propagate wave to all peers except the one that has sent us + // the wave. + for mapping in &self.branch_annotations[0].channel_mapping { + let channel_id = mapping.channel_id; + let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap(); + if port_desc.self_id == message.target_port { + // Wave came from this port, no need to send one back + continue; + } + + let message = SyncPortMessage{ + sync_header: self.create_sync_header(ctx), + source_port: port_desc.self_id, + target_port: port_desc.peer_id, + content: SyncPortContent::NotificationWave, + }; + // As with the other SyncPort where we throw away the + // result: we're dealing with an error here anyway + let _unused = ctx.submit_message(Message::SyncPort(message)); + } + + // And let the leader know about our port state + let annotations = &self.branch_annotations[0]; + let mut channels = Vec::with_capacity(annotations.channel_mapping.len()); + for mapping in &annotations.channel_mapping { + let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap(); + channels.push(LocalChannelPresence{ + channel_id: mapping.channel_id, + is_closed: port_info.state == PortState::Closed, + }); + } + + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ + component_id: ctx.id, + channels, + }), ctx); + return maybe_conclusion; + } + } + } + + pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option { + if message.in_response_to_sync_round < self.sync_round { + // Old message + return None + } + + // Because the message is always sent in response to a message + // originating here, the sync round number can never be larger than the + // currently stored one. + debug_assert_eq!(message.in_response_to_sync_round, self.sync_round); + match message.content { + SyncControlContent::ChannelIsClosed(_) => { + return self.initiate_sync_failure(ctx); + } + } + } + + pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) { + debug_assert!(self.branch_can_receive(branch_id, message)); + + let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap(); + let branch = &mut self.branch_annotations[branch_id.index as usize]; + for mapping in &mut branch.channel_mapping { + if mapping.channel_id == target_port.channel_id { + // Found the port in which the message should be inserted + mapping.registered_id = Some(message.data_header.new_mapping); + + // Check for sent ports + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(&message.content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle received ports"); + self.workspace_ports.clear(); + } + + return; + } + } + + // If here, then the branch didn't actually own the port? Means the + // caller made a mistake + unreachable!("incorrect notify_of_received_message"); + } + + /// Matches the mapping between the branch and the data message. If they + /// match then the branch can receive the message. + pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool { + if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) { + if message.sync_header.sync_round < peer.expected_sync_round { + return false; + } + } + + let annotation = &self.branch_annotations[branch_id.index as usize]; + for expected in &message.data_header.expected_mapping { + // If we own the port, then we have an entry in the + // annotation, check if the current mapping matches + for current in &annotation.channel_mapping { + if expected.channel_id == current.channel_id { + if expected.registered_id != current.registered_id { + // IDs do not match, we cannot receive the + // message in this branch + return false; + } + } + } + } + + return true; + } + + // --- Internal helpers + + fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin { + debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves + let origin = self.handle_peer(&sync_header); + println!(" ********************** GOT {:?}", origin); + if origin != MessageOrigin::Present { + // We do not have to handle it now + return origin; + } + + if sync_header.highest_component_id > self.highest_connector_id { + // Sender has higher component ID. So should be the target of our + // messages. We should also let all of our peers know + self.highest_connector_id = sync_header.highest_component_id; + for peer in self.peers.iter() { + if peer.id == sync_header.sending_component_id || !peer.encountered_this_round { + // Don't need to send it to this one + continue + } + + let message = SyncCompMessage { + sync_header: self.create_sync_header(ctx), + target_component_id: peer.id, + content: SyncCompContent::Notification, + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + } + + // But also send our locally combined solution + self.forward_local_data_to_new_leader(ctx); + } else if sync_header.highest_component_id < self.highest_connector_id { + // Sender has lower leader ID, so it should know about our higher + // one. + let message = SyncCompMessage { + sync_header: self.create_sync_header(ctx), + target_component_id: sync_header.sending_component_id, + content: SyncCompContent::Notification + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + } // else: exactly equal, so do nothing + + return MessageOrigin::Present; + } + + /// Handles a (potentially new) peer. Returns `false` if the provided sync + /// number is different then the expected one. + fn handle_peer(&mut self, sync_header: &SyncHeader) -> MessageOrigin { + let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id); + match position { + Some(index) => { + let entry = &mut self.peers[index]; + if entry.encountered_this_round { + // Already encountered this round + if sync_header.sync_round < entry.expected_sync_round { + return MessageOrigin::Past; + } else if sync_header.sync_round == entry.expected_sync_round { + return MessageOrigin::Present; + } else { + return MessageOrigin::Future; + } + } else { + // TODO: Proper handling of potential overflow + entry.encountered_this_round = true; + + if sync_header.sync_round >= entry.expected_sync_round { + entry.expected_sync_round = sync_header.sync_round; + return MessageOrigin::Present; + } else { + return MessageOrigin::Past; + } + } + }, + None => { + self.peers.push(Peer{ + id: sync_header.sending_component_id, + encountered_this_round: true, + expected_sync_round: sync_header.sync_round, + }); + return MessageOrigin::Present; + } + } + } + + /// Sends a message towards the leader, if already the leader then the + /// message will be handled immediately. + fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option { + if self.highest_connector_id == ctx.id { + // We are the leader + match content { + SyncCompContent::LocalFailure => { + if self.solution_combiner.mark_failure_and_check_for_global_failure() { + return self.handle_global_failure_as_leader(ctx); + } + }, + SyncCompContent::LocalSolution(local_solution) => { + if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) { + return self.handle_global_solution_as_leader(global_solution, ctx); + } + }, + SyncCompContent::PartialSolution(partial_solution) => { + if let Some(conclusion) = self.solution_combiner.combine(partial_solution) { + match conclusion { + LeaderConclusion::Solution(global_solution) => { + return self.handle_global_solution_as_leader(global_solution, ctx); + }, + LeaderConclusion::Failure => { + return self.handle_global_failure_as_leader(ctx); + } + } + } + }, + SyncCompContent::Presence(component_presence) => { + if self.solution_combiner.add_presence_and_check_for_global_failure(component_presence.component_id, &component_presence.channels) { + return self.handle_global_failure_as_leader(ctx); + } + }, + SyncCompContent::AckFailure => { + debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion); + debug_assert!(self.ack_remaining > 0); + self.ack_remaining -= 1; + if self.ack_remaining == 0 { + return Some(RoundConclusion::Failure); + } + } + SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) | + SyncCompContent::GlobalFailure => { + unreachable!("unexpected message content for leader"); + }, + } + } else { + // Someone else is the leader + let message = SyncCompMessage { + sync_header: self.create_sync_header(ctx), + target_component_id: self.highest_connector_id, + content, + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + } + + return None; + } + + fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option { + if self.conclusion.is_some() { + return None; + } + + // Handle the global solution + let mut my_final_branch_id = BranchId::new_invalid(); + for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() { + if connector_id == ctx.id { + // This is our solution branch + my_final_branch_id = branch_id; + continue; + } + + // Send solution message + let message = SyncCompMessage { + sync_header: self.create_sync_header(ctx), + target_component_id: connector_id, + content: SyncCompContent::GlobalSolution(global_solution.clone()), + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + + // Update peers as leader. Subsequent call to `end_sync` will update + // the round numbers + match self.peers.iter_mut().find(|v| v.id == connector_id) { + Some(peer) => { + peer.expected_sync_round = sync_round; + }, + None => { + self.peers.push(Peer{ + id: connector_id, + expected_sync_round: sync_round, + encountered_this_round: true, + }); + } + } + } + + debug_assert!(my_final_branch_id.is_valid()); + self.conclusion = Some(RoundConclusion::Success(my_final_branch_id)); + return Some(RoundConclusion::Success(my_final_branch_id)); + } + + fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option { + debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure()); + if self.conclusion.is_some() { + // Already sent out a failure + return None; + } + + // TODO: Performance + let mut encountered = VecSet::new(); + for presence in &self.solution_combiner.presence { + if presence.owner_a != ctx.id { + // Did not add it ourselves + if encountered.push(presence.owner_a) { + // Not yet sent a message + let message = SyncCompMessage{ + sync_header: self.create_sync_header(ctx), + target_component_id: presence.owner_a, + content: SyncCompContent::GlobalFailure, + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + } + } + + if let Some(owner_b) = presence.owner_b { + if owner_b != ctx.id { + if encountered.push(owner_b) { + let message = SyncCompMessage{ + sync_header: self.create_sync_header(ctx), + target_component_id: owner_b, + content: SyncCompContent::GlobalFailure, + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); + } + } + } + } + + println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.index).collect::>()); + self.conclusion = Some(RoundConclusion::Failure); + if encountered.is_empty() { + // We don't have to wait on Acks + return Some(RoundConclusion::Failure); + } else { + self.ack_remaining = encountered.len() as u32; + return None; + } + } + + fn initiate_sync_failure(&mut self, ctx: &mut ComponentCtx) -> Option { + debug_assert!(self.is_in_sync()); + + // Notify leader of our channels and the fact that we just failed + let channel_mapping = &self.branch_annotations[0].channel_mapping; + let mut channel_presence = Vec::with_capacity(channel_mapping.len()); + for mapping in channel_mapping { + let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap(); + channel_presence.push(LocalChannelPresence{ + channel_id: mapping.channel_id, + is_closed: port.state == PortState::Closed, + }); + } + let maybe_already = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ + component_id: ctx.id, + channels: channel_presence, + }), ctx); + + if self.handled_wave { + // Someone (or us) has already initiated a sync failure. + return maybe_already; + } + + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx); + debug_assert!(if maybe_already.is_some() { maybe_conclusion.is_some() } else { true }); + println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion); + + // Initiate a discovery wave so peers can do the same + self.handled_wave = true; + for mapping in &self.branch_annotations[0].channel_mapping { + let channel_id = mapping.channel_id; + let port_info = ctx.get_port_by_channel_id(channel_id).unwrap(); + let message = SyncPortMessage{ + sync_header: self.create_sync_header(ctx), + source_port: port_info.self_id, + target_port: port_info.peer_id, + content: SyncPortContent::NotificationWave, + }; + + // Note: submitting the message might fail. But we're attempting to + // handle the error anyway. + // TODO: Think about this a second time: how do we make sure the + // entire network will fail if we reach this condition + let _unused = ctx.submit_message(Message::SyncPort(message)); + } + + return maybe_conclusion; + } + + #[inline] + fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader { + return SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + sync_round: self.sync_round, + } + } + + fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) { + debug_assert_ne!(self.highest_connector_id, ctx.id); + + if let Some(partial_solution) = self.solution_combiner.drain() { + let message = SyncCompMessage { + sync_header: self.create_sync_header(ctx), + target_component_id: self.highest_connector_id, + content: SyncCompContent::PartialSolution(partial_solution), + }; + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel + } + } +} + +// ----------------------------------------------------------------------------- +// Solution storage and algorithms +// ----------------------------------------------------------------------------- + +// TODO: Remove all debug derives + +#[derive(Debug, Clone)] +struct MatchedLocalSolution { + final_branch_id: BranchId, + channel_mapping: Vec<(ChannelId, BranchMarker)>, + matches: Vec, +} + +#[derive(Debug, Clone)] +struct ComponentMatches { + target_id: ConnectorId, + target_index: usize, + match_indices: Vec, // of local solution in connector +} + +#[derive(Debug, Clone)] +struct ComponentPeer { + target_id: ConnectorId, + target_index: usize, // in array of global solution components + involved_channels: Vec, +} + +#[derive(Debug, Clone)] +struct ComponentLocalSolutions { + component: ConnectorId, + sync_round: u32, + peers: Vec, + solutions: Vec, + all_peers_present: bool, +} + +#[derive(Debug, Clone)] +pub(crate) struct ComponentPresence { + component_id: ConnectorId, + channels: Vec, +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalChannelPresence { + channel_id: ChannelId, + is_closed: bool, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum PresenceState { + OnePresent, // one component reported the channel being open + BothPresent, // two components reported the channel being open + Closed, // one component reported the channel being closed +} + +/// Record to hold channel state during the error-resolving mode of the leader. +/// This is used to determine when the sync region has grown to its largest +/// size. The structure is eventually consistent in the sense that a component +/// might initially presume a channel is open, only to figure out later it is +/// actually closed. +#[derive(Debug, Clone)] +struct ChannelPresence { + owner_a: ConnectorId, + owner_b: Option, + id: ChannelId, + state: PresenceState, +} + +// TODO: Flatten? Flatten. Flatten everything. +#[derive(Debug)] +pub(crate) struct SolutionCombiner { + local: Vec, // used for finding solution + presence: Vec, // used to detect all channels present in case of failure + failure_reported: bool, +} + +struct CheckEntry { + component_index: usize, // component index in combiner's vector + solution_index: usize, // solution entry in the above component entry + parent_entry_index: usize, // parent that caused the creation of this checking entry + match_index_in_parent: usize, // index in the matches array of the parent + solution_index_in_parent: usize,// index in the solution array of the match entry in the parent +} + +enum LeaderConclusion { + Solution(GlobalSolution), + Failure, +} + +impl SolutionCombiner { + fn new() -> Self { + return Self{ + local: Vec::new(), + presence: Vec::new(), + failure_reported: false, + }; + } + + /// Adds a new local solution to the global solution storage. Will check the + /// new local solutions for matching against already stored local solutions + /// of peer connectors. + fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option { + let component_id = solution.component; + let sync_round = solution.sync_round_number; + let solution = MatchedLocalSolution{ + final_branch_id: solution.final_branch_id, + channel_mapping: solution.port_mapping, + matches: Vec::new(), + }; + + // Create an entry for the solution for the particular component + let component_exists = self.local.iter_mut() + .enumerate() + .find(|(_, v)| v.component == component_id); + let (component_index, solution_index, new_component) = match component_exists { + Some((component_index, storage)) => { + // Entry for component exists, so add to solutions + let solution_index = storage.solutions.len(); + storage.solutions.push(solution); + + (component_index, solution_index, false) + } + None => { + // Entry for component does not exist yet + let component_index = self.local.len(); + self.local.push(ComponentLocalSolutions{ + component: component_id, + sync_round, + peers: Vec::new(), + solutions: vec![solution], + all_peers_present: false, + }); + (component_index, 0, true) + } + }; + + // If this is a solution of a component that is new to us, then we check + // in the stored solutions which other components are peers of the new + // one. + if new_component { + let cur_ports = &self.local[component_index].solutions[0].channel_mapping; + let mut component_peers = Vec::new(); + + // Find the matching components + for (other_index, other_component) in self.local.iter().enumerate() { + if other_index == component_index { + // Don't match against ourselves + continue; + } + + let mut matching_channels = Vec::new(); + for (cur_channel_id, _) in cur_ports { + for (other_channel_id, _) in &other_component.solutions[0].channel_mapping { + if cur_channel_id == other_channel_id { + // We have a shared port + matching_channels.push(*cur_channel_id); + } + } + } + + if !matching_channels.is_empty() { + // We share some ports + component_peers.push(ComponentPeer{ + target_id: other_component.component, + target_index: other_index, + involved_channels: matching_channels, + }); + } + } + + let mut num_ports_in_peers = 0; + for peer in &component_peers { + num_ports_in_peers += peer.involved_channels.len(); + } + + if num_ports_in_peers == cur_ports.len() { + // Newly added component has all required peers present + self.local[component_index].all_peers_present = true; + } + + // Add the found component pairing entries to the solution entries + // for the two involved components + for component_match in component_peers { + // Check the other component for having all peers present + let mut num_ports_in_peers = component_match.involved_channels.len(); + let other_component = &mut self.local[component_match.target_index]; + for existing_peer in &other_component.peers { + num_ports_in_peers += existing_peer.involved_channels.len(); + } + + if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() { + other_component.all_peers_present = true; + } + + other_component.peers.push(ComponentPeer{ + target_id: component_id, + target_index: component_index, + involved_channels: component_match.involved_channels.clone(), + }); + + let new_component = &mut self.local[component_index]; + new_component.peers.push(component_match); + } + } + + // We're now sure that we know which other components the currently + // considered component is linked up to. Now we need to check those + // entries (if any) to see if any pair of local solutions match + let mut new_component_matches = Vec::new(); + let cur_component = &self.local[component_index]; + let cur_solution = &cur_component.solutions[solution_index]; + + for peer in &cur_component.peers { + let mut new_solution_matches = Vec::new(); + + let other_component = &self.local[peer.target_index]; + for (other_solution_index, other_solution) in other_component.solutions.iter().enumerate() { + // Check the port mappings between the pair of solutions. + let mut all_matched = true; + + 'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping { + for (other_port, other_branch) in &other_solution.channel_mapping { + if cur_port == other_port { + if cur_branch == other_branch { + // Same port mapping, go to next port + break; + } else { + // Different port mapping, not a match + all_matched = false; + break 'mapping_check_loop; + } + } + } + } + + if !all_matched { + continue; + } + + // Port mapping between the component pair is the same, so they + // have agreeable local solutions + new_solution_matches.push(other_solution_index); + } + + new_component_matches.push(ComponentMatches{ + target_id: peer.target_id, + target_index: peer.target_index, + match_indices: new_solution_matches, + }); + } + + // And now that we have the new solution-to-solution matches, we need to + // add those in the appropriate storage. + for new_component_match in new_component_matches { + let other_component = &mut self.local[new_component_match.target_index]; + + for other_solution_index in new_component_match.match_indices.iter().copied() { + let other_solution = &mut other_component.solutions[other_solution_index]; + + // Add a completely new entry for the component, or add it to + // the existing component entry's matches + match other_solution.matches.iter_mut() + .find(|v| v.target_id == component_id) + { + Some(other_match) => { + other_match.match_indices.push(solution_index); + }, + None => { + other_solution.matches.push(ComponentMatches{ + target_id: component_id, + target_index: component_index, + match_indices: vec![solution_index], + }) + } + } + } + + let cur_component = &mut self.local[component_index]; + let cur_solution = &mut cur_component.solutions[solution_index]; + + match cur_solution.matches.iter_mut() + .find(|v| v.target_id == new_component_match.target_id) + { + Some(other_match) => { + // Already have an entry + debug_assert_eq!(other_match.target_index, new_component_match.target_index); + other_match.match_indices.extend(&new_component_match.match_indices); + }, + None => { + // Create a new entry + cur_solution.matches.push(new_component_match); + } + } + } + + return self.check_for_global_solution(component_index, solution_index); + } + + fn add_presence_and_check_for_global_failure(&mut self, component_id: ConnectorId, channels: &[LocalChannelPresence]) -> bool { + for entry in channels { + let mut found = false; + + for existing in &mut self.presence { + if existing.id == entry.channel_id { + // Same entry. We only update if we have the second + // component coming in it owns one end of the channel, or if + // a component is telling us that the channel is (now) + // closed. + if entry.is_closed { + existing.state = PresenceState::Closed; + } else if component_id != existing.owner_a && existing.state != PresenceState::Closed { + existing.state = PresenceState::BothPresent; + } + + if existing.owner_a != component_id { + existing.owner_b = Some(component_id); + } + + found = true; + break; + } + } + + if !found { + self.presence.push(ChannelPresence{ + owner_a: component_id, + owner_b: None, + id: entry.channel_id, + state: if entry.is_closed { PresenceState::Closed } else { PresenceState::OnePresent }, + }); + } + } + + println!("DEBUGGERINO Presence is now:\n{:#?}", self.presence); + + return self.check_for_global_failure(); + } + + fn mark_failure_and_check_for_global_failure(&mut self) -> bool { + self.failure_reported = true; + return self.check_for_global_failure(); + } + + /// Checks if, starting at the provided local solution, a global solution + /// can be formed. + // TODO: At some point, check if divide and conquer is faster? + fn check_for_global_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option { + // Small trivial test necessary (but not sufficient) for a global + // solution + for component in &self.local { + if !component.all_peers_present { + return None; + } + } + + // Construct initial entry on stack + let mut stack = Vec::with_capacity(self.local.len()); + stack.push(CheckEntry{ + component_index: initial_component_index, + solution_index: initial_solution_index, + parent_entry_index: 0, + match_index_in_parent: 0, + solution_index_in_parent: 0, + }); + + 'check_last_stack: loop { + let cur_index = stack.len() - 1; + let cur_entry = &stack[cur_index]; + + // Check if the current component is matching with all other entries + let mut all_match = true; + 'check_against_existing: for prev_index in 0..cur_index { + let prev_entry = &stack[prev_index]; + let prev_component = &self.local[prev_entry.component_index]; + let prev_solution = &prev_component.solutions[prev_entry.solution_index]; + + for prev_matching_component in &prev_solution.matches { + if prev_matching_component.target_index == cur_entry.component_index { + // Previous entry has shared ports with the current + // entry, so see if we have a composable pair of + // solutions. + if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) { + all_match = false; + break 'check_against_existing; + } + } + } + } + + if all_match { + // All components matched until now. + if stack.len() == self.local.len() { + // We have found a global solution + break 'check_last_stack; + } + + // Not all components found yet, look for a new one that has not + // yet been added yet. + for (parent_index, parent_entry) in stack.iter().enumerate() { + let parent_component = &self.local[parent_entry.component_index]; + let parent_solution = &parent_component.solutions[parent_entry.solution_index]; + + for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() { + if peer_component.match_indices.is_empty() { + continue; + } + + let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index); + if !already_added { + // New component to try + stack.push(CheckEntry{ + component_index: peer_component.target_index, + solution_index: peer_component.match_indices[0], + parent_entry_index: parent_index, + match_index_in_parent: peer_index, + solution_index_in_parent: 0, + }); + continue 'check_last_stack; + } + } + } + + // Cannot find a peer to add. This is possible if, for example, + // we have a component A which has the only connection to + // component B. And B has sent a local solution saying it is + // finished, but the last data message has not yet arrived at A. + + // In any case, we just exit the if statement and handle not + // being able to find a new connector as being forced to try a + // new permutation of possible local solutions. + } + + // Either the currently considered local solution is inconsistent + // with other local solutions, or we cannot find a new component to + // add. This is where we perform backtracking as long as needed to + // try a new solution. + while stack.len() > 1 { + // Check if our parent has another solution we can try + let cur_index = stack.len() - 1; + let cur_entry = &stack[cur_index]; + + let parent_entry = &stack[cur_entry.parent_entry_index]; + let parent_component = &self.local[parent_entry.component_index]; + let parent_solution = &parent_component.solutions[parent_entry.solution_index]; + + let match_component = &parent_solution.matches[cur_entry.match_index_in_parent]; + debug_assert!(match_component.target_index == cur_entry.component_index); + let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1; + + if new_solution_index_in_parent < match_component.match_indices.len() { + // We can still try a new one + let new_solution_index = match_component.match_indices[new_solution_index_in_parent]; + let cur_entry = &mut stack[cur_index]; + cur_entry.solution_index_in_parent = new_solution_index_in_parent; + cur_entry.solution_index = new_solution_index; + continue 'check_last_stack; + } else { + // We're out of options here. So pop an entry, then in + // the next iteration of this backtracking loop we try + // to increment that solution + stack.pop(); + } + } + + // Stack length is 1, hence we're back at our initial solution. + // Since that doesn't yield a global solution, we simply: + return None; + } + + // Constructing the representation of the global solution + debug_assert_eq!(stack.len(), self.local.len()); + let mut final_branches = Vec::with_capacity(stack.len()); + for entry in &stack { + let component = &self.local[entry.component_index]; + let solution = &component.solutions[entry.solution_index]; + final_branches.push((component.component, solution.final_branch_id, component.sync_round)); + } + + // Just debugging here, TODO: @remove + let mut total_num_channels = 0; + for entry in &stack { + let component = &self.local[entry.component_index]; + total_num_channels += component.solutions[0].channel_mapping.len(); + } + + total_num_channels /= 2; + let mut final_mapping = Vec::with_capacity(total_num_channels); + let mut total_num_checked = 0; + + for entry in &stack { + let component = &self.local[entry.component_index]; + let solution = &component.solutions[entry.solution_index]; + + for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { + match final_mapping.iter().find(|(v, _)| *v == channel_id) { + Some((_, encountered_branch_id)) => { + debug_assert_eq!(*encountered_branch_id, branch_id); + total_num_checked += 1; + }, + None => { + final_mapping.push((channel_id, branch_id)); + } + } + } + } + + debug_assert_eq!(total_num_checked, total_num_channels); + + return Some(GlobalSolution{ + component_branches: final_branches, + channel_mapping: final_mapping, + }); + } + + /// Checks if all preconditions for global sync failure have been met + fn check_for_global_failure(&self) -> bool { + if !self.failure_reported { + return false; + } + + // Failure is reported, if all components are present then we may emit + // the global failure broadcast + // Check if all are present and we're preparing to fail this round + let mut all_present = true; + for presence in &self.presence { + if presence.state == PresenceState::OnePresent { + all_present = false; + break; + } + } + + return all_present; // && failure_reported, which is checked above + } + + /// Turns the entire (partially resolved) global solution into a structure + /// that can be forwarded to a new parent. The new parent may then merge + /// already obtained information. + fn drain(&mut self) -> Option { + if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported { + return None; + } + + let result = SolutionCombiner{ + local: self.local.clone(), + presence: self.presence.clone(), + failure_reported: self.failure_reported, + }; + + self.local.clear(); + self.presence.clear(); + self.failure_reported = false; + return Some(result); + } + + // TODO: Entire routine is quite wasteful. Combine instead of doing all work + // again. + fn combine(&mut self, combiner: SolutionCombiner) -> Option { + self.failure_reported = self.failure_reported || combiner.failure_reported; + + // Handle local solutions + if self.local.is_empty() { + // Trivial case + self.local = combiner.local; + } else { + for local in combiner.local { + for matched in local.solutions { + let local_solution = LocalSolution{ + component: local.component, + sync_round_number: local.sync_round, + final_branch_id: matched.final_branch_id, + port_mapping: matched.channel_mapping, + }; + let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution); + if let Some(global_solution) = maybe_solution { + return Some(LeaderConclusion::Solution(global_solution)); + } + } + } + } + + // Handle channel presence + println!("DEBUGERINO: Presence before joining is {:#?}", &self.presence); + if self.presence.is_empty() { + // Trivial case + self.presence = combiner.presence; + println!("DEBUGERINO: Trivial merging") + } else { + for presence in combiner.presence { + match self.presence.iter_mut().find(|v| v.id == presence.id) { + Some(entry) => { + // Combine entries. Take first that has Closed, then + // check first that has both, then check if they are + // combinable + if entry.state == PresenceState::Closed { + // Do nothing + } else if presence.state == PresenceState::Closed { + entry.owner_a = presence.owner_a; + entry.owner_b = presence.owner_b; + entry.state = PresenceState::Closed; + } else if entry.state == PresenceState::BothPresent { + // Again: do nothing + } else if presence.state == PresenceState::BothPresent { + entry.owner_a = presence.owner_a; + entry.owner_b = presence.owner_b; + entry.state = PresenceState::BothPresent; + } else { + // Both have one presence, combine into both present + debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent); + entry.owner_b = Some(presence.owner_a); + entry.state = PresenceState::BothPresent; + } + }, + None => { + self.presence.push(presence); + } + } + } + println!("DEBUGERINO: Presence after joining is {:#?}", &self.presence); + + // After adding everything we might have immediately found a solution + if self.check_for_global_failure() { + println!("DEBUG: Returning immediate failure?"); + return Some(LeaderConclusion::Failure); + } + } + + return None; + } + + fn clear(&mut self) { + self.local.clear(); + self.presence.clear(); + self.failure_reported = false; + } +} + +// ----------------------------------------------------------------------------- +// Generic Helpers +// ----------------------------------------------------------------------------- + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. + use crate::protocol::eval::Value; + + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortIdLocal::new(port_id.id); + for prev_port in ports.iter() { + if *prev_port == cur_port { + // Already added + return; + } + } + + ports.push(cur_port); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } +} \ No newline at end of file