Changeset - d98661a2215e
[Not reviewed]
0 3 0
mh - 4 years ago 2021-11-07 22:45:20
contact@maxhenger.nl
negligable update to consensus algorithm
3 files changed with 29 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/collections/mod.rs
Show inline comments
 
mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 
mod raw_vec;
 

	
 
// TODO: Finish this later, use alloc::alloc and alloc::Layout
 
// mod freelist;
 

	
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::DequeSet;
 
pub(crate) use sets::{DequeSet, VecSet};
 
pub(crate) use raw_vec::RawVec;
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, SyncHeader};
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
}
 

	
 
pub(crate) struct GlobalSolution {
 

	
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
pub(crate) struct Consensus {
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    encountered_peers: VecSet<ConnectorId>,
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
@@ -211,25 +224,28 @@ impl Consensus {
 

	
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch_id);
 
            }
 
        }
 

	
 
        return (sync_header, data_header);
 
    }
 

	
 
    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        todo!("should check IDs and maybe send sync messages");
 
        debug_assert!(sync_header.sending_component_id != ctx.id)
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID
 
        }
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    ///
 
    /// This function is generally called for freshly received messages that
 
    /// should be matched against previously halted branches.
 
    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
@@ -259,42 +275,48 @@ impl Consensus {
 

	
 
                return;
 
            }
 
        }
 

	
 
        // If here, then the branch didn't actually own the port? Means the
 
        // caller made a mistake
 
        unreachable!("incorrect notify_of_received_message");
 
    }
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
            // annotation, check if the current mapping matches
 
            for current in &annotation.port_mapping {
 
                if expected.port_id == current.port_id {
 
                    if expected.registered_id != current.registered_id {
 
                        // IDs do not match, we cannot receive the
 
                        // message in this branch
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn forward_solutions_to(&mut self, target: ConnectorId) {
 
        todo!("write")
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
src/runtime2/inbox2.rs
Show inline comments
 
@@ -20,42 +20,44 @@ pub(crate) struct SyncHeader {
 
}
 

	
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchId,
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algrorithm
 
/// but will also be handled by the consensus algorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub data_header: DataHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 

	
 
    LocalSolution(), // sending a local solution to the leader
 
    Notification, // just a notification (so message is about sending the SyncHeader)
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
/// algorithm.
 
#[derive(Debug)]
 
pub(crate) struct SyncMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub target_component_id: ConnectorId,
 
    pub content: SyncContent,
 
}
 

	
 
/// A control message is a message intended for the scheduler that is executing
 
/// a component.
 
#[derive(Debug)]
 
pub(crate) struct ControlMessageFancy {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub sending_component_id: ConnectorId,
 
    pub content: ControlContent,
 
}
 

	
0 comments (0 inline, 0 general)