Changeset - 7662b8fb871d
[Not reviewed]
0 4 0
mh - 4 years ago 2021-11-10 19:22:54
contact@maxhenger.nl
Rewrite solution composition, add some tests
4 files changed with 293 insertions and 105 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -142,22 +142,25 @@ impl ConnectorPDL {
 

	
 
    pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        let mut branches = Vec::new(); // TODO: @Remove
 
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches);
 
        if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        for branch_id in branches.drain(..) {
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
@@ -235,22 +238,22 @@ impl ConnectorPDL {
 

	
 
                    // Note: we only know that a branch is waiting on a message when
 
                    // it reaches the `get` call. But we might have already received
 
                    // a message that targets this branch, so check now.
 
                    let mut any_branch_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) {
 
                        if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_branch_received = true;
 
                        }
 
                    }
 

	
src/runtime2/consensus.rs
Show inline comments
 
@@ -30,12 +30,18 @@ pub(crate) struct GlobalSolution {
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
@@ -47,17 +53,17 @@ pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state from communication
 
    encountered_peers: VecSet<ConnectorId>, // to determine when we should send "found a higher ID" messages.
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    // --- Persistent state
 
    // TODO: Tracking sync round numbers
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
@@ -68,15 +74,16 @@ pub(crate) enum Consistency {
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
@@ -96,13 +103,12 @@ impl Consensus {
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.last_finished_handled.is_none());
 
        debug_assert!(self.encountered_peers.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ctx.get_ports().iter()
 
@@ -203,12 +209,13 @@ impl Consensus {
 

	
 
                if !self.encountered_ports.contains(&port.port_id) {
 
                    ctx.submit_message(Message::Data(DataMessage {
 
                        sync_header: SyncHeader{
 
                            sending_component_id: ctx.id,
 
                            highest_component_id: self.highest_connector_id,
 
                            sync_round: self.sync_round
 
                        },
 
                        data_header: DataHeader{
 
                            expected_mapping: source_mapping.clone(),
 
                            sending_port: port.port_id,
 
                            target_port: peer_port_id,
 
                            new_mapping: BranchId::new_invalid(),
 
@@ -254,15 +261,21 @@ impl Consensus {
 
        }
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.last_finished_handled = None;
 
        self.encountered_peers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 

	
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
@@ -315,22 +328,24 @@ impl Consensus {
 
    /// checking which *existing* branches *can* receive the message. So two
 
    /// cautionary notes:
 
    /// 1. A future branch might also be able to receive this message, see the
 
    ///     `branch_can_receive` function.
 
    /// 2. We return the branches that *can* receive the message, you still
 
    ///     have to explicitly call `notify_of_received_message`.
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec<BranchId>) {
 
        self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids);
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec<BranchId>) -> bool {
 
        self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids);
 
        return self.handle_received_sync_header(&message.sync_header, ctx)
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return None;
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 
        match message.content {
 
            SyncContent::Notification => {
 
                // We were just interested in the header
 
@@ -348,14 +363,14 @@ impl Consensus {
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header, content));
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) {
 
        debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content));
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 
@@ -376,13 +391,19 @@ impl Consensus {
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool {
 
    pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool {
 
        if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) {
 
            if sync_header.sync_round < peer.expected_sync_round {
 
                return false;
 
            }
 
        }
 

	
 
        if let DataContent::SilentPortNotification = content {
 
            // No port can receive a "silent" notification.
 
            return false;
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
@@ -405,42 +426,44 @@ impl Consensus {
 

	
 
    // --- Internal helpers
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
 
    fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header, content) {
 
                if self.branch_can_receive(branch.id, sync_header, data_header, content) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) {
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 

	
 
        self.encountered_peers.push(sync_header.sending_component_id);
 
        if !self.handle_peer(sync_header) {
 
            // We can drop this package
 
            return false;
 
        }
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for encountered_id in self.encountered_peers.iter() {
 
                if *encountered_id == sync_header.sending_component_id {
 
            for peer in self.peers.iter() {
 
                if peer.id == sync_header.sending_component_id || !peer.encountered_this_round {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncMessage {
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: *encountered_id,
 
                    target_component_id: peer.id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(Message::Sync(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
@@ -452,12 +475,41 @@ impl Consensus {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(Message::Sync(message));
 
        } // else: exactly equal, so do nothing
 

	
 
        return true;
 
    }
 

	
 
    /// Handles a (potentially new) peer. Returns `false` if the provided sync
 
    /// number is different then the expected one.
 
    fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool {
 
        let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id);
 
        match position {
 
            Some(index) => {
 
                let entry = &mut self.peers[index];
 
                entry.encountered_this_round = true;
 
                // TODO: Proper handling of potential overflow
 
                if sync_header.sync_round >= entry.expected_sync_round {
 
                    entry.expected_sync_round = sync_header.sync_round;
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            None => {
 
                self.peers.push(Peer{
 
                    id: sync_header.sending_component_id,
 
                    encountered_this_round: true,
 
                    expected_sync_round: sync_header.sync_round,
 
                });
 
                return true;
 
            }
 
        }
 
    }
 

	
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option<BranchId> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 
@@ -496,12 +548,13 @@ impl Consensus {
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
            sync_round: self.sync_round,
 
        }
 
    }
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
@@ -517,42 +570,56 @@ impl Consensus {
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchId)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>
 
}
 

	
 
struct CheckEntry {
 
    component_index: usize,         // component index in combiner's vector
 
    solution_index: usize,          // solution entry in the above component entry
 
    parent_entry_index: usize,      // parent that caused the creation of this checking entry
 
    match_index_in_parent: usize,   // index in the matches array of the parent
 
    solution_index_in_parent: usize,// index in the solution array of the match entry in the parent
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
        };
 
    }
 
@@ -754,115 +821,155 @@ impl SolutionCombiner {
 

	
 
        return self.check_new_solution(component_index, solution_index);
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<GlobalSolution> {
 
    // TODO: At some point, check if divide and conquer is faster?
 
    fn check_new_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
 
        if !self.can_have_solution() {
 
            return None;
 
        }
 

	
 
        // By now we're certain that all peers are present. So once our
 
        // backtracking solution stack is as long as the number of components,
 
        // then we have found a global solution.
 
        let mut check_stack = Vec::new();
 
        let mut check_from = 0;
 
        check_stack.push((component_index, solution_index));
 
        'checking_loop: while check_from < check_stack.len() {
 
            // Prepare for next iteration
 
            let new_check_from = check_stack.len();
 

	
 
            // Go through all entries on the checking stack. Each entry
 
            // corresponds to a component's solution. We check that one against
 
            // previously added ones on the stack, and if they're not already
 
            // added we push them onto the check stack.
 
            for check_idx in check_from..new_check_from {
 
                // Take the current solution
 
                let (component_index, solution_index) = check_stack[check_idx];
 
                debug_assert!(!self.local[component_index].solutions.is_empty());
 
                let cur_solution = &self.local[component_index].solutions[solution_index];
 

	
 
                // Go through the matches and check if they're on the stack or
 
                // should be added to the stack.
 
                for cur_match in &cur_solution.matches {
 
                    let mut is_already_on_stack = false;
 
                    let mut has_same_solution = false;
 
                    for existing_check_idx in 0..check_from {
 
                        let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx];
 
                        if existing_component_index == cur_match.target_index {
 
                            // Already lives on the stack, so the match MUST
 
                            // contain the same solution index if the checked
 
                            // local solution is agreeable with the (partially
 
                            // determined) global solution.
 
                            is_already_on_stack = true;
 
                            if cur_match.match_indices.contains(&existing_solution_index) {
 
                                has_same_solution = true;
 
                                break;
 
                            }
 
        // Construct initial entry on stack
 
        let mut stack = Vec::with_capacity(self.local.len());
 
        stack.push(CheckEntry{
 
            component_index: initial_component_index,
 
            solution_index: initial_solution_index,
 
            parent_entry_index: 0,
 
            match_index_in_parent: 0,
 
            solution_index_in_parent: 0,
 
        });
 

	
 
        'check_last_stack: loop {
 
            let cur_index = stack.len() - 1;
 
            let cur_entry = &stack[cur_index];
 

	
 
            // Check if the current component is matching with all other entries
 
            let mut all_match = true;
 
            'check_against_existing: for prev_index in 0..cur_index {
 
                let prev_entry = &stack[prev_index];
 
                let prev_component = &self.local[prev_entry.component_index];
 
                let prev_solution = &prev_component.solutions[prev_entry.solution_index];
 

	
 
                for prev_matching_component in &prev_solution.matches {
 
                    if prev_matching_component.target_index == cur_entry.component_index {
 
                        // Previous entry has shared ports with the current
 
                        // entry, so see if we have a composable pair of
 
                        // solutions.
 
                        if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) {
 
                            all_match = false;
 
                            break 'check_against_existing;
 
                        }
 
                    }
 
                }
 
            }
 

	
 
                    if is_already_on_stack {
 
                        if !has_same_solution {
 
                            // We have an inconsistency, so we need to go back
 
                            // in our stack, and try the next solution
 
                            let (last_component_index, last_solution_index) = check_stack[check_from];
 
                            check_stack.truncate(check_from);
 
                            if check_stack.is_empty() {
 
                                // The starting point does not yield a valid
 
                                // solution
 
                                return None;
 
                            }
 
            if all_match {
 
                // All components matched until now.
 
                if stack.len() == self.local.len() {
 
                    // We have found a global solution
 
                    break 'check_last_stack;
 
                }
 

	
 
                            // Try the next one
 
                            let last_component = &self.local[last_component_index];
 
                            let new_solution_index = last_solution_index + 1;
 
                            if new_solution_index >= last_component.solutions.len() {
 
                                // No more things to try, again: no valid
 
                                // solution
 
                                return None;
 
                            }
 
                // Not all components found yet, look for a new one that has not
 
                // yet been added yet.
 
                for (parent_index, parent_entry) in stack.iter().enumerate() {
 
                    let parent_component = &self.local[parent_entry.component_index];
 
                    let parent_solution = &parent_component.solutions[parent_entry.solution_index];
 

	
 
                            check_stack.push((last_component_index, new_solution_index));
 
                            continue 'checking_loop;
 
                        } // else: we're fine, the solution is agreeable
 
                    } else {
 
                        check_stack.push((cur_match.target_index, 0))
 
                    for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() {
 
                        if peer_component.match_indices.is_empty() {
 
                            continue;
 
                        }
 

	
 
                        let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index);
 
                        if !already_added {
 
                            // New component to try
 
                            stack.push(CheckEntry{
 
                                component_index: peer_component.target_index,
 
                                solution_index: peer_component.match_indices[0],
 
                                parent_entry_index: parent_index,
 
                                match_index_in_parent: peer_index,
 
                                solution_index_in_parent: 0,
 
                            });
 
                            continue 'check_last_stack;
 
                        }
 
                    }
 
                }
 

	
 
                // Cannot find a peer to add. This is possible if, for example,
 
                // we have a component A which has the only connection to
 
                // component B. And B has sent a local solution saying it is
 
                // finished, but the last data message has not yet arrived at A.
 

	
 
                // In any case, we just exit the if statement and handle not
 
                // being able to find a new connector as being forced to try a
 
                // new permutation of possible local solutions.
 
            }
 

	
 
            // Either the currently considered local solution is inconsistent
 
            // with other local solutions, or we cannot find a new component to
 
            // add. This is where we perform backtracking as long as needed to
 
            // try a new solution.
 
            while stack.len() > 1 {
 
                // Check if our parent has another solution we can try
 
                let cur_index = stack.len() - 1;
 
                let cur_entry = &stack[cur_index];
 

	
 
                let parent_entry = &stack[cur_entry.parent_entry_index];
 
                let parent_component = &self.local[parent_entry.component_index];
 
                let parent_solution = &parent_component.solutions[parent_entry.solution_index];
 

	
 
                let match_component = &parent_solution.matches[cur_entry.match_index_in_parent];
 
                debug_assert!(match_component.target_index == cur_entry.component_index);
 
                let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1;
 

	
 
                if new_solution_index_in_parent < match_component.match_indices.len() {
 
                    // We can still try a new one
 
                    let new_solution_index = match_component.match_indices[new_solution_index_in_parent];
 
                    let cur_entry = &mut stack[cur_index];
 
                    cur_entry.solution_index_in_parent = new_solution_index_in_parent;
 
                    cur_entry.solution_index = new_solution_index;
 
                    continue 'check_last_stack;
 
                } else {
 
                    // We're out of options here. So pop an entry, then in
 
                    // the next iteration of this backtracking loop we try
 
                    // to increment that solution
 
                    stack.pop();
 
                }
 
            }
 

	
 
            check_from = new_check_from;
 
            // Stack length is 1, hence we're back at our initial solution.
 
            // Since that doesn't yield a global solution, we simply:
 
            return None;
 
        }
 

	
 
        // Because of our earlier checking if we can have a solution at
 
        // all (all components have their peers), and the exit condition of the
 
        // while loop: if we're here, then we have a global solution
 
        debug_assert_eq!(check_stack.len(), self.local.len());
 
        let mut final_branches = Vec::with_capacity(check_stack.len());
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
        // Constructing the representation of the global solution
 
        debug_assert_eq!(stack.len(), self.local.len());
 
        let mut final_branches = Vec::with_capacity(stack.len());
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 
            final_branches.push((component.component, solution.final_branch_id));
 
        }
 

	
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_channels = 0;
 
        for (component_index, _) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            total_num_channels += component.solutions[0].channel_mapping.len();
 
        }
 

	
 
        total_num_channels /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_channels);
 
        let mut total_num_checked = 0;
 

	
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 

	
 
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(*encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
src/runtime2/inbox.rs
Show inline comments
 
@@ -19,12 +19,13 @@ pub(crate) struct PortAnnotation {
 

	
 
/// The header added by the synchronization algorithm to all.
 
#[derive(Debug, Clone)]
 
pub(crate) struct SyncHeader {
 
    pub sending_component_id: ConnectorId,
 
    pub highest_component_id: ConnectorId,
 
    pub sync_round: u32,
 
}
 

	
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
src/runtime2/tests/mod.rs
Show inline comments
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
const NUM_THREADS: u32 = 1;  // number of threads in runtime
 
const NUM_INSTANCES: u32 = 1;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 1500;      // number of loops within a single test (not used by all tests)
 
const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
const NUM_INSTANCES: u32 = 5;   // number of test instances constructed
 
const NUM_LOOPS: u32 = 5;       // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
@@ -105,26 +105,26 @@ fn test_star_shaped_request() {
 

	
 
    primitive center(out<u32>[] requests, in<u32>[] responses, u32 loops) {
 
        u32 loop_index = 0;
 
        auto num_edges = length(requests);
 

	
 
        while (loop_index < loops) {
 
            print(\"starting loop\");
 
            // print(\"starting loop\");
 
            synchronous {
 
                u32 edge_index = 0;
 
                u32 sum = 0;
 
                while (edge_index < num_edges) {
 
                    put(requests[edge_index], edge_index);
 
                    auto response = get(responses[edge_index]);
 
                    sum += response;
 
                    edge_index += 1;
 
                }
 

	
 
                assert(sum == num_edges * (num_edges - 1));
 
            }
 
            print(\"ending loop\");
 
            // print(\"ending loop\");
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_edges, u32 num_loops) {
 
        auto requests = {};
 
@@ -149,7 +149,84 @@ fn test_star_shaped_request() {
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS),
 
        ]));
 
    });
 
}
 

	
 
#[test]
 
fn test_conga_line_request() {
 
    const CODE: &'static str = "
 
    primitive start(out<u32> req, in<u32> resp, u32 num_nodes, u32 num_loops) {
 
        u32 loop_index = 0;
 
        u32 initial_value = 1337;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                put(req, initial_value);
 
                auto result = get(resp);
 
                assert(result == initial_value + num_nodes * 2);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive middle(
 
        in<u32> req_in, out<u32> req_forward,
 
        in<u32> resp_in, out<u32> resp_forward,
 
        u32 num_loops
 
    ) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                auto req = get(req_in);
 
                put(req_forward, req + 1);
 
                auto resp = get(resp_in);
 
                put(resp_forward, resp + 1);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    primitive end(in<u32> req_in, out<u32> resp_out, u32 num_loops) {
 
        u32 loop_index = 0;
 
        while (loop_index < num_loops) {
 
            synchronous {
 
                auto req = get(req_in);
 
                put(resp_out, req);
 
            }
 
            loop_index += 1;
 
        }
 
    }
 

	
 
    composite constructor(u32 num_nodes, u32 num_loops) {
 
        channel initial_req -> req_in;
 
        channel resp_out -> final_resp;
 
        new start(initial_req, final_resp, num_nodes, num_loops);
 

	
 
        in<u32> last_req_in = req_in;
 
        out<u32> last_resp_out = resp_out;
 

	
 
        u32 node = 0;
 
        while (node < num_nodes) {
 
            channel new_req_fw -> new_req_in;
 
            channel new_resp_out -> new_resp_in;
 
            new middle(last_req_in, new_req_fw, new_resp_in, last_resp_out, num_loops);
 

	
 
            last_req_in = new_req_in;
 
            last_resp_out = new_resp_out;
 

	
 
            node += 1;
 
        }
 

	
 
        new end(last_req_in, last_resp_out, num_loops);
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("conga_line_request");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
            Value::UInt32(5),
 
            Value::UInt32(NUM_LOOPS)
 
        ]));
 
    });
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)