use std::sync::Arc; use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::hash_map::{Entry}; use crate::{Polarity, PortId}; use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; use super::registry::Registry; use super::messages::*; enum AddComponentError { ModuleDoesNotExist, ConnectorDoesNotExist, InvalidArgumentType(usize), // value is index of (first) invalid argument } struct PortDesc { id: u32, peer_id: u32, owning_connector_id: Option, is_getter: bool, // otherwise one can only call `put` } struct ConnectorDesc { id: u32, in_sync: bool, branches: Vec, // first one is always non-speculative one branch_id_counter: u32, spec_branches_active: VecDeque, // branches that can be run immediately spec_branches_pending_receive: HashMap>, // from port_id to branch index spec_branches_done: Vec, last_checked_done: u32, global_inbox: ConnectorInbox, global_outbox: ConnectorOutbox, } impl ConnectorDesc { /// Creates a new connector description. Implicit assumption is that there /// is one (non-sync) branch that can be immediately executed. fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { let mut branches_active = VecDeque::new(); branches_active.push_back(0); Self{ id, in_sync: false, branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], branch_id_counter: 1, spec_branches_active: branches_active, spec_branches_pending_receive: HashMap::new(), spec_branches_done: Vec::new(), last_checked_done: 0, global_inbox: ConnectorInbox::new(), global_outbox: ConnectorOutbox::new(), } } } enum BranchState { RunningNonSync, // regular running non-speculative branch RunningSync, // regular running speculative branch BranchPoint, // branch which ended up being a branching point ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution Failed, // branch that became inconsistent } struct BranchPortDesc { last_registered_identifier: Option, // if putter, then last sent branch ID, if getter, then last received branch ID num_times_fired: u32, // number of puts/gets on this port } struct BranchDesc { index: u32, parent_index: Option, identifier: u32, code_state: ComponentState, branch_state: BranchState, owned_ports: Vec, message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value port_mapping: HashMap, } impl BranchDesc { /// Creates the first non-sync branch of a connector fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { Self{ index: 0, parent_index: None, identifier: 0, code_state: component_state, branch_state: BranchState::RunningNonSync, owned_ports, message_inbox: HashMap::new(), port_mapping: HashMap::new(), } } /// Creates a sync branch based on the supplied branch. This supplied branch /// is the branching point for the new one, i.e. the parent in the branching /// tree. fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self { Self{ index, parent_index: Some(branch_state.index), identifier, code_state: branch_state.code_state.clone(), branch_state: BranchState::RunningSync, owned_ports: branch_state.owned_ports.clone(), message_inbox: branch_state.message_inbox.clone(), port_mapping: branch_state.port_mapping.clone(), } } } // Separate from Runtime for borrowing reasons struct Registry { ports: HashMap, port_counter: u32, connectors: HashMap, connector_counter: u32, } impl Registry { fn new() -> Self { Self{ ports: HashMap::new(), port_counter: 0, connectors: HashMap::new(), connector_counter: 0, } } /// Returns (putter_port, getter_port) pub fn add_channel(&mut self, owning_connector_id: Option) -> (u32, u32) { let get_id = self.generate_port_id(); let put_id = self.generate_port_id(); self.ports.insert(get_id, PortDesc{ id: get_id, peer_id: put_id, owning_connector_id, is_getter: true, }); self.ports.insert(put_id, PortDesc{ id: put_id, peer_id: get_id, owning_connector_id, is_getter: false, }); return (put_id, get_id); } fn generate_port_id(&mut self) -> u32 { let id = self.port_counter; self.port_counter += 1; return id; } } #[derive(Clone, Copy, Eq, PartialEq)] enum ProposedBranchConstraint { SilentPort(u32), // port id BranchNumber(u32), // branch id } // Local solution of the connector struct ProposedConnectorSolution { final_branch_id: u32, all_branch_ids: Vec, // the final branch ID and, recursively, all parents silent_ports: Vec, // port IDs of the connector itself } struct ProposedSolution { connector_mapping: HashMap, // from connector ID to branch ID connector_propositions: HashMap>, // from connector ID to encountered branch numbers remaining_connectors: Vec, // connectors that still need to be visited } // TODO: @performance, use freelists+ids instead of HashMaps struct Runtime { protocol: Arc, registry: Registry, connectors_active: VecDeque, } impl Runtime { pub fn new(pd: Arc) -> Self { Self{ protocol: pd, registry: Registry::new(), connectors_active: VecDeque::new(), } } /// Creates a new channel that is not owned by any connector and returns its /// endpoints. The returned values are of the (putter port, getter port) /// respectively. pub fn add_channel(&mut self) -> (Value, Value) { let (put_id, get_id) = self.registry.add_channel(None); return ( port_value_from_id(None, put_id, true), port_value_from_id(None, get_id, false) ); } pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { use AddComponentError as ACE; use crate::runtime::error::AddComponentError as OldACE; // TODO: Remove the responsibility of adding a component from the PD // Lookup module and the component // TODO: Remove this error enum translation. Note that for now this // function forces port-only arguments let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { Ok(polarities) => polarities, Err(reason) => match reason { OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)), OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist), OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist), _ => unreachable!(), } }; // Make sure supplied values (and types) are correct let mut ports = Vec::with_capacity(values.values.len()); for (value_idx, value) in values.values.iter().enumerate() { let polarity = &port_polarities[value_idx]; match value { Value::Input(port_id) => { if *polarity != Polarity::Getter { return Err(ACE::InvalidArgumentType(value_idx)) } ports.push(*port_id); }, Value::Output(port_id) => { if *polarity != Polarity::Putter { return Err(ACE::InvalidArgumentType(value_idx)) } ports.push(*port_id); }, _ => return Err(ACE::InvalidArgumentType(value_idx)) } } // Instantiate the component let component_id = self.generate_connector_id(); let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); self.connectors_active.push_back(component_id); Ok(()) } pub fn run(&mut self) { // Go through all active connectors while !self.connectors_active.is_empty() { // Run a single connector let next_id = self.connectors_active.pop_front().unwrap(); let run_again = self.run_connector(next_id); if run_again { self.connectors_active.push_back(next_id); } self.empty_connector_outbox(next_id); self.check_connector_solution(next_id); } } /// Runs a connector for as long as sensible, then returns `true` if the /// connector should be run again in the future, and return `false` if the /// connector has terminated. Note that a terminated connector still /// requires cleanup. pub fn run_connector(&mut self, id: u32) -> bool { let desc = self.registry.connectors.get_mut(&id).unwrap(); let mut run_context = Context{ connector_id: id, branch_id: None, pending_channel: None, }; let mut call_again = false; // TODO: Come back to this, silly pattern while call_again { call_again = false; // bit of a silly pattern, maybe revise if desc.in_sync { // Running in synchronous mode, so run all branches until their // blocking point debug_assert!(!desc.spec_branches_active.is_empty()); let branch_index = desc.spec_branches_active.pop_front().unwrap(); let branch = &mut desc.branches[branch_index as usize]; let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { RunResult::BranchInconsistent => { // Speculative branch became inconsistent. So we don't // run it again branch.branch_state = BranchState::Failed; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that did not have a // value assigned yet. So branch and keep running debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); debug_assert!(branch.port_mapping.get(&port_id).is_none()); let copied_index = Self::duplicate_branch(desc, branch_index); // Need to re-borrow to assign changed port state let original_branch = &mut desc.branches[branch_index as usize]; original_branch.port_mapping.insert(port_id, BranchPortDesc{ last_registered_identifier: None, num_times_fired: 0, }); let copied_branch = &mut desc.branches[copied_index as usize]; copied_branch.port_mapping.insert(port_id, BranchPortDesc{ last_registered_identifier: None, num_times_fired: 1, }); // Run both again desc.spec_branches_active.push_back(branch_index); desc.spec_branches_active.push_back(copied_index); }, RunResult::BranchMissingPortValue(port_id) => { // Branch just performed a `get()` on a port that did // not yet receive a value. // First check if a port value is assigned to the // current branch. If so, check if it is consistent. debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); let mut insert_in_pending_receive = false; match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet, so force to firing entry.insert(BranchPortDesc{ last_registered_identifier: None, num_times_fired: 1, }); branch.branch_state = BranchState::BranchPoint; insert_in_pending_receive = true; }, Entry::Occupied(entry) => { // Have an entry, check if it is consistent let entry = entry.get(); if entry.num_times_fired == 0 { // Inconsistent branch.branch_state = BranchState::Failed; } else { // Perfectly fine, add to queue debug_assert!(entry.last_registered_identifier.is_none()); assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); branch.branch_state = BranchState::BranchPoint; insert_in_pending_receive = true; } } } if insert_in_pending_receive { // Perform the insert match desc.spec_branches_pending_receive.entry(port_id) { Entry::Vacant(entry) => { entry.insert(vec![branch_index]); } Entry::Occupied(mut entry) => { let entry = entry.get_mut(); debug_assert!(!entry.contains(&branch_index)); entry.push(branch_index); } } // But also check immediately if we don't have a // previously received message. If so, we // immediately branch and accept the message if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { for message in messages { let new_branch_idx = Self::duplicate_branch(desc, branch_index); let new_branch = &mut desc.branches[new_branch_idx as usize]; let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id); new_branch.message_inbox.insert((port_id, 1), message.message.clone()); desc.spec_branches_active.push_back(new_branch_idx); } } } }, RunResult::BranchAtSyncEnd => { // Check the branch for any ports that were not used and // insert them in the port mapping as not having fired. for port_index in branch.owned_ports { let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index }); if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { entry.insert(BranchPortDesc { last_registered_identifier: None, num_times_fired: 0 }); } } // Mark the branch as being done branch.branch_state = BranchState::ReachedEndSync; desc.spec_branches_done.push(branch_index); }, RunResult::BranchPut(port_id, value_group) => { debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); debug_assert_eq!(value_group.values.len(), 1); // can only send one value // Branch just performed a `put()`. Check if we have // assigned the port value and if so, if it is // consistent. let mut can_put = true; match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet entry.insert(BranchPortDesc{ last_registered_identifier: Some(branch.identifier), num_times_fired: 1, }); }, Entry::Occupied(mut entry) => { // Pre-existing entry let entry = entry.get_mut(); if entry.num_times_fired == 0 { // This is 'fine' in the sense that we have // a normal inconsistency in the branch. branch.branch_state = BranchState::Failed; can_put = false; } else if entry.last_registered_identifier.is_none() { // A put() that follows a fires() entry.last_registered_identifier = Some(branch.identifier); } else { // This should be fine in the future. But // for now we throw an error as it doesn't // mesh well with the 'fires()' concept. todo!("throw an error of some sort, then fail all related") } } } if can_put { // Actually put the message in the outbox let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap(); let peer_id = port_desc.peer_id; let peer_desc = self.registry.ports.get(&peer_id).unwrap(); debug_assert!(peer_desc.owning_connector_id.is_some()); let peer_id = PortId(Id{ connector_id: peer_desc.owning_connector_id.unwrap(), u32_suffix: peer_id }); // For now this is the one and only time we're going // to send a message. So for now we can't send a // branch ID. desc.global_outbox.insert((port_id, 1), BufferedMessage{ sending_port: port_id, receiving_port: peer_id, peer_prev_branch_id: None, peer_cur_branch_id: 0, message: value_group, }); // Finally, because we were able to put the message, // we can run the branch again desc.spec_branches_active.push_back(branch_index); call_again = true; } }, _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), } } else { // Running in non-synchronous mode let branch = &mut desc.branches[0]; let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { RunResult::ComponentTerminated => return false, RunResult::ComponentAtSyncStart => { // Prepare for sync execution Self::prepare_branch_for_sync(desc); call_again = true; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Generate a new connector with its own state let new_component_id = self.generate_connector_id(); let new_component_state = ComponentState { prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) }; // Transfer the ownership of any ports to the new connector let mut ports = Vec::with_capacity(arguments.values.len()); find_ports_in_value_group(&arguments, &mut ports); for port_id in &ports { let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap(); debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id); port.owning_connector_id = Some(new_component_id) } // Finally push the new connector into the registry let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); self.connectors_active.push_back(new_component_id); }, RunResult::NewChannel => { // Prepare channel debug_assert!(run_context.pending_channel.is_none()); let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id)); run_context.pending_channel = Some(( port_value_from_id(Some(run_context.connector_id), put_id, true), port_value_from_id(Some(run_context.connector_id), get_id, false) )); // Call again so it is retrieved from the context call_again = true; }, _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), } } } return true; } /// Puts all the messages that are currently in the outbox of a particular /// connector into the inbox of the receivers. If possible then branches /// will be created that receive those messages. fn empty_connector_outbox(&mut self, connector_index: u32) { let connector = self.registry.connectors.get_mut(&connector_index).unwrap(); while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() { // Lookup the target connector let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap(); debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id); let target_connector_id = port_desc.owning_connector_id.unwrap(); let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap(); // In any case, always put the message in the global inbox target_connector.global_inbox.insert_message(message_to_send.clone()); // Check if there are any branches that are waiting on // receives if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) { // Check each of the branches for a port mapping that // matches the one on the message header for branch_index in branch_indices { let branch = &mut target_connector.branches[*branch_index as usize]; debug_assert_eq!(branch.branch_state, BranchState::BranchPoint); let mut can_branch = false; if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) { if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { can_branch = true; } } if can_branch { // Put the message inside a clone of the currently // waiting branch let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index); let new_branch = &mut target_connector.branches[new_branch_idx as usize]; let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap(); new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id); new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone()); // And queue the branch for further execution target_connector.spec_branches_active.push(new_branch_idx); if !self.connectors_active.contains(&target_connector.id) { self.connectors_active.push_back(target_connector.id); } } } } } } /// Checks a connector for the submitted solutions. After all neighbouring /// connectors have been checked all of their "last checked solution" index /// will be incremented. fn check_connector_new_solutions(&mut self, connector_index: u32) { // Take connector and start processing its solutions let connector = self.registry.connectors.get_mut(&connector_index).unwrap(); let mut considered_connectors = HashSet::new(); let mut valid_solutions = Vec::new(); while connector.last_checked_done != connector.spec_branches_done.len() as u32 { // We have a new solution to consider let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize]; connector.last_checked_done += 1; let branch = &connector.branches[start_branch_index as usize]; debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync); // Clear storage for potential solutions considered_connectors.clear(); // Start seeking solution among other connectors within the same // synchronous region considered_connectors.insert(connector.id); for port in branch.port_ } } fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) { // Take the connector and branch of interest let first_connector = self.registry.connectors.get(&first_connector_index).unwrap(); let first_branch = &first_connector.branches[first_branch_index as usize]; debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync); // Setup the first solution let mut first_solution = ProposedSolution{ connector_mapping: HashMap::new(), connector_propositions: HashMap::new(), remaining_connectors: Vec::new(), }; first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier); for (port_id, port_mapping) in first_branch.port_mapping.iter() { let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap(); let peer_port_id = port_desc.peer_id; let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap(); let peer_connector_id = peer_port_desc.owning_connector_id.unwrap(); let constraint = match port_mapping.last_registered_identifier { Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id), None => ProposedBranchConstraint::SilentPort(peer_port_id), }; match first_solution.connector_propositions.entry(peer_connector_id) { Entry::Vacant(entry) => { // Not yet encountered entry.insert(vec![constraint]); first_solution.remaining_connectors.push(peer_connector_id); }, Entry::Occupied(mut entry) => { // Already encountered let entry = entry.get_mut(); if !entry.contains(&constraint) { entry.push(constraint); } } } } // Setup storage for all possible solutions let mut all_solutions = Vec::new(); all_solutions.push(first_solution); while !all_solutions.is_empty() { let mut cur_solution = all_solutions.pop().unwrap(); } } fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec, target_connector: u32) { debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap(); let cur_connector = self.registry.connectors.get(&target_connector).unwrap(); // Make sure all propositions are unique for i in 0..branch_propositions.len() { let proposition_i = branch_propositions[i]; for j in 0..i { let proposition_j = branch_propositions[j]; debug_assert_ne!(proposition_i, proposition_j); } } // Check connector for compatible branches let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len()); let mut encountered_propositions = Vec::new(); 'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done { // Reset the propositions matching variables encountered_propositions.clear(); encountered_propositions.resize(branch_propositions.len(), false); // First check the silent port propositions let cur_branch = &cur_connector.branches[branch_idx as usize]; for (proposition_idx, proposition) in branch_propositions.iter().enumerate() { match proposition { ProposedBranchConstraint::SilentPort(port_id) => { let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id }); let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap(); if port_mapping.num_times_fired != 0 { // Port did fire, so the current branch is not // compatible continue 'finished_branch_loop; } // Otherwise, the port was silent indeed encountered_propositions[proposition_idx] = true; }, ProposedBranchConstraint::BranchNumber(_) => {}, } } // Then check the branch number propositions let mut parent_branch_idx = branch_idx; loop { let branch = &cur_connector.branches[parent_branch_idx as usize]; for proposition_idx in 0..branch_propositions.len() { let proposition = branch_propositions[proposition_idx]; match proposition { ProposedBranchConstraint::SilentPort(_) => {}, ProposedBranchConstraint::BranchNumber(branch_number) => { if branch_number == branch.identifier { encountered_propositions[proposition_idx] = true; } } } } if branch.parent_index.is_none() { // No more parents break; } parent_branch_idx = branch.parent_index.unwrap(); } if !encountered_propositions.iter().all(|v| *v) { // Not all of the constraints were matched continue 'finished_branch_loop } // All of the constraints on the branch did indeed match. } } fn generate_connector_id(&mut self) -> u32 { let id = self.registry.connector_counter; self.registry.connector_counter += 1; return id; } // ------------------------------------------------------------------------- // Helpers for branch management // ------------------------------------------------------------------------- /// Prepares a speculative branch for further execution from the connector's /// non-speculative base branch. fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { // Ensure only one branch is active, the non-sync branch debug_assert!(!desc.in_sync); debug_assert_eq!(desc.branches.len(), 1); debug_assert!(desc.spec_branches_active.is_empty()); let new_branch_index = 1; let new_branch_identifier = desc.branch_id_counter; desc.branch_id_counter += 1; // Push first speculative branch as active branch let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]); desc.branches.push(new_branch); desc.spec_branches_active.push_back(new_id); desc.in_sync = true; } /// Duplicates a particular (speculative) branch and returns its index. fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 { let original_branch = &desc.branches[original_branch_idx as usize]; debug_assert!(desc.in_sync); let copied_index = desc.branches.len() as u32; let copied_id = desc.branch_id_counter; desc.branch_id_counter += 1; let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch); desc.branches.push(copied_branch); return copied_index; } } /// Context accessible by the code while being executed by the runtime. When the /// code is being executed by the runtime it sometimes needs to interact with /// the runtime. This is achieved by the "code throwing an error code", after /// which the runtime modifies the appropriate variables and continues executing /// the code again. struct Context<'a> { // Properties of currently running connector/branch connector_id: u32, branch_id: Option, // Resources ready to be retrieved by running code pending_channel: Option<(Value, Value)>, // (put, get) ports } impl<'a> crate::protocol::RunContext for Context<'a> { fn did_put(&self, port: PortId) -> bool { todo!() } fn get(&self, port: PortId) -> Option { todo!() } fn fires(&self, port: PortId) -> Option { todo!() } fn get_channel(&mut self) -> Option<(Value, Value)> { self.pending_channel.take() } } /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port for prev_port in ports { if prev_port == port_id { // Already added return; } } ports.push(*port_id); }, Value::Array(heap_pos) | Value::Message(heap_pos) | Value::String(heap_pos) | Value::Struct(heap_pos) | Value::Union(_, heap_pos) => { // Reference to some dynamic thing which might contain ports, // so recurse let heap_region = &group.regions[*heap_pos as usize]; for embedded_value in heap_region { find_port_in_value(group, embedded_value, ports); } }, _ => {}, // values we don't care about } } // Clear the ports, then scan all the available values ports.clear(); for value in &value_group.values { find_port_in_value(value_group, value, ports); } } fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system if is_output { return Value::Output(PortId(Id{ connector_id, u32_suffix: port_id })); } else { return Value::Input(PortId(Id{ connector_id, u32_suffix: port_id, })); } }