Changeset - 0d5a89aea247
[Not reviewed]
0 3 1
MH - 4 years ago 2021-09-13 12:22:34
contact@maxhenger.nl
halfway shared-memory new consensus algorithm
4 files changed with 498 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/value.rs
Show inline comments
 
@@ -160,6 +160,7 @@ impl Value {
 
///
 
/// Again: this is a temporary thing, hopefully removed once we move to a
 
/// bytecode interpreter.
 
#[derive(Clone)]
 
pub struct ValueGroup {
 
    pub(crate) values: Vec<Value>,
 
    pub(crate) regions: Vec<Vec<Value>>
src/runtime2/messages.rs
Show inline comments
 
new file 100644
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 

	
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
/// A message residing in a connector's inbox (waiting to be put into some kind
 
/// of speculative branch), or a message waiting to be sent.
 
#[derive(Clone)]
 
pub struct BufferedMessage {
 
    pub(crate) sending_port: PortId,
 
    pub(crate) receiving_port: PortId,
 
    pub(crate) peer_prev_branch_id: Option<u32>,
 
    pub(crate) peer_cur_branch_id: u32,
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
/// because a message might be received before a branch arrives at the
 
/// corresponding `get()` that is supposed to receive that message. Hence we
 
/// need to store it for all future branches that might be able to receive it.
 
pub struct ConnectorInbox {
 
    // TODO: @optimize, HashMap + Vec is a bit stupid.
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
 
            messages: HashMap::new(),
 
        }
 
    }
 

	
 
    /// Inserts a new message into the inbox.
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Messages are received from actors we generally cannot
 
        //  trust, and may be unreliable, so messages may be received multiple
 
        //  times or have spoofed branch IDs. Debug asserts are present for the
 
        //  initial implementation.
 

	
 
        // If it is the first message on the port, then we cannot possible have
 
        // a previous port mapping on that port.
 
        let port_action = PortAction{
 
            port_id: message.sending_port.0.u32_suffix,
 
            prev_branch_id: message.peer_prev_branch_id,
 
        };
 

	
 
        match self.messages.entry(port_action) {
 
            Entry::Occupied(mut entry) => {
 
                let entry = entry.get_mut();
 
                debug_assert!(
 
                    entry.iter()
 
                        .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id)
 
                        .is_none(),
 
                    "inbox already contains sent message (same new branch ID)"
 
                );
 

	
 
                entry.push(message);
 
            },
 
            Entry::Vacant(entry) => {
 
                entry.insert(vec![message]);
 
            }
 
        }
 
    }
 

	
 
    /// Checks if the provided port (and the branch id mapped to that port)
 
    /// correspond to any messages in the inbox.
 
    pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option<u32>) -> Option<&[BufferedMessage]> {
 
        let port_action = PortAction{
 
            port_id,
 
            prev_branch_id: prev_branch_id_at_port,
 
        };
 

	
 
        match self.messages.get(&port_action) {
 
            Some(messages) => return Some(messages.as_slice()),
 
            None => return None,
 
        }
 
    }
 
}
 

	
 
/// A connector's outbox. A temporary storage for messages that are sent by
 
/// branches performing `put`s until we're done running all branches and can
 
/// actually transmit the messages.
 
pub struct ConnectorOutbox {
 
    messages: Vec<BufferedMessage>,
 
    sent_counter: usize,
 
}
 

	
 
impl ConnectorOutbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Vec::new(),
 
            sent_counter: 0,
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, message: BufferedMessage) {
 
        // TODO: @error - Depending on the way we implement the runtime in the
 
        //  future we might end up not trusting "our own code" (i.e. in case
 
        //  the connectors we are running are described by foreign code)
 
        debug_assert!(
 
            self.messages.iter()
 
                .find(|v|
 
                    v.sending_port == message.sending_port &&
 
                    v.peer_prev_branch_id == message.peer_prev_branch_id
 
                )
 
                .is_none(),
 
            "messages was already registered for sending"
 
        );
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    pub fn take_next_message_to_send(&mut self) -> Option<&BufferedMessage> {
 
        if self.sent_counter == self.messages.len() {
 
            return None;
 
        }
 

	
 
        let cur_index = self.sent_counter;
 
        self.sent_counter += 1;
 
        return Some(&self.messages[cur_index]);
 
    }
 

	
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
        self.sent_counter = 0;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
\ No newline at end of file
 
mod runtime;
 
mod messages;
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::Arc;
 
use std::collections::{HashMap, VecDeque};
 
use std::collections::{HashMap, HashSet, VecDeque};
 
use std::collections::hash_map::{Entry};
 

	
 
use crate::{Polarity, PortId};
 
@@ -8,6 +8,7 @@ use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::registry::Registry;
 
use super::messages::*;
 

	
 
enum AddComponentError {
 
    ModuleDoesNotExist,
 
@@ -22,26 +23,17 @@ struct PortDesc {
 
    is_getter: bool, // otherwise one can only call `put`
 
}
 

	
 
// Message received from some kind of peer
 
struct BufferedMessage {
 
    // If in inbox, then sender is the connector's peer. If in the outbox, then
 
    // the sender is the connector itself.
 
    sending_port: PortId,
 
    receiving_port: PortId,
 
    peer_prev_branch_id: Option<u32>, // of the sender
 
    peer_cur_branch_id: u32, // of the sender
 
    message: ValueGroup,
 
}
 

	
 
struct ConnectorDesc {
 
    id: u32,
 
    in_sync: bool,
 
    branches: Vec<BranchDesc>, // first one is always non-speculative one
 
    branch_id_counter: u32,
 
    spec_branches_active: VecDeque<u32>, // branches that can be run immediately
 
    spec_branches_pending_receive: HashMap<PortId, u32>, // from port_id to branch index
 
    global_inbox: HashMap<(PortId, u32), BufferedMessage>,
 
    global_outbox: HashMap<(PortId, u32), BufferedMessage>,
 
    spec_branches_pending_receive: HashMap<PortId, Vec<u32>>, // from port_id to branch index
 
    spec_branches_done: Vec<u32>,
 
    last_checked_done: u32,
 
    global_inbox: ConnectorInbox,
 
    global_outbox: ConnectorOutbox,
 
}
 

	
 
impl ConnectorDesc {
 
@@ -58,8 +50,10 @@ impl ConnectorDesc {
 
            branch_id_counter: 1,
 
            spec_branches_active: branches_active,
 
            spec_branches_pending_receive: HashMap::new(),
 
            global_inbox: HashMap::new(),
 
            global_outbox: HashMap::new(),
 
            spec_branches_done: Vec::new(),
 
            last_checked_done: 0,
 
            global_inbox: ConnectorInbox::new(),
 
            global_outbox: ConnectorOutbox::new(),
 
        }
 
    }
 
}
 
@@ -166,6 +160,25 @@ impl Registry {
 
    }
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq)]
 
enum ProposedBranchConstraint {
 
    SilentPort(u32), // port id
 
    BranchNumber(u32), // branch id
 
}
 

	
 
// Local solution of the connector
 
struct ProposedConnectorSolution {
 
    final_branch_id: u32,
 
    all_branch_ids: Vec<u32>, // the final branch ID and, recursively, all parents
 
    silent_ports: Vec<u32>, // port IDs of the connector itself
 
}
 

	
 
struct ProposedSolution {
 
    connector_mapping: HashMap<u32, ProposedConnectorSolution>, // from connector ID to branch ID
 
    connector_propositions: HashMap<u32, Vec<ProposedBranchConstraint>>, // from connector ID to encountered branch numbers
 
    remaining_connectors: Vec<u32>, // connectors that still need to be visited
 
}
 

	
 
// TODO: @performance, use freelists+ids instead of HashMaps
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 
@@ -197,7 +210,6 @@ impl Runtime {
 
        use AddComponentError as ACE;
 
        use crate::runtime::error::AddComponentError as OldACE;
 

	
 
        // TODO: Allow the ValueGroup to contain any kind of value
 
        // TODO: Remove the responsibility of adding a component from the PD
 

	
 
        // Lookup module and the component
 
@@ -252,8 +264,16 @@ impl Runtime {
 
    pub fn run(&mut self) {
 
        // Go through all active connectors
 
        while !self.connectors_active.is_empty() {
 
            // Run a single connector
 
            let next_id = self.connectors_active.pop_front().unwrap();
 
            self.run_connector(next_id);
 
            let run_again = self.run_connector(next_id);
 

	
 
            if run_again {
 
                self.connectors_active.push_back(next_id);
 
            }
 

	
 
            self.empty_connector_outbox(next_id);
 
            self.check_connector_solution(next_id);
 
        }
 
    }
 

	
 
@@ -269,7 +289,7 @@ impl Runtime {
 
            pending_channel: None,
 
        };
 

	
 
        let mut call_again = false;
 
        let mut call_again = false; // TODO: Come back to this, silly pattern
 

	
 
        while call_again {
 
            call_again = false; // bit of a silly pattern, maybe revise
 
@@ -321,6 +341,8 @@ impl Runtime {
 
                        // First check if a port value is assigned to the
 
                        // current branch. If so, check if it is consistent.
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        let mut insert_in_pending_receive = false;
 

	
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet, so force to firing
 
@@ -329,7 +351,7 @@ impl Runtime {
 
                                    num_times_fired: 1,
 
                                });
 
                                branch.branch_state = BranchState::BranchPoint;
 
                                desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                insert_in_pending_receive = true;
 
                            },
 
                            Entry::Occupied(entry) => {
 
                                // Have an entry, check if it is consistent
 
@@ -339,19 +361,124 @@ impl Runtime {
 
                                    branch.branch_state = BranchState::Failed;
 
                                } else {
 
                                    // Perfectly fine, add to queue
 
                                    debug_assert!(entry.last_registered_identifier.is_none());
 
                                    assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now");
 
                                    branch.branch_state = BranchState::BranchPoint;
 
                                    desc.spec_branches_pending_receive.insert(port_id, branch_index);
 
                                    insert_in_pending_receive = true;
 
                                }
 
                            }
 
                        }
 

	
 
                        if insert_in_pending_receive {
 
                            // Perform the insert
 
                            match desc.spec_branches_pending_receive.entry(port_id) {
 
                                Entry::Vacant(entry) => {
 
                                    entry.insert(vec![branch_index]);
 
                                }
 
                                Entry::Occupied(mut entry) => {
 
                                    let entry = entry.get_mut();
 
                                    debug_assert!(!entry.contains(&branch_index));
 
                                    entry.push(branch_index);
 
                                }
 
                            }
 

	
 
                            // But also check immediately if we don't have a
 
                            // previously received message. If so, we
 
                            // immediately branch and accept the message
 
                            if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) {
 
                                for message in messages {
 
                                    let new_branch_idx = Self::duplicate_branch(desc, branch_index);
 
                                    let new_branch = &mut desc.branches[new_branch_idx as usize];
 
                                    let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap();
 
                                    new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id);
 
                                    new_branch.message_inbox.insert((port_id, 1), message.message.clone());
 

	
 
                                    desc.spec_branches_active.push_back(new_branch_idx);
 
                                }
 
                            }
 
                        }
 
                    },
 
                    RunResult::BranchAtSyncEnd => {
 
                        // Check the branch for any ports that were not used and
 
                        // insert them in the port mapping as not having fired.
 
                        for port_index in branch.owned_ports {
 
                            let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index });
 
                            if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) {
 
                                entry.insert(BranchPortDesc {
 
                                    last_registered_identifier: None,
 
                                    num_times_fired: 0
 
                                });
 
                            }
 
                        }
 

	
 
                        // Mark the branch as being done
 
                        branch.branch_state = BranchState::ReachedEndSync;
 
                        todo!("somehow propose solution");
 
                        desc.spec_branches_done.push(branch_index);
 
                    },
 
                    RunResult::BranchPut(port_id, value_group) => {
 
                        debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                        debug_assert_eq!(value_group.values.len(), 1)
 
                        debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                        // Branch just performed a `put()`. Check if we have
 
                        // assigned the port value and if so, if it is
 
                        // consistent.
 
                        let mut can_put = true;
 
                        match branch.port_mapping.entry(port_id) {
 
                            Entry::Vacant(entry) => {
 
                                // No entry yet
 
                                entry.insert(BranchPortDesc{
 
                                    last_registered_identifier: Some(branch.identifier),
 
                                    num_times_fired: 1,
 
                                });
 
                            },
 
                            Entry::Occupied(mut entry) => {
 
                                // Pre-existing entry
 
                                let entry = entry.get_mut();
 
                                if entry.num_times_fired == 0 {
 
                                    // This is 'fine' in the sense that we have
 
                                    // a normal inconsistency in the branch.
 
                                    branch.branch_state = BranchState::Failed;
 
                                    can_put = false;
 
                                } else if entry.last_registered_identifier.is_none() {
 
                                    // A put() that follows a fires()
 
                                    entry.last_registered_identifier = Some(branch.identifier);
 
                                } else {
 
                                    // This should be fine in the future. But
 
                                    // for now we throw an error as it doesn't
 
                                    // mesh well with the 'fires()' concept.
 
                                    todo!("throw an error of some sort, then fail all related")
 
                                }
 
                            }
 
                        }
 

	
 
                        if can_put {
 
                            // Actually put the message in the outbox
 
                            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
                            let peer_id = port_desc.peer_id;
 
                            let peer_desc = self.registry.ports.get(&peer_id).unwrap();
 
                            debug_assert!(peer_desc.owning_connector_id.is_some());
 

	
 
                            let peer_id = PortId(Id{
 
                                connector_id: peer_desc.owning_connector_id.unwrap(),
 
                                u32_suffix: peer_id
 
                            });
 

	
 
                            // For now this is the one and only time we're going
 
                            // to send a message. So for now we can't send a
 
                            // branch ID.
 
                            desc.global_outbox.insert((port_id, 1), BufferedMessage{
 
                                sending_port: port_id,
 
                                receiving_port: peer_id,
 
                                peer_prev_branch_id: None,
 
                                peer_cur_branch_id: 0,
 
                                message: value_group,
 
                            });
 

	
 
                            // Finally, because we were able to put the message,
 
                            // we can run the branch again
 
                            desc.spec_branches_active.push_back(branch_index);
 
                            call_again = true;
 
                        }
 
                    },
 
                    _ => unreachable!("got result '{:?}' from running component in sync mode", run_result),
 
                }
 
@@ -408,6 +535,213 @@ impl Runtime {
 
        return true;
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
    /// will be created that receive those messages.
 
    fn empty_connector_outbox(&mut self, connector_index: u32) {
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() {
 
            // Lookup the target connector
 
            let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap();
 
            debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id);
 
            let target_connector_id = port_desc.owning_connector_id.unwrap();
 
            let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap();
 

	
 
            // In any case, always put the message in the global inbox
 
            target_connector.global_inbox.insert_message(message_to_send.clone());
 

	
 
            // Check if there are any branches that are waiting on
 
            // receives
 
            if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) {
 
                // Check each of the branches for a port mapping that
 
                // matches the one on the message header
 
                for branch_index in branch_indices {
 
                    let branch = &mut target_connector.branches[*branch_index as usize];
 
                    debug_assert_eq!(branch.branch_state, BranchState::BranchPoint);
 

	
 
                    let mut can_branch = false;
 

	
 
                    if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) {
 
                        if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 {
 
                            can_branch = true;
 
                        }
 
                    }
 

	
 
                    if can_branch {
 
                        // Put the message inside a clone of the currently
 
                        // waiting branch
 
                        let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index);
 
                        let new_branch = &mut target_connector.branches[new_branch_idx as usize];
 
                        let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap();
 
                        new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id);
 
                        new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone());
 

	
 
                        // And queue the branch for further execution
 
                        target_connector.spec_branches_active.push(new_branch_idx);
 
                        if !self.connectors_active.contains(&target_connector.id) {
 
                            self.connectors_active.push_back(target_connector.id);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Checks a connector for the submitted solutions. After all neighbouring
 
    /// connectors have been checked all of their "last checked solution" index
 
    /// will be incremented.
 
    fn check_connector_new_solutions(&mut self, connector_index: u32) {
 
        // Take connector and start processing its solutions
 
        let connector = self.registry.connectors.get_mut(&connector_index).unwrap();
 
        let mut considered_connectors = HashSet::new();
 
        let mut valid_solutions = Vec::new();
 

	
 
        while connector.last_checked_done != connector.spec_branches_done.len() as u32 {
 
            // We have a new solution to consider
 
            let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize];
 
            connector.last_checked_done += 1;
 

	
 
            let branch = &connector.branches[start_branch_index as usize];
 
            debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync);
 

	
 
            // Clear storage for potential solutions
 
            considered_connectors.clear();
 

	
 
            // Start seeking solution among other connectors within the same
 
            // synchronous region
 
            considered_connectors.insert(connector.id);
 
            for port in branch.port_
 
        }
 
    }
 

	
 
    fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) {
 
        // Take the connector and branch of interest
 
        let first_connector = self.registry.connectors.get(&first_connector_index).unwrap();
 
        let first_branch = &first_connector.branches[first_branch_index as usize];
 
        debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync);
 

	
 
        // Setup the first solution
 
        let mut first_solution = ProposedSolution{
 
            connector_mapping: HashMap::new(),
 
            connector_propositions: HashMap::new(),
 
            remaining_connectors: Vec::new(),
 
        };
 
        first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier);
 
        for (port_id, port_mapping) in first_branch.port_mapping.iter() {
 
            let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap();
 
            let peer_port_id = port_desc.peer_id;
 
            let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap();
 
            let peer_connector_id = peer_port_desc.owning_connector_id.unwrap();
 

	
 
            let constraint = match port_mapping.last_registered_identifier {
 
                Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id),
 
                None => ProposedBranchConstraint::SilentPort(peer_port_id),
 
            };
 

	
 
            match first_solution.connector_propositions.entry(peer_connector_id) {
 
                Entry::Vacant(entry) => {
 
                    // Not yet encountered
 
                    entry.insert(vec![constraint]);
 
                    first_solution.remaining_connectors.push(peer_connector_id);
 
                },
 
                Entry::Occupied(mut entry) => {
 
                    // Already encountered
 
                    let entry = entry.get_mut();
 
                    if !entry.contains(&constraint) {
 
                        entry.push(constraint);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Setup storage for all possible solutions
 
        let mut all_solutions = Vec::new();
 
        all_solutions.push(first_solution);
 

	
 
        while !all_solutions.is_empty() {
 
            let mut cur_solution = all_solutions.pop().unwrap();
 

	
 
        }
 
    }
 

	
 
    fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec<ProposedSolution>, target_connector: u32) {
 
        debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited
 
        debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it
 

	
 
        let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap();
 
        let cur_connector = self.registry.connectors.get(&target_connector).unwrap();
 

	
 
        // Make sure all propositions are unique
 
        for i in 0..branch_propositions.len() {
 
            let proposition_i = branch_propositions[i];
 
            for j in 0..i {
 
                let proposition_j = branch_propositions[j];
 
                debug_assert_ne!(proposition_i, proposition_j);
 
            }
 
        }
 

	
 
        // Check connector for compatible branches
 
        let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len());
 
        let mut encountered_propositions = Vec::new();
 

	
 
        'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done {
 
            // Reset the propositions matching variables
 
            encountered_propositions.clear();
 
            encountered_propositions.resize(branch_propositions.len(), false);
 

	
 
            // First check the silent port propositions
 
            let cur_branch = &cur_connector.branches[branch_idx as usize];
 
            for (proposition_idx, proposition) in branch_propositions.iter().enumerate() {
 
                match proposition {
 
                    ProposedBranchConstraint::SilentPort(port_id) => {
 
                        let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id });
 
                        let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap();
 
                        if port_mapping.num_times_fired != 0 {
 
                            // Port did fire, so the current branch is not
 
                            // compatible
 
                            continue 'finished_branch_loop;
 
                        }
 

	
 
                        // Otherwise, the port was silent indeed
 
                        encountered_propositions[proposition_idx] = true;
 
                    },
 
                    ProposedBranchConstraint::BranchNumber(_) => {},
 
                }
 
            }
 

	
 
            // Then check the branch number propositions
 
            let mut parent_branch_idx = branch_idx;
 
            loop {
 
                let branch = &cur_connector.branches[parent_branch_idx as usize];
 
                for proposition_idx in 0..branch_propositions.len() {
 
                    let proposition = branch_propositions[proposition_idx];
 
                    match proposition {
 
                        ProposedBranchConstraint::SilentPort(_) => {},
 
                        ProposedBranchConstraint::BranchNumber(branch_number) => {
 
                            if branch_number == branch.identifier {
 
                                encountered_propositions[proposition_idx] = true;
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                if branch.parent_index.is_none() {
 
                    // No more parents
 
                    break;
 
                }
 

	
 
                parent_branch_idx = branch.parent_index.unwrap();
 
            }
 

	
 
            if !encountered_propositions.iter().all(|v| *v) {
 
                // Not all of the constraints were matched
 
                continue 'finished_branch_loop
 
            }
 

	
 
            // All of the constraints on the branch did indeed match.
 
        }
 
    }
 

	
 
    fn generate_connector_id(&mut self) -> u32 {
 
        let id = self.registry.connector_counter;
 
        self.registry.connector_counter += 1;
0 comments (0 inline, 0 general)