use std::collections::HashMap; use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage}; use crate::runtime2::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not /// yet receive anything from any branch). #[derive(Clone, Copy, PartialEq, Eq)] pub(crate) struct BranchId { pub index: u32, } impl BranchId { fn new_invalid() -> Self { Self{ index: 0 } } fn new(index: u32) -> Self { debug_assert!(index != 0); Self{ index } } #[inline] fn is_valid(&self) -> bool { return self.index != 0; } } #[derive(PartialEq, Eq)] pub(crate) enum SpeculativeState { // Non-synchronous variants RunningNonSync, // regular execution of code Error, // encountered a runtime error Finished, // finished executing connector's code // Synchronous variants RunningInSync, // running within a sync block HaltedAtBranchPoint, // at a branching point (at a `get` call) ReachedSyncEnd, // reached end of sync block, branch represents a local solution Inconsistent, // branch can never represent a local solution, so halted } pub(crate) struct Branch { index: BranchId, parent_index: BranchId, // Code execution state code_state: ComponentState, sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state received: HashMap, // TODO: @temporary, remove together with fires() ports_delta: Vec, } impl Branch { /// Constructs a non-sync branch. It is assumed that the code is at the /// first instruction pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self { Branch{ index: BranchId::new_invalid(), parent_index: BranchId::new_invalid(), code_state: component_state, sync_state: SpeculativeState::RunningNonSync, next_branch_in_queue: None, received: HashMap::new(), ports_delta: Vec::new(), } } /// Constructs a sync branch. The provided branch is assumed to be the /// parent of the new branch within the execution tree. fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self { debug_assert!( (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) ); Branch{ index: BranchId::new(new_index), parent_index: parent_branch.index, code_state: parent_branch.code_state.clone(), sync_state: SpeculativeState::RunningInSync, next_branch_in_queue: None, received: parent_branch.received.clone(), ports_delta: parent_branch.ports_delta.clone(), } } } #[derive(Clone)] struct PortAssignment { is_assigned: bool, last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet num_times_fired: u32, } impl PortAssignment { fn new_unassigned() -> Self { Self{ is_assigned: false, last_registered_branch_id: BranchId::new_invalid(), num_times_fired: 0, } } #[inline] fn mark_speculative(&mut self, num_times_fired: u32) { debug_assert!(!self.last_registered_branch_id.is_valid()); self.is_assigned = true; self.num_times_fired = num_times_fired; } #[inline] fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) { self.is_assigned = true; self.last_registered_branch_id = branch_id; self.num_times_fired = num_times_fired; } } #[derive(Clone, Eq)] struct PortOwnershipDelta { acquired: bool, // if false, then released ownership port_id: PortIdLocal, } enum PortOwnershipError { UsedInInteraction(PortIdLocal), AlreadyGivenAway(PortIdLocal) } /// As the name implies, this contains a description of the ports associated /// with a connector. /// TODO: Extend documentation pub(crate) struct ConnectorPorts { // Essentially a mapping from `port_index` to `port_id`. pub owned_ports: Vec, // Contains P*B entries, where P is the number of ports and B is the number // of branches. One can find the appropriate mapping of port p at branch b // at linear index `b*P+p`. pub port_mapping: Vec } impl ConnectorPorts { /// Constructs the initial ports object. Assumes the presence of the /// non-sync branch at index 0. Will initialize all entries for the non-sync /// branch. fn new(owned_ports: Vec) -> Self { let num_ports = owned_ports.len(); let mut port_mapping = Vec::with_capacity(num_ports); for _ in 0..num_ports { port_mapping.push(PortAssignment::new_unassigned()); } Self{ owned_ports, port_mapping } } /// Prepares the port mapping for a new branch. Assumes that there is no /// intermediate branch index that we have skipped. fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) { let num_ports = self.owned_ports.len(); let parent_base_idx = parent_branch_idx as usize * num_ports; let new_base_idx = new_branch_idx as usize * num_ports; debug_assert!(parent_branch_idx < new_branch_idx); debug_assert!(new_base_idx == self.port_mapping.len()); self.port_mapping.reserve(num_ports); for offset in 0..num_ports { let parent_port = &self.port_mapping[parent_base_idx + offset]; self.port_mapping.push(parent_port.clone()); } } /// Removes a particular port from the connector. May only be done if the /// connector is in non-sync mode fn remove_port(&mut self, port_id: PortIdLocal) { debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode let port_index = self.get_port_index(port_id).unwrap(); self.owned_ports.remove(port_index); self.port_mapping.remove(port_index); } /// Retrieves the index associated with a port id. Note that the port might /// not exist (yet) if a speculative branch has just received the port. /// TODO: But then again, one cannot use that port, right? #[inline] fn get_port_index(&self, port_id: PortIdLocal) -> Option { for (idx, port) in self.owned_ports.iter().enumerate() { if port == port_id { return Some(idx) } } return None } #[inline] fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment { let mapped_idx = self.mapped_index(branch_idx, port_idx); return &self.port_mapping[mapped_idx]; } #[inline] fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment { let mapped_idx = self.mapped_index(branch_idx, port_idx); return &mut self.port_mapping(mapped_idx); } fn num_ports(&self) -> usize { return self.owned_ports.len(); } // Function for internal use: retrieve index in flattened port mapping array // based on branch/port index. #[inline] fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize { let branch_idx = branch_idx as usize; let num_ports = self.owned_ports.len(); debug_assert!(port_idx < num_ports); debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len()); return branch_idx * num_ports + port_idx; } } struct BranchQueue { first: u32, last: u32, } impl BranchQueue { fn new() -> Self { Self{ first: 0, last: 0 } } fn is_empty(&self) -> bool { debug_assert!((self.first == 0) == (self.last == 0)); return self.first == 0; } } /// Public fields of the connector that can be freely shared between multiple /// threads. pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new() -> Self { ConnectorPublic{ inbox: PublicInbox::new(), sleeping: AtomicBool::new(false), } } } // TODO: Maybe prevent false sharing by aligning `public` to next cache line. // TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct ConnectorPDL { // State and properties of connector itself id: u32, in_sync: bool, // Branch management branches: Vec, // first branch is always non-speculative one sync_active: BranchQueue, sync_pending_get: BranchQueue, sync_finished: BranchQueue, sync_finished_last_handled: u32, // Port/message management pub inbox: PrivateInbox, pub ports: ConnectorPorts, } struct TempCtx {} impl RunContext for TempCtx { fn did_put(&mut self, port: PortId) -> bool { todo!() } fn get(&mut self, port: PortId) -> Option { todo!() } fn fires(&mut self, port: PortId) -> Option { todo!() } fn get_channel(&mut self) -> Option<(Value, Value)> { todo!() } } impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, /// hence is in a non-sync state. pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec) -> Self { Self{ id, in_sync: false, branches: vec![initial_branch], sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), sync_finished_last_handled: 0, // none at all inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), } } pub fn is_in_sync_mode(&self) -> bool { return self.in_sync; } pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) { } pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { let scheduling = self.run_in_speculative_mode(pd, results); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. if self.sync_finished_last_handled != self.sync_finished.last { let mut next_id; if self.sync_finished_last_handled == 0 { next_id = self.sync_finished.first; } else { let last_handled = &self.branches[self.sync_finished_last_handled as usize]; debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" next_id = last_handled.next_branch_in_queue.unwrap(); } // Transform branch into proposed global solution } } else { let scheduling = self.run_in_deterministic_mode(pd, results); return scheduling; } } /// Runs the connector in synchronous mode. Potential changes to the global /// system's state are added to the `RunDeltaState` object by the connector, /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); debug_assert!(!self.sync_active.is_empty()); let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point let mut run_context = TempCtx{}; let run_result = branch.code_state.run(&mut run_context, pd); // Match statement contains `return` statements only if the particular // run result behind handled requires an immediate re-run of the // connector. match run_result { RunResult::BranchInconsistent => { // Speculative branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that does not yet have an // assigned speculative value. So we need to create those // branches let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); debug_assert!(self.ports.owned_ports.contains(&local_port_id)); let silent_branch = &*branch; // Create a copied branch who will have the port set to firing let firing_index = self.branches.len() as u32; let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch); self.ports.prepare_sync_branch(branch.index.index, firing_index); let firing_port = self.ports.get_port_mut(firing_index, local_port_index); firing_port.mark_speculative(1); // Assign the old branch a silent value let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index); silent_port.mark_speculative(0); // Run both branches again Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index); Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index); self.branches.push(firing_branch); return ConnectorScheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { // Branch performed a `get` on a port that has not yet received // a value in its inbox. let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); let local_port_index = self.ports.get_port_index(local_port_id); if local_port_index.is_none() { todo!("deal with the case where the port is acquired"); } let local_port_index = local_port_index.unwrap(); let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); // Check for port mapping assignment and, if present, if it is // consistent let is_valid_get = if port_mapping.is_assigned { assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires` port_mapping.num_times_fired == 1 } else { // Not yet assigned port_mapping.mark_speculative(1); true }; if is_valid_get { // Mark as a branching point for future messages branch.sync_state = SpeculativeState::HaltedAtBranchPoint; Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index); // But if some messages can be immediately applied, do so // now. let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id); let mut did_have_messages = false; for message in messages { did_have_messages = true; // For each message prepare a new branch to execute let new_branch_index = self.branches.len() as u32; let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); self.ports.prepare_sync_branch(branch.index.index, new_branch_index); let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); port_mapping.last_registered_branch_id = message.sender_cur_branch_id; debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); new_branch.received.insert(local_port_id, message.clone()); // If the message contains any ports then they will now // be owned by the new branch debug_assert!(results.ports.is_empty()); find_ports_in_value_group(&message.message, &mut results.ports); Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports); results.ports.clear(); // Schedule the new branch debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); self.branches.push(new_branch); } if did_have_messages { // If we did create any new branches, then we can run // them immediately. return ConnectorScheduling::Immediate; } } else { branch.sync_state = SpeculativeState::Inconsistent; } }, RunResult::BranchAtSyncEnd => { // Branch is done, go through all of the ports that are not yet // assigned and map them to non-firing. for port_idx in 0..self.ports.num_ports() { let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx); if !port_mapping.is_assigned { port_mapping.mark_speculative(0); } } branch.sync_state = SpeculativeState::ReachedSyncEnd; Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index); }, RunResult::BranchPut(port_id, value_group) => { // Branch performed a `put` on a particualar port. let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix }; let local_port_index = self.ports.get_port_index(local_port_id); if local_port_index.is_none() { todo!("handle case where port was received before (i.e. in ports_delta)") } let local_port_index = local_port_index.unwrap(); // Check the port mapping for consistency // TODO: For now we can only put once, so that simplifies stuff let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); let is_valid_put = if port_mapping.is_assigned { // Already assigned, so must be speculative and one time // firing, otherwise we are `put`ing multiple times. if port_mapping.last_registered_branch_id.is_valid() { // Already did a `put` todo!("handle error through RunDeltaState"); } else { // Valid if speculatively firing port_mapping.num_times_fired == 1 } } else { // Not yet assigned, do so now true }; if is_valid_put { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); let message = OutgoingMessage { sending_port: local_port_id, sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, message: value_group, }; // If the message contains any ports then we release our // ownership over them in this branch debug_assert!(results.ports.is_empty()); find_ports_in_value_group(&message.message, &mut results.ports); Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); results.ports.clear(); results.outbox.push(message); return ConnectorScheduling::Immediate } else { branch.sync_state = SpeculativeState::Inconsistent; } }, _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result), } // Not immediately scheduling, so schedule again if there are more // branches to run if self.sync_active.is_empty() { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } /// Runs the connector in non-synchronous mode. pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); let branch = &mut self.branches[0]; debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = TempCtx{}; let run_result = branch.code_state.run(&mut run_context, pd); match run_result { RunResult::ComponentTerminated => { // Need to wait until all children are terminated // TODO: Think about how to do this? branch.sync_state = SpeculativeState::Finished; return ConnectorScheduling::NotNow; }, RunResult::ComponentAtSyncStart => { // Prepare for sync execution and reschedule immediately self.in_sync = true; let first_sync_branch = Branch::new_sync_branching_from(1, branch); Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index); self.branches.push(first_sync_branch); return ConnectorScheduling::Later; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Construction of a new component. Find all references to ports // inside of the arguments debug_assert!(results.ports.is_empty()); find_ports_in_value_group(&arguments, &mut results.ports); if !results.ports.is_empty() { // Ports changing ownership if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) { todo!("fatal error handling"); } } // Add connector for later execution let new_connector_state = ComponentState { prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments) }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports); results.new_connectors.push(new_connector); return ConnectorScheduling::Later; }, RunResult::NewChannel => { // Need to prepare a new channel todo!("adding channels to some global context"); return ConnectorScheduling::Later; }, _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } } // ------------------------------------------------------------------------- // Internal helpers // ------------------------------------------------------------------------- // Helpers for management of the branches and their internally stored // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming // linked lists inside of the vector of branches. #[inline] fn pop_branch(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { debug_assert!(queue.first != 0); let branch = &mut branches[queue.first as usize]; *queue.first = branch.next_branch_in_queue.unwrap_or(0); branch.next_branch_in_queue = None; if *queue.first == 0 { // No more entries in queue debug_assert_eq!(*queue.last, branch.index.index); *queue.last = 0; } return branch; } #[inline] fn push_branch_into_queue( branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, ) { debug_assert!(to_push.is_valid()); let to_push = to_push.index; if *queue.last == 0 { // No branches in the queue at all debug_assert_eq!(*queue.first, 0); branches[to_push as usize].next_branch_in_queue = None; *queue.first = to_push; *queue.last = to_push; } else { // Pre-existing branch in the queue debug_assert_ne!(*queue.first, 0); branches[*queue.last as usize].next_branch_in_queue = Some(to_push); *queue.last = to_push; } } // Helpers for local port management. Specifically for adopting/losing // ownership over ports /// Releasing ownership of ports while in non-sync mode. This only occurs /// while instantiating new connectors fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { debug_assert!(!branch.index.is_valid()); // branch in non-sync mode for port_id in port_ids { // We must own the port, or something is wrong with our code todo!("Set up some kind of message router"); debug_assert!(ports.get_port_index(*port_id).is_some()); ports.remove_port(*port_id); } return Ok(()) } /// Releasing ownership of ports during a sync-session. Will provide an /// error if the port was already used during a sync block. fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { debug_assert!(branch.index.is_valid()); // branch in sync mode for port_id in port_ids { match ports.get_port_index(*port_id) { Some(port_index) => { // We (used to) own the port. Make sure it is not given away // already and not used to put/get data. let port_mapping = ports.get_port(branch.index.index, port_index); if port_mapping.is_assigned && port_mapping.num_times_fired != 0 { // Already used return Err(PortOwnershipError::UsedInInteraction(*port_id)); } for delta in &branch.ports_delta { if delta.port_id == port_id { // We cannot have acquired this port, because the // call to `ports.get_port_index` returned an index. debug_assert!(!delta.acquired); return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); } } branch.ports_delta.push(PortOwnershipDelta{ acquired: false, port_id: *port_id, }); }, None => { // Not in port mapping, so we must have acquired it before, // remove the acquirement. let mut to_delete_index: isize = -1; for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { if delta.port_id == *port_id { debug_assert!(delta.acquired); to_delete_index = delta_idx as isize; break; } } debug_assert!(to_delete_index != -1); branch.ports_delta.remove(to_delete_index as usize); } } } return Ok(()) } /// Acquiring ports during a sync-session. fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { debug_assert!(branch.index.is_valid()); // branch in sync mode 'port_loop: for port_id in port_ids { for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { if delta.port_id == *port_id { if delta.acquired { // Somehow already received this port. // TODO: @security todo!("take care of nefarious peers"); } else { // Sending ports to ourselves debug_assert!(ports.get_port_index(*port_id).is_some()); branch.ports_delta.remove(delta_idx); continue 'port_loop; } } } // If here then we can safely acquire the new port branch.ports_delta.push(PortOwnershipDelta{ acquired: true, port_id: *port_id, }); } return Ok(()) } // Helpers for generating and handling sync messages (and the solutions that // are described by those sync messages) fn generate_initial_solution_for_branch(&self, branch_id: BranchId,) } /// A data structure passed to a connector whose code is being executed that is /// used to queue up various state changes that have to be applied after /// running, e.g. the messages the have to be transferred to other connectors. // TODO: Come up with a better name pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. pub outbox: Vec, pub new_connectors: Vec, // Workspaces pub ports: Vec, } impl RunDeltaState { /// Constructs a new `RunDeltaState` object with the default amount of /// reserved memory pub fn new() -> Self { RunDeltaState{ outbox: Vec::with_capacity(64), new_connectors: Vec::new(), ports: Vec::with_capacity(64), } } } pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running } /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port let cur_port = PortIdLocal::new(port_id.0.u32_suffix); for prev_port in ports.iter() { if prev_port == cur_port { // Already added return; } } ports.push(cur_port); }, Value::Array(heap_pos) | Value::Message(heap_pos) | Value::String(heap_pos) | Value::Struct(heap_pos) | Value::Union(_, heap_pos) => { // Reference to some dynamic thing which might contain ports, // so recurse let heap_region = &group.regions[*heap_pos as usize]; for embedded_value in heap_region { find_port_in_value(group, embedded_value, ports); } }, _ => {}, // values we don't care about } } // Clear the ports, then scan all the available values ports.clear(); for value in &value_group.values { find_port_in_value(value_group, value, ports); } }