Changeset - d98661a2215e
[Not reviewed]
0 3 0
mh - 4 years ago 2021-11-07 22:45:20
contact@maxhenger.nl
negligable update to consensus algorithm
3 files changed with 29 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/collections/mod.rs
Show inline comments
 
@@ -8,5 +8,5 @@ mod raw_vec;
 

	
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::DequeSet;
 
pub(crate) use sets::{DequeSet, VecSet};
 
pub(crate) use raw_vec::RawVec;
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
@@ -11,6 +12,16 @@ struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(PortIdLocal, BranchId)>,
 
}
 

	
 
pub(crate) struct GlobalSolution {
 

	
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
@@ -26,6 +37,7 @@ pub(crate) struct Consensus {
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    encountered_peers: VecSet<ConnectorId>,
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 
@@ -42,6 +54,7 @@ impl Consensus {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 
@@ -220,7 +233,10 @@ impl Consensus {
 
    }
 

	
 
    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        todo!("should check IDs and maybe send sync messages");
 
        debug_assert!(sync_header.sending_component_id != ctx.id)
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID
 
        }
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
@@ -268,7 +284,7 @@ impl Consensus {
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
@@ -286,6 +302,12 @@ impl Consensus {
 

	
 
        return true;
 
    }
 

	
 
    // --- Internal helpers
 

	
 
    fn forward_solutions_to(&mut self, target: ConnectorId) {
 
        todo!("write")
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
src/runtime2/inbox2.rs
Show inline comments
 
@@ -29,7 +29,7 @@ pub(crate) struct DataHeader {
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algrorithm
 
/// but will also be handled by the consensus algorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessageFancy {
 
    pub sync_header: SyncHeader,
 
@@ -39,7 +39,8 @@ pub(crate) struct DataMessageFancy {
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 

	
 
    LocalSolution(), // sending a local solution to the leader
 
    Notification, // just a notification (so message is about sending the SyncHeader)
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
@@ -47,6 +48,7 @@ pub(crate) enum SyncContent {
 
#[derive(Debug)]
 
pub(crate) struct SyncMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub target_component_id: ConnectorId,
 
    pub content: SyncContent,
 
}
 

	
0 comments (0 inline, 0 general)