Changeset - f7da6a72fce7
[Not reviewed]
0 1 0
MH - 4 years ago 2021-09-21 22:27:51
contact@maxhenger.nl
move branch-related context into connector store
1 file changed with 29 insertions and 18 deletions:
0 comments (0 inline, 0 general)
src/runtime2/runtime.rs
Show inline comments
 
@@ -25,132 +25,151 @@ struct PortDesc {
 
struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
    last_checked_done: u32,
 
    global_inbox: ConnectorInbox,
 
    global_outbox: ConnectorOutbox,
 
}
 

	
 
impl ConnectorDesc {
 
    /// Creates a new connector description. Implicit assumption is that there
 
    /// is one (non-sync) branch that can be immediately executed.
 
    fn new(id: u32, component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        let mut branches_active = VecDeque::new();
 
        branches_active.push_back(0);
 

	
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)],
 
            spec_branches_active: branches_active,
 
            spec_branches_pending_receive: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
            last_checked_done: 0,
 
            global_inbox: ConnectorInbox::new(),
 
            global_outbox: ConnectorOutbox::new(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
enum BranchState {
 
    RunningNonSync, // regular running non-speculative branch
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
}
 

	
 
#[derive(Clone)]
 
struct BranchPortDesc {
 
    last_registered_index: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchContext {
 
    just_called_did_put: bool,
 
    pending_channel: Option<(Value, Value)>,
 
}
 

	
 
struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
    branch_context: BranchContext,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
            message_inbox: HashMap::new(),
 
            port_mapping: HashMap::new(),
 
            branch_context: BranchContext{
 
                just_called_did_put: false,
 
                pending_channel: None,
 
            }
 
        }
 
    }
 

	
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self {
 
        // We expect that the given branche's context is not halfway handling a
 
        // `put(...)` or a `channel x -> y` statement.
 
        debug_assert!(!branch_state.branch_context.just_called_did_put);
 
        debug_assert!(branch_state.branch_context.pending_channel.is_none());
 

	
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
            message_inbox: branch_state.message_inbox.clone(),
 
            port_mapping: branch_state.port_mapping.clone(),
 
            branch_context: BranchContext{
 
                just_called_did_put: false,
 
                pending_channel: None,
 
            }
 
        }
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
enum Scheduling {
 
    Immediate,
 
    Later,
 
    NotNow,
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
 
enum ProposedBranchConstraint {
 
    SilentPort(u32), // port id
 
    BranchNumber(u32), // branch id
 
    PortMapping(u32, u32), // particular port's mapped branch number
 
}
 

	
 
// Local solution of the connector
 
#[derive(Clone)]
 
struct ProposedConnectorSolution {
 
    final_branch_id: u32,
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    port_mapping: HashMap<u32, Option<u32>>, // port IDs of the connector, mapped to their branch IDs (None for silent ports)
 
}
 

	
 
#[derive(Clone)]
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_constraints: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
    ports: HashMap<u32, PortDesc>,
 
    port_counter: u32,
 
    connectors: HashMap<u32, ConnectorDesc>,
 
    connector_counter: u32,
 
    connectors_active: VecDeque<u32>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            protocol: pd,
 
            ports: HashMap::new(),
 
@@ -235,100 +254,97 @@ impl Runtime {
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            let mut scheduling = Scheduling::Immediate;
 

	
 
            while scheduling == Scheduling::Immediate {
 
                scheduling = self.run_connector(next_id);
 
            }
 

	
 
            match scheduling {
 
                Scheduling::Immediate => unreachable!(),
 
                Scheduling::Later => self.connectors_active.push_back(next_id),
 
                Scheduling::NotNow => {},
 
            }
 

	
 
            // Deal with any outgoing messages and potential solutions
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_new_solutions(next_id);
 
        }
 
    }
 

	
 
    /// Runs a connector for as long as sensible, then returns `true` if the
 
    /// connector should be run again in the future, and return `false` if the
 
    /// connector has terminated. Note that a terminated connector still 
 
    /// requires cleanup.
 
    pub fn run_connector(&mut self, connector_id: u32) -> Scheduling {
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 

	
 
        if desc.in_sync {
 
            return self.run_connector_sync_mode(connector_id);
 
        } else {
 
            return self.run_connector_regular_mode(connector_id);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve connector and branch that is supposed to be run
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(desc.in_sync);
 
        debug_assert!(!desc.spec_branches_active.is_empty());
 

	
 
        let branch_index = desc.spec_branches_active.pop_front().unwrap();
 
        let branch = &mut desc.branches[branch_index as usize];
 

	
 
        // Run this particular branch to a next blocking point
 
        // TODO: PERSISTENT RUN CTX
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            connector_id,
 
            branch_id: Some(branch_index),
 
            just_called_did_put: false,
 
            pending_channel: None,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 

	
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent. So we don't
 
                // run it again
 
                branch.branch_state = BranchState::Failed;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that did not have a
 
                // value assigned yet. So branch and keep running.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert!(branch.port_mapping.get(&port_id).is_none());
 

	
 
                let mut copied_branch = Self::duplicate_branch(desc, branch_index);
 
                let copied_index = copied_branch.index;
 

	
 
                copied_branch.port_mapping.insert(port_id, BranchPortDesc{
 
                    last_registered_index: None,
 
                    num_times_fired: 1,
 
                });
 

	
 
                let branch = &mut desc.branches[branch_index as usize]; // need to reborrow
 
                branch.port_mapping.insert(port_id, BranchPortDesc{
 
                    last_registered_index: None,
 
                    num_times_fired: 0,
 
                });
 

	
 
                // Run both again
 
                desc.branches.push(copied_branch);
 
                desc.spec_branches_active.push_back(branch_index);
 
                desc.spec_branches_active.push_back(copied_index);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch just performed a `get()` on a port that did
 
                // not yet receive a value.
 

	
 
                // First check if a port value is assigned to the
 
                // current branch. If so, check if it is consistent.
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                let mut insert_in_pending_receive = false;
 

	
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
@@ -370,226 +386,224 @@ impl Runtime {
 
                    }
 

	
 
                    // But also check immediately if we don't have a
 
                    // previously received message. If so, we
 
                    // immediately branch and accept the message
 
                    if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                        for message in messages {
 
                            let mut new_branch = Self::duplicate_branch(desc, branch_index);
 
                            let new_branch_idx = new_branch.index;
 
                            let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                            new_port_desc.last_registered_index = Some(message.peer_cur_branch_id);
 
                            new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                            desc.branches.push(new_branch);
 
                            desc.spec_branches_active.push_back(new_branch_idx);
 
                        }
 

	
 
                        if !messages.is_empty() {
 
                            return Scheduling::Immediate;
 
                        }
 
                    }
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Check the branch for any ports that were not used and
 
                // insert them in the port mapping as not having fired.
 
                for port_index in branch.owned_ports.iter().copied() {
 
                    let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                    if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                        entry.insert(BranchPortDesc {
 
                            last_registered_index: None,
 
                            num_times_fired: 0
 
                        });
 
                    }
 
                }
 

	
 
                // Mark the branch as being done
 
                branch.branch_state = BranchState::ReachedEndSync;
 
                desc.spec_branches_done.push(branch_index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                // Branch just performed a `put()`. Check if we have
 
                // assigned the port value and if so, if it is
 
                // consistent.
 
                let mut can_put = true;
 
                branch.branch_context.just_called_did_put = true;
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: Some(branch.index),
 
                            num_times_fired: 1,
 
                        });
 
                    },
 
                    Entry::Occupied(mut entry) => {
 
                        // Pre-existing entry
 
                        let entry = entry.get_mut();
 
                        if entry.num_times_fired == 0 {
 
                            // This is 'fine' in the sense that we have
 
                            // a normal inconsistency in the branch.
 
                            branch.branch_state = BranchState::Failed;
 
                            can_put = false;
 
                        } else if entry.last_registered_index.is_none() {
 
                            // A put() that follows a fires()
 
                            entry.last_registered_index = Some(branch.index);
 
                        } else {
 
                            // This should be fine in the future. But
 
                            // for now we throw an error as it doesn't
 
                            // mesh well with the 'fires()' concept.
 
                            todo!("throw an error of some sort, then fail all related")
 
                        }
 
                    }
 
                }
 

	
 
                if can_put {
 
                    // Actually put the message in the outbox
 
                    let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap();
 
                    let peer_id = port_desc.peer_id;
 
                    let peer_desc = self.ports.get(&peer_id).unwrap();
 
                    debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                    let peer_id = PortId(Id{
 
                        connector_id: peer_desc.owning_connector_id.unwrap(),
 
                        u32_suffix: peer_id
 
                    });
 

	
 
                    // For now this is the one and only time we're going
 
                    // to send a message. So for now we can't send a
 
                    // branch ID.
 
                    desc.global_outbox.insert_message(BufferedMessage{
 
                        sending_port: port_id,
 
                        receiving_port: peer_id,
 
                        peer_prev_branch_id: None,
 
                        peer_cur_branch_id: 0,
 
                        message: value_group,
 
                    });
 

	
 
                    // Finally, because we were able to put the message,
 
                    // we can run the branch again
 
                    desc.spec_branches_active.push_back(branch_index);
 
                    return Scheduling::Immediate;
 
                }
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
        }
 

	
 
        // Did not return that we need to immediately schedule again, so
 
        // determine if we want to do so based on the current number of active
 
        // speculative branches
 
        if desc.spec_branches_active.is_empty() {
 
            return Scheduling::NotNow;
 
        } else {
 
            return Scheduling::Later;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling {
 
        // Retrieve the connector and the branch (which is always the first one,
 
        // since we assume we're not running in sync-mode).
 
        // TODO: CONTINUE HERE, PERSEISTENT BRANCH CONTEXT
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 

	
 
        let branch = &mut desc.branches[0];
 

	
 
        // Run this branch to its blocking point
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            connector_id,
 
            branch_id: None,
 
            just_called_did_put: false,
 
            pending_channel: None,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => return Scheduling::NotNow,
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution
 
                Self::prepare_branch_for_sync(desc);
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Find all references to ports in the provided arguments, the
 
                // ownership of these ports will be transferred to the connector
 
                // we're about to create.
 
                let mut ports = Vec::with_capacity(arguments.values.len());
 
                find_ports_in_value_group(&arguments, &mut ports);
 

	
 
                // Generate a new connector with its own state
 
                let new_component_id = self.generate_connector_id();
 
                let new_component_state = ComponentState {
 
                    prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments)
 
                };
 

	
 
                for port_id in &ports {
 
                    let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap();
 
                    debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id);
 
                    port.owning_connector_id = Some(new_component_id)
 
                }
 

	
 
                // Finally push the new connector into the registry
 
                let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                self.connectors_active.push_back(new_component_id);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewChannel => {
 
                // Prepare channel
 
                debug_assert!(run_context.pending_channel.is_none());
 
                debug_assert!(run_context.branch_ctx.pending_channel.is_none());
 
                let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id));
 
                run_context.pending_channel = Some((
 
                run_context.branch_ctx.pending_channel = Some((
 
                    port_value_from_id(Some(connector_id), put_id, true),
 
                    port_value_from_id(Some(connector_id), get_id, false)
 
                ));
 

	
 
                return Scheduling::Immediate;
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
    /// will be created that receive those messages.
 
    fn empty_connector_outbox(&mut self, connector_index: u32) {
 
        loop {
 
            let connector = self.connectors.get_mut(&connector_index).unwrap();
 
            let message_to_send = connector.global_outbox.take_next_message_to_send();
 

	
 
            if message_to_send.is_none() {
 
                return;
 
            }
 

	
 
            // We have a message to send
 
            let message_to_send = message_to_send.unwrap();
 

	
 
            // Lookup the target connector
 
            let target_port = message_to_send.receiving_port;
 
            let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap();
 
            debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id);
 
            let target_connector_id = port_desc.owning_connector_id.unwrap();
 
            let target_connector = self.connectors.get_mut(&target_connector_id).unwrap();
 

	
 
            // In any case, always put the message in the global inbox
 
            target_connector.global_inbox.insert_message(message_to_send.clone());
 

	
 
            // Check if there are any branches that are waiting on
 
            // receives
 
            if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) {
 
                // Check each of the branches for a port mapping that
 
                // matches the one on the message header
 
                for branch_index in branch_indices {
 
                    let branch = &mut target_connector.branches[*branch_index as usize];
 
                    debug_assert_eq!(branch.branch_state, BranchState::BranchPoint);
 

	
 
                    let mut can_branch = false;
 

	
 
                    if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) {
 
                        if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
@@ -983,128 +997,125 @@ impl Runtime {
 

	
 
        // Push first speculative branch as active branch
 
        let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]);
 
        desc.branches.push(new_branch);
 
        desc.spec_branches_active.push_back(new_branch_index);
 
        desc.in_sync = true;
 
    }
 

	
 
    /// Duplicates a particular (speculative) branch and returns it. Due to
 
    /// borrowing rules in code that uses this helper the returned branch still
 
    /// needs to be pushed onto the member `branches`.
 
    fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc {
 
        let original_branch = &desc.branches[original_branch_idx as usize];
 
        debug_assert!(desc.in_sync);
 

	
 
        let copied_index = desc.branches.len() as u32;
 
        let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch);
 

	
 
        return copied_branch;
 
    }
 

	
 
    /// Retrieves all parent IDs of a particular branch. These numbers run from
 
    /// the leaf towards the parent.
 
    fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec<u32>) {
 
        let mut next_branch_index = first_branch_index;
 
        result.clear();
 

	
 
        loop {
 
            result.push(next_branch_index);
 
            let branch = &desc.branches[next_branch_index as usize];
 

	
 
            match branch.parent_index {
 
                Some(index) => next_branch_index = index,
 
                None => return,
 
            }
 
        }
 
    }
 
}
 

	
 
/// Context accessible by the code while being executed by the runtime. When the
 
/// code is being executed by the runtime it sometimes needs to interact with 
 
/// the runtime. This is achieved by the "code throwing an error code", after 
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Temporary references to branch related storage
 
    inbox: &'a HashMap<(PortId, u32), ValueGroup>,
 
    port_mapping: &'a HashMap<PortId, BranchPortDesc>,
 
    // Properties of currently running connector/branch
 
    connector_id: u32,
 
    branch_id: Option<u32>,
 
    just_called_did_put: bool,
 
    // Resources ready to be retrieved by running code
 
    pending_channel: Option<(Value, Value)>, // (put, get) ports
 
    branch_ctx: &'a mut BranchContext,
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        // Note that we want "did put" to return false if we have fired zero
 
        // times, because this implies we did a prevous
 
        return self.just_called_did_put
 
        let old_value = self.branch_ctx.just_called_did_put;
 
        self.branch_ctx.just_called_did_put = false;
 
        return old_value;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let inbox_key = (port, 1);
 
        match self.inbox.get(&inbox_key) {
 
            None => None,
 
            Some(value) => Some(value.clone()),
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self.port_mapping.get(&port) {
 
            None => None,
 
            Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)),
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.pending_channel.take()
 
        self.branch_ctx.pending_channel.take()
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports. 
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports.iter() {
 
                    if prev_port == port_id {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 
                
 
                ports.push(*port_id);
 
            },
 
            Value::Array(heap_pos) | 
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 
    
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
fn port_value_from_id(connector_id: Option<u32>, port_id: u32, is_output: bool) -> Value {
 
    let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system
 
    if is_output {
 
        return Value::Output(PortId(Id{
 
            connector_id,
0 comments (0 inline, 0 general)