Changeset - ecc47971d535
[Not reviewed]
0 5 0
MH - 4 years ago 2021-11-08 15:52:44
contact@maxhenger.nl
WIP on handling sync solution messages
5 files changed with 226 insertions and 61 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector2.rs
Show inline comments
 
@@ -153,25 +153,25 @@ impl ConnectorPDL {
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
            self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
 
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
 
        todo!("handle content of message?");
 
        self.consensus.handle_received_sync_message(message, ctx);
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox::SyncMessage;
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::port::{ChannelId, Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
}
 

	
 
pub(crate) struct GlobalSolution {
 
    branches: Vec<(ConnectorId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
pub(crate) struct Consensus {
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    encountered_peers: VecSet<ConnectorId>,
 
    local_solutions: Vec<LocalSolution>,
 
    solution_combiner: SolutionCombiner,
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            local_solutions: Vec::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
@@ -166,36 +171,37 @@ impl Consensus {
 
    /// sync block. To find these branches, they should've been put in the
 
    /// "finished" queue in the execution tree.
 
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let mut last_branch_id = self.last_finished_handled;
 
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
 
            // Turn the port mapping into a local solution
 
            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
 
            let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
            for port in source_mapping {
 
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
                target_mapping.push((
 
                    port.port_id,
 
                    port_desc.channel_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 

	
 
            let local_solution = LocalSolution{
 
                component: ctx.id,
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 

	
 
            self.send_or_store_local_solution(local_solution, ctx);
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        final_ports.clear();
 
@@ -241,25 +247,81 @@ impl Consensus {
 
        };
 

	
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch_id);
 
            }
 
        }
 

	
 
        return (sync_header, data_header);
 
    }
 

	
 
    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
    /// Handles a new data message by handling the data and sync header, and
 
    /// checking which *existing* branches *can* receive the message. So two
 
    /// cautionary notes:
 
    /// 1. A future branch might also be able to receive this message, see the
 
    ///     `branch_can_receive` function.
 
    /// 2. We return the branches that *can* receive the message, you still
 
    ///     have to explicitly call `notify_of_received_message`.
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec<BranchId>) {
 
        self.handle_received_data_header(exec_tree, &message.data_header, target_ids);
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) -> Option<BranchId> {
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 
        match message.content {
 
            SyncContent::Notification => {
 
                // We were just interested in the header
 
                return None;
 
            },
 
            SyncContent::LocalSolution(solution) => {
 
                // We might be the leader, or earlier messages caused us to not
 
                // be the leader anymore.
 
                self.send_or_store_local_solution(solution, ctx);
 
                return None;
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 

	
 
        self.encountered_peers.push(sync_header.sending_component_id);
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for encountered_id in self.encountered_peers.iter() {
 
                if encountered_id == sync_header.sending_component_id {
 
                    // Don't need to send it to this one
 
                    continue
 
@@ -278,43 +340,24 @@ impl Consensus {
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    ///
 
    /// This function is generally called for freshly received messages that
 
    /// should be matched against previously halted branches.
 
    /// TODO: Rename, name confused me after a day
 
    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
                mapping.registered_id = Some(data_header.new_mapping);
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
@@ -347,113 +390,105 @@ impl Consensus {
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            self.store_local_solution(solution, ctx);
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 

	
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 

	
 
    /// Stores the local solution internally. This assumes that we are the
 
    /// leader.
 
    fn store_local_solution(&mut self, solution: LocalSolution, _ctx: &ComponentCtxFancy) {
 
        debug_assert_eq!(self.highest_connector_id, _ctx.id);
 

	
 
        self.local_solutions.push(solution);
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
        }
 
    }
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        if !self.local_solutions.is_empty() {
 
            for local_solution in self.local_solutions.drain(..) {
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: self.highest_connector_id,
 
                    content: SyncContent::LocalSolution(local_solution),
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 
        for local_solution in self.solution_combiner.drain() {
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(local_solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_ports: Vec<PortIdLocal>,
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
pub(crate) struct GlobalSolution {
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>
 
}
 

	
 
impl GlobalSolution {
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution(&mut self, solution: LocalSolution) {
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<Vec<(ConnectorId, BranchId)>> {
 
        let component_id = solution.component;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            port_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index, new_component) = match component_exists {
 
@@ -497,57 +532,57 @@ impl GlobalSolution {
 
                        if cur_port_id == other_port_id {
 
                            // We have a shared port
 
                            matching_ports.push(*port_id);
 
                        }
 
                    }
 
                }
 

	
 
                if !matching_ports.is_empty() {
 
                    // We share some ports
 
                    component_peers.push(ComponentPeer{
 
                        target_id: other_component.component,
 
                        target_index: other_index,
 
                        involved_ports: matching_ports,
 
                        involved_channels: matching_ports,
 
                    });
 
                }
 
            }
 

	
 
            let mut num_ports_in_peers = 0;
 
            for peer in component_peers {
 
                num_ports_in_peers += peer.involved_ports.len();
 
                num_ports_in_peers += peer.involved_channels.len();
 
            }
 

	
 
            if num_ports_in_peers == cur_ports.len() {
 
                // Newly added component has all required peers present
 
                self.local[component_index].all_peers_present = true;
 
            }
 

	
 
            // Add the found component pairing entries to the solution entries
 
            // for the two involved components
 
            for component_match in component_peers {
 
                // Check the other component for having all peers present
 
                let mut num_ports_in_peers = component_match.involved_ports.len();
 
                let mut num_ports_in_peers = component_match.involved_channels.len();
 
                let other_component = &mut self.local[component_match.target_index];
 
                for existing_peer in &other_component.peers {
 
                    num_ports_in_peers += existing_peer.involved_ports.len();
 
                    num_ports_in_peers += existing_peer.involved_channels.len();
 
                }
 

	
 
                if num_ports_in_peers == other_component.solutions[0].port_mapping.len() {
 
                    other_component.all_peers_present = true;
 
                }
 

	
 
                other_component.peers.push(ComponentPeer{
 
                    target_id: component_id,
 
                    target_index: component_index,
 
                    involved_ports: component_match.involved_ports.clone(),
 
                    involved_channels: component_match.involved_channels.clone(),
 
                });
 

	
 
                let new_component = &mut self.local[component_index];
 
                new_component.peers.push(component_match);
 
            }
 
        }
 

	
 
        // We're now sure that we know which other components the currently
 
        // considered component is linked up to. Now we need to check those
 
        // entries (if any) to see if any pair of local solutions match
 
        let mut new_component_matches = Vec::new();
 
        let cur_component = &self.local[component_index];
 
@@ -626,43 +661,157 @@ impl GlobalSolution {
 
            {
 
                Some(other_match) => {
 
                    // Already have an entry
 
                    debug_assert_eq!(other_match.target_index, new_component_match.target_index);
 
                    other_match.match_indices.extend(&new_component_match.match_indices);
 
                },
 
                None => {
 
                    // Create a new entry
 
                    cur_solution.matches.push(new_component_match);
 
                }
 
            }
 
        }
 

	
 
        return self.check_new_solution(component_index, solution_index);
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    fn check_new_solution(&self, component_idx: usize, solution_index: usize) -> Option<Vec<(ConnectorId, BranchId)>> {
 
        
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<Vec<(ConnectorId, BranchId)>> {
 
        if !self.can_have_solution() {
 
            return None;
 
        }
 

	
 
        // By now we're certain that all peers are present. So once our
 
        // backtracking solution stack is as long as the number of components,
 
        // then we have found a global solution.
 
        let mut check_stack = Vec::new();
 
        let mut check_from = 0;
 
        check_stack.push((component_index, solution_index));
 
        'checking_loop: while check_from < check_stack.len() {
 
            // Prepare for next iteration
 
            let new_check_from = check_stack.len();
 

	
 
            // Go through all entries on the checking stack. Each entry
 
            // corresponds to a component's solution. We check that one against
 
            // previously added ones on the stack, and if they're not already
 
            // added we push them onto the check stack.
 
            for check_idx in check_from..new_check_from {
 
                // Take the current solution
 
                let (component_index, solution_index) = check_stack[check_idx];
 
                debug_assert!(!self.local[component_index].solutions.is_empty());
 
                let cur_solution = &self.local[component_index].solutions[solution_index];
 

	
 
                // Go through the matches and check if they're on the stack or
 
                // should be added to the stack.
 
                for cur_match in &cur_solution.matches {
 
                    let mut is_already_on_stack = false;
 
                    let mut has_same_solution = false;
 
                    for existing_check_idx in 0..check_from {
 
                        let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx];
 
                        if existing_component_index == cur_match.target_index {
 
                            // Already lives on the stack, so the match MUST
 
                            // contain the same solution index if the checked
 
                            // local solution is agreeable with the (partially
 
                            // determined) global solution.
 
                            is_already_on_stack = true;
 
                            if cur_match.match_indices.contains(&existing_solution_index) {
 
                                has_same_solution = true;
 
                                break;
 
                            }
 
                        }
 
                    }
 

	
 
                    if is_already_on_stack {
 
                        if !has_same_solution {
 
                            // We have an inconsistency, so we need to go back
 
                            // in our stack, and try the next solution
 
                            let (last_component_index, last_solution_index) = check_stack[check_from];
 
                            check_stack.truncate(check_from);
 
                            if check_stack.is_empty() {
 
                                // The starting point does not yield a valid
 
                                // solution
 
                                return None;
 
                            }
 

	
 
                            // Try the next one
 
                            let last_component = &self.local[last_component_index];
 
                            let new_solution_index = last_solution_index + 1;
 
                            if new_solution_index >= last_component.solutions.len() {
 
                                // No more things to try, again: no valid
 
                                // solution
 
                                return None;
 
                            }
 

	
 
                            check_stack.push((last_component_index, new_solution_index));
 
                            continue 'checking_loop;
 
                        } // else: we're fine, the solution is agreeable
 
                    } else {
 
                        check_stack.push((cur_match.target_index, 0))
 
                    }
 
                }
 
            }
 

	
 
            check_from = new_check_from;
 
        }
 

	
 
        // Because of our earlier checking if we can have a solution at
 
        // all (all components have a peer), and the exit condition of the while
 
        // loop: if we're here, then we have a global solution
 
        debug_assert_eq!(check_stack.len(), self.local.len());
 
        let mut global_solution = Vec::with_capacity(check_stack.len());
 
        for (component_index, solution_index) in check_stack {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
            global_solution.push((component.component, solution.final_branch_id));
 
        }
 

	
 
        return Some(global_solution);
 
    }
 

	
 
    /// Simple test if a solution is at all possible. If this returns true it
 
    /// does not mean there actually is a solution.
 
    fn can_have_solution(&self) -> bool {
 
        for component in &self.local {
 
            if !component.all_peers_present {
 
                return false;
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    /// Turns the entire (partially resolved) global solution back into local
 
    /// solutions to ship to another component.
 
    // TODO: Don't do this, kind of wasteful since a lot of processing has
 
    //  already been performed.
 
    fn drain(&mut self) -> Vec<LocalSolution> {
 
        let mut reserve_len = 0;
 
        for component in &self.local {
 
            reserve_len += component.solutions.len();
 
        }
 

	
 
        let mut solutions = Vec::with_capacity(reserve_len);
 
        for component in self.local.drain(..) {
 
            for solution in component.solutions {
 
                solutions.push(LocalSolution{
 
                    component: component.component,
 
                    final_branch_id: solution.final_branch_id,
 
                    port_mapping: solution.port_mapping,
 
                });
 
            }
 
        }
 

	
 
        return solutions;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic Helpers
 
// -----------------------------------------------------------------------------
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
src/runtime2/inbox2.rs
Show inline comments
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::BranchId;
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::consensus::LocalSolution;
 
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
    pub port_id: PortIdLocal,
 
    pub registered_id: Option<BranchId>,
 
    pub expected_firing: Option<bool>,
 
}
 

	
 
/// The header added by the synchronization algorithm to all.
 
@@ -32,25 +32,26 @@ pub(crate) struct DataHeader {
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub data_header: DataHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 
    LocalSolution(LocalSolution), // sending a local solution to the leader
 
    Notification, // just a notification (so message is about sending the SyncHeader)
 
    GlobalSolution(GlobalSolution), // broadcasting to everyone
 
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
/// algorithm.
 
#[derive(Debug)]
 
pub(crate) struct SyncMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub target_component_id: ConnectorId,
 
    pub content: SyncContent,
 
}
 

	
 
/// A control message is a message intended for the scheduler that is executing
src/runtime2/mod.rs
Show inline comments
 
@@ -20,25 +20,25 @@ use std::collections::VecDeque;
 
use std::sync::{Arc, Condvar, Mutex, RwLock};
 
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::inbox2::MessageFancy;
 
use crate::runtime2::port::{Port, PortState};
 
use crate::runtime2::port::{ChannelId, Port, PortState};
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
@@ -213,37 +213,40 @@ impl RuntimeInner {
 
        lock.push_back(key);
 
        self.scheduler_notifier.notify_one();
 
    }
 

	
 
    // --- Creating/using ports
 

	
 
    /// Creates a new port pair. Note that these are stored globally like the
 
    /// connectors are. Ports stored by components belong to those components.
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) {
 
        use port::{PortIdLocal, PortKind};
 

	
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let channel_id = ChannelId::new(getter_id);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            channel_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            channel_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
    /// Sends a message to a particular connector. If the connector happened to
 
    /// be sleeping then it will be scheduled for execution.
 
    pub(crate) fn send_message(&self, target_id: ConnectorId, message: MessageFancy) {
 
        let target = self.get_component_public(target_id);
src/runtime2/port.rs
Show inline comments
 
@@ -12,44 +12,56 @@ impl PortIdLocal {
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct ChannelId {
 
    pub index: u32,
 
}
 

	
 
impl ChannelId {
 
    pub fn new(id: u32) -> Self {
 
        return Self{ index: id };
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 
pub enum PortState {
 
    Open,
 
    Closed,
 
}
 

	
 
/// Represents a port inside of the runtime. This is generally the local view of
 
/// a connector on its port, which may not be consistent with the rest of the
 
/// global system (e.g. its peer was moved to a new connector, or the peer might
 
/// have died in the meantime, so it is no longer usable).
 
#[derive(Clone)]
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub channel_id: ChannelId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)