Changeset - ecc47971d535
[Not reviewed]
0 5 0
MH - 4 years ago 2021-11-08 15:52:44
contact@maxhenger.nl
WIP on handling sync solution messages
5 files changed with 226 insertions and 61 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector2.rs
Show inline comments
 
@@ -159,13 +159,13 @@ impl ConnectorPDL {
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
 
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
 
        todo!("handle content of message?");
 
        self.consensus.handle_received_sync_message(message, ctx);
 
    }
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox::SyncMessage;
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::port::{ChannelId, Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
}
 

	
 
pub(crate) struct GlobalSolution {
 
    branches: Vec<(ConnectorId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
@@ -38,13 +43,13 @@ pub(crate) struct Consensus {
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    encountered_peers: VecSet<ConnectorId>,
 
    local_solutions: Vec<LocalSolution>,
 
    solution_combiner: SolutionCombiner,
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
@@ -56,13 +61,13 @@ impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            local_solutions: Vec::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
@@ -172,24 +177,25 @@ impl Consensus {
 
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
 
            // Turn the port mapping into a local solution
 
            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
 
            let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
            for port in source_mapping {
 
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
                target_mapping.push((
 
                    port.port_id,
 
                    port_desc.channel_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 

	
 
            let local_solution = LocalSolution{
 
                component: ctx.id,
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 

	
 
            self.send_or_store_local_solution(local_solution, ctx);
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
    }
 
@@ -247,13 +253,69 @@ impl Consensus {
 
            }
 
        }
 

	
 
        return (sync_header, data_header);
 
    }
 

	
 
    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
    /// Handles a new data message by handling the data and sync header, and
 
    /// checking which *existing* branches *can* receive the message. So two
 
    /// cautionary notes:
 
    /// 1. A future branch might also be able to receive this message, see the
 
    ///     `branch_can_receive` function.
 
    /// 2. We return the branches that *can* receive the message, you still
 
    ///     have to explicitly call `notify_of_received_message`.
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec<BranchId>) {
 
        self.handle_received_data_header(exec_tree, &message.data_header, target_ids);
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) -> Option<BranchId> {
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 
        match message.content {
 
            SyncContent::Notification => {
 
                // We were just interested in the header
 
                return None;
 
            },
 
            SyncContent::LocalSolution(solution) => {
 
                // We might be the leader, or earlier messages caused us to not
 
                // be the leader anymore.
 
                self.send_or_store_local_solution(solution, ctx);
 
                return None;
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
        }
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 

	
 
        self.encountered_peers.push(sync_header.sending_component_id);
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID. So should be the target of our
 
@@ -284,31 +346,12 @@ impl Consensus {
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    ///
 
    /// This function is generally called for freshly received messages that
 
    /// should be matched against previously halted branches.
 
    /// TODO: Rename, name confused me after a day
 
    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
                // Found the port in which the message should be inserted
 
@@ -353,101 +396,93 @@ impl Consensus {
 
    }
 

	
 
    // --- Internal helpers
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            self.store_local_solution(solution, ctx);
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 

	
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 

	
 
    /// Stores the local solution internally. This assumes that we are the
 
    /// leader.
 
    fn store_local_solution(&mut self, solution: LocalSolution, _ctx: &ComponentCtxFancy) {
 
        debug_assert_eq!(self.highest_connector_id, _ctx.id);
 

	
 
        self.local_solutions.push(solution);
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
        }
 
    }
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        if !self.local_solutions.is_empty() {
 
            for local_solution in self.local_solutions.drain(..) {
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: self.highest_connector_id,
 
                    content: SyncContent::LocalSolution(local_solution),
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 
        for local_solution in self.solution_combiner.drain() {
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(local_solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_ports: Vec<PortIdLocal>,
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
pub(crate) struct GlobalSolution {
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>
 
}
 

	
 
impl GlobalSolution {
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution(&mut self, solution: LocalSolution) {
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<Vec<(ConnectorId, BranchId)>> {
 
        let component_id = solution.component;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            port_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 
@@ -503,45 +538,45 @@ impl GlobalSolution {
 

	
 
                if !matching_ports.is_empty() {
 
                    // We share some ports
 
                    component_peers.push(ComponentPeer{
 
                        target_id: other_component.component,
 
                        target_index: other_index,
 
                        involved_ports: matching_ports,
 
                        involved_channels: matching_ports,
 
                    });
 
                }
 
            }
 

	
 
            let mut num_ports_in_peers = 0;
 
            for peer in component_peers {
 
                num_ports_in_peers += peer.involved_ports.len();
 
                num_ports_in_peers += peer.involved_channels.len();
 
            }
 

	
 
            if num_ports_in_peers == cur_ports.len() {
 
                // Newly added component has all required peers present
 
                self.local[component_index].all_peers_present = true;
 
            }
 

	
 
            // Add the found component pairing entries to the solution entries
 
            // for the two involved components
 
            for component_match in component_peers {
 
                // Check the other component for having all peers present
 
                let mut num_ports_in_peers = component_match.involved_ports.len();
 
                let mut num_ports_in_peers = component_match.involved_channels.len();
 
                let other_component = &mut self.local[component_match.target_index];
 
                for existing_peer in &other_component.peers {
 
                    num_ports_in_peers += existing_peer.involved_ports.len();
 
                    num_ports_in_peers += existing_peer.involved_channels.len();
 
                }
 

	
 
                if num_ports_in_peers == other_component.solutions[0].port_mapping.len() {
 
                    other_component.all_peers_present = true;
 
                }
 

	
 
                other_component.peers.push(ComponentPeer{
 
                    target_id: component_id,
 
                    target_index: component_index,
 
                    involved_ports: component_match.involved_ports.clone(),
 
                    involved_channels: component_match.involved_channels.clone(),
 
                });
 

	
 
                let new_component = &mut self.local[component_index];
 
                new_component.peers.push(component_match);
 
            }
 
        }
 
@@ -632,18 +667,108 @@ impl GlobalSolution {
 
                None => {
 
                    // Create a new entry
 
                    cur_solution.matches.push(new_component_match);
 
                }
 
            }
 
        }
 

	
 
        return self.check_new_solution(component_index, solution_index);
 
    }
 

	
 
    /// Checks if, starting at the provided local solution, a global solution
 
    /// can be formed.
 
    fn check_new_solution(&self, component_idx: usize, solution_index: usize) -> Option<Vec<(ConnectorId, BranchId)>> {
 
        
 
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<Vec<(ConnectorId, BranchId)>> {
 
        if !self.can_have_solution() {
 
            return None;
 
        }
 

	
 
        // By now we're certain that all peers are present. So once our
 
        // backtracking solution stack is as long as the number of components,
 
        // then we have found a global solution.
 
        let mut check_stack = Vec::new();
 
        let mut check_from = 0;
 
        check_stack.push((component_index, solution_index));
 
        'checking_loop: while check_from < check_stack.len() {
 
            // Prepare for next iteration
 
            let new_check_from = check_stack.len();
 

	
 
            // Go through all entries on the checking stack. Each entry
 
            // corresponds to a component's solution. We check that one against
 
            // previously added ones on the stack, and if they're not already
 
            // added we push them onto the check stack.
 
            for check_idx in check_from..new_check_from {
 
                // Take the current solution
 
                let (component_index, solution_index) = check_stack[check_idx];
 
                debug_assert!(!self.local[component_index].solutions.is_empty());
 
                let cur_solution = &self.local[component_index].solutions[solution_index];
 

	
 
                // Go through the matches and check if they're on the stack or
 
                // should be added to the stack.
 
                for cur_match in &cur_solution.matches {
 
                    let mut is_already_on_stack = false;
 
                    let mut has_same_solution = false;
 
                    for existing_check_idx in 0..check_from {
 
                        let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx];
 
                        if existing_component_index == cur_match.target_index {
 
                            // Already lives on the stack, so the match MUST
 
                            // contain the same solution index if the checked
 
                            // local solution is agreeable with the (partially
 
                            // determined) global solution.
 
                            is_already_on_stack = true;
 
                            if cur_match.match_indices.contains(&existing_solution_index) {
 
                                has_same_solution = true;
 
                                break;
 
                            }
 
                        }
 
                    }
 

	
 
                    if is_already_on_stack {
 
                        if !has_same_solution {
 
                            // We have an inconsistency, so we need to go back
 
                            // in our stack, and try the next solution
 
                            let (last_component_index, last_solution_index) = check_stack[check_from];
 
                            check_stack.truncate(check_from);
 
                            if check_stack.is_empty() {
 
                                // The starting point does not yield a valid
 
                                // solution
 
                                return None;
 
                            }
 

	
 
                            // Try the next one
 
                            let last_component = &self.local[last_component_index];
 
                            let new_solution_index = last_solution_index + 1;
 
                            if new_solution_index >= last_component.solutions.len() {
 
                                // No more things to try, again: no valid
 
                                // solution
 
                                return None;
 
                            }
 

	
 
                            check_stack.push((last_component_index, new_solution_index));
 
                            continue 'checking_loop;
 
                        } // else: we're fine, the solution is agreeable
 
                    } else {
 
                        check_stack.push((cur_match.target_index, 0))
 
                    }
 
                }
 
            }
 

	
 
            check_from = new_check_from;
 
        }
 

	
 
        // Because of our earlier checking if we can have a solution at
 
        // all (all components have a peer), and the exit condition of the while
 
        // loop: if we're here, then we have a global solution
 
        debug_assert_eq!(check_stack.len(), self.local.len());
 
        let mut global_solution = Vec::with_capacity(check_stack.len());
 
        for (component_index, solution_index) in check_stack {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
            global_solution.push((component.component, solution.final_branch_id));
 
        }
 

	
 
        return Some(global_solution);
 
    }
 

	
 
    /// Simple test if a solution is at all possible. If this returns true it
 
    /// does not mean there actually is a solution.
 
    fn can_have_solution(&self) -> bool {
 
        for component in &self.local {
 
@@ -651,12 +776,36 @@ impl GlobalSolution {
 
                return false;
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    /// Turns the entire (partially resolved) global solution back into local
 
    /// solutions to ship to another component.
 
    // TODO: Don't do this, kind of wasteful since a lot of processing has
 
    //  already been performed.
 
    fn drain(&mut self) -> Vec<LocalSolution> {
 
        let mut reserve_len = 0;
 
        for component in &self.local {
 
            reserve_len += component.solutions.len();
 
        }
 

	
 
        let mut solutions = Vec::with_capacity(reserve_len);
 
        for component in self.local.drain(..) {
 
            for solution in component.solutions {
 
                solutions.push(LocalSolution{
 
                    component: component.component,
 
                    final_branch_id: solution.final_branch_id,
 
                    port_mapping: solution.port_mapping,
 
                });
 
            }
 
        }
 

	
 
        return solutions;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic Helpers
 
// -----------------------------------------------------------------------------
 

	
src/runtime2/inbox2.rs
Show inline comments
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::BranchId;
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::consensus::LocalSolution;
 
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
@@ -38,13 +38,14 @@ pub(crate) struct DataMessageFancy {
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 
    LocalSolution(LocalSolution), // sending a local solution to the leader
 
    Notification, // just a notification (so message is about sending the SyncHeader)
 
    GlobalSolution(GlobalSolution), // broadcasting to everyone
 
    Notification, // just a notification (so purpose of message is to send the SyncHeader)
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
/// algorithm.
 
#[derive(Debug)]
 
pub(crate) struct SyncMessageFancy {
src/runtime2/mod.rs
Show inline comments
 
@@ -26,13 +26,13 @@ use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::inbox2::MessageFancy;
 
use crate::runtime2::port::{Port, PortState};
 
use crate::runtime2::port::{ChannelId, Port, PortState};
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
@@ -219,25 +219,28 @@ impl RuntimeInner {
 
    /// Creates a new port pair. Note that these are stored globally like the
 
    /// connectors are. Ports stored by components belong to those components.
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) {
 
        use port::{PortIdLocal, PortKind};
 

	
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let channel_id = ChannelId::new(getter_id);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            channel_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            channel_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
src/runtime2/port.rs
Show inline comments
 
@@ -18,12 +18,23 @@ impl PortIdLocal {
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub struct ChannelId {
 
    pub index: u32,
 
}
 

	
 
impl ChannelId {
 
    pub fn new(id: u32) -> Self {
 
        return Self{ index: id };
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
@@ -38,12 +49,13 @@ pub enum PortState {
 
/// global system (e.g. its peer was moved to a new connector, or the peer might
 
/// have died in the meantime, so it is no longer usable).
 
#[derive(Clone)]
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub channel_id: ChannelId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
0 comments (0 inline, 0 general)