Changeset - 649f3bb14317
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-08 00:00:18
contact@maxhenger.nl
storing and forwarding local consensus solutions
4 files changed with 110 insertions and 14 deletions:
0 comments (0 inline, 0 general)
src/collections/sets.rs
Show inline comments
 
@@ -88,6 +88,11 @@ impl<T: Eq> VecSet<T> {
 
        self.inner.clear();
 
    }
 

	
 
    #[inline]
 
    pub fn iter(&self) -> impl Iterator<Item=T> {
 
        return self.inner.iter();
 
    }
 

	
 
    #[inline]
 
    pub fn is_empty(&self) -> bool {
 
        self.inner.is_empty()
src/runtime2/branch.rs
Show inline comments
 
@@ -15,7 +15,7 @@ pub struct BranchId {
 

	
 
impl BranchId {
 
    #[inline]
 
    fn new_invalid() -> Self {
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{ index: 0 };
 
    }
 

	
src/runtime2/consensus.rs
Show inline comments
 
@@ -3,7 +3,8 @@ use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, SyncHeader};
 
use crate::runtime2::inbox2::{DataHeader, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox::SyncMessage;
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 
@@ -30,6 +31,8 @@ pub(crate) struct GlobalSolution {
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
pub(crate) struct Consensus {
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
@@ -38,6 +41,7 @@ pub(crate) struct Consensus {
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    encountered_peers: VecSet<ConnectorId>,
 
    local_solutions: Vec<LocalSolution>,
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 
@@ -55,6 +59,7 @@ impl Consensus {
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            local_solutions: Vec::new(),
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 
@@ -77,8 +82,9 @@ impl Consensus {
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ports: &[Port]) {
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.encountered_peers.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
@@ -165,6 +171,22 @@ impl Consensus {
 
        let mut last_branch_id = self.last_finished_handled;
 
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
 
            // Turn the port mapping into a local solution
 
            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
 
            let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
            for port in source_mapping {
 
                target_mapping.push((
 
                    port.port_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 

	
 
            let local_solution = LocalSolution{
 
                component: ctx.id,
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 

	
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 
@@ -206,14 +228,10 @@ impl Consensus {
 
            self.workspace_ports.clear();
 
        }
 

	
 
        let sync_header = SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
        };
 

	
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let sync_header = self.create_sync_header(ctx);
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
@@ -233,10 +251,40 @@ impl Consensus {
 
    }
 

	
 
    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(sync_header.sending_component_id != ctx.id)
 
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
 

	
 
        self.encountered_peers.push(sync_header.sending_component_id);
 

	
 
        if sync_header.highest_component_id > self.highest_connector_id {
 
            // Sender has higher component ID
 
        }
 
            // Sender has higher component ID. So should be the target of our
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for encountered_id in self.encountered_peers.iter() {
 
                if encountered_id == sync_header.sending_component_id {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: encountered_id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 

	
 
            // But also send our locally combined solution
 
            self.forward_local_solutions();
 
        } else if sync_header.highest_component_id < self.highest_connector_id {
 
            // Sender has lower leader ID, so it should know about our higher
 
            // one.
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: sync_header.sending_component_id,
 
                content: SyncContent::Notification
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    /// Checks data header and consults the stored port mapping and the
 
@@ -245,6 +293,7 @@ impl Consensus {
 
    ///
 
    /// This function is generally called for freshly received messages that
 
    /// should be matched against previously halted branches.
 
    /// TODO: Rename, name confused me after a day
 
    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
@@ -304,9 +353,50 @@ impl Consensus {
 
    }
 

	
 
    // --- Internal helpers
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            self.store_local_solution(solution, ctx);
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncMessageFancy{
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncContent::LocalSolution(solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
        }
 
    }
 

	
 
    /// Stores the local solution internally. This assumes that we are the
 
    /// leader.
 
    fn store_local_solution(&mut self, solution: LocalSolution, _ctx: &ComponentCtxFancy) {
 
        debug_assert_eq!(self.highest_connector_id, _ctx.id);
 

	
 
        self.local_solutions.push(solution);
 
    }
 

	
 
    fn forward_solutions_to(&mut self, target: ConnectorId) {
 
        todo!("write")
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
        }
 
    }
 

	
 
    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        if !self.local_solutions.is_empty() {
 
            for local_solution in self.local_solutions.drain() {
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: self.highest_connector_id,
 
                    content: SyncContent::LocalSolution(local_solution),
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
            }
 
        }
 
    }
 
}
 

	
src/runtime2/inbox2.rs
Show inline comments
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::BranchId;
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::consensus::LocalSolution;
 
use crate::runtime2::port::PortIdLocal;
 

	
 
// TODO: Remove Debug derive from all types
 
@@ -39,7 +40,7 @@ pub(crate) struct DataMessageFancy {
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 
    LocalSolution(), // sending a local solution to the leader
 
    LocalSolution(LocalSolution), // sending a local solution to the leader
 
    Notification, // just a notification (so message is about sending the SyncHeader)
 
}
 

	
0 comments (0 inline, 0 general)