Changeset - bc29d573b2db
[Not reviewed]
0 3 0
MH - 4 years ago 2021-11-07 22:13:07
contact@maxhenger.nl
WIP on revised consensus algorithm
3 files changed with 108 insertions and 17 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
@@ -130,6 +130,7 @@ impl BranchQueue {
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
#[derive(PartialEq, Eq)]
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
@@ -185,6 +186,7 @@ impl ExecTree {
 

	
 
    /// Pops a branch (ID) from a queue.
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round
 
        let queue = &mut self.queues[kind.as_index()];
 
        if queue.is_empty() {
 
            return None;
 
@@ -219,10 +221,24 @@ impl ExecTree {
 
        return &mut self.branches[0];
 
    }
 

	
 
    /// Returns an iterator over all the elements in the queue of the given kind
 
    pub fn iter_queue(&self, kind: QueueKind) -> BranchQueueIter {
 
    /// Returns an iterator over all the elements in the queue of the given
 
    /// kind. One can start the iteration at the branch *after* the provided
 
    /// branch. Just make sure it actually is in the provided queue.
 
    pub fn iter_queue(&self, kind: QueueKind, start_at: Option<BranchId>) -> BranchQueueIter {
 
        let queue = &self.queues[kind.as_index()];
 
        let index = queue.first as usize;
 

	
 
        let index = match start_at {
 
            Some(branch_id) => {
 
                debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id));
 
                let branch = &self.branches[branch_id.index as usize];
 

	
 
                branch.next_in_queue.index as usize
 
            },
 
            None => {
 
                queue.first as usize
 
            }
 
        };
 

	
 
        return BranchQueueIter {
 
            branches: self.branches.as_slice(),
 
            index,
src/runtime2/connector2.rs
Show inline comments
 
use std::collections::HashMap;
 
/// connector.rs
 
///
 
/// Represents a component. A component (and the scheduler that is running it)
 
@@ -70,28 +71,51 @@ pub(crate) struct ConnectorPDL {
 
    consensus: Consensus,
 
}
 

	
 
struct ConnectorRunContext {}
 
struct ConnectorRunContext<'a> {
 
    branch_id: BranchId,
 
    consensus: &'a Consensus,
 
    received: &'a HashMap<PortIdLocal, ValueGroup>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
}
 

	
 
impl RunContext for ConnectorRunContext{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.registered_id.is_some();
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.received.get(&port_id) {
 
            Some(data) => Some(data.clone()),
 
            None => None,
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
        return annotation.expected_firing.map(|v| Value::Bool(v));
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
        return self.prepared_channel.take();
 
    }
 
}
 

	
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        todo!()
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            self.consensus.handle_new_finished_sync_branches();
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
        }
 
    }
 
}
 

	
 
@@ -143,7 +167,7 @@ impl ConnectorPDL {
 

	
 
    // --- Running code
 

	
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        // Check if we have any branch that needs running
 
        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
@@ -155,7 +179,13 @@ impl ConnectorPDL {
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 

	
 
        let mut run_context = ConnectorRunContext{};
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Handle the returned result. Note that this match statement contains
 
@@ -266,13 +296,19 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
 
        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
 

	
 
        let branch = self.tree.base_branch_mut();
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{};
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
src/runtime2/consensus.rs
Show inline comments
 

	
 
use std::path::Component;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
@@ -20,8 +20,13 @@ struct BranchAnnotation {
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
pub(crate) struct Consensus {
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    // Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
@@ -36,6 +41,8 @@ impl Consensus {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
@@ -46,6 +53,13 @@ impl Consensus {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
    pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap();
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
@@ -81,8 +95,9 @@ impl Consensus {
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle
 
    /// caller has to handle. Note that
 
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for mapping in &branch.port_mapping {
 
            match mapping.expected_firing {
 
@@ -103,6 +118,7 @@ impl Consensus {
 
    /// Notifies the consensus algorithm that a particular branch has assumed
 
    /// a speculative value for its port mapping.
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == port_id {
 
@@ -127,8 +143,31 @@ impl Consensus {
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates sync messages for any branches that are at the end of the
 
    /// sync block. To find these branches, they should've been put in the
 
    /// "finished" queue in the execution tree.
 
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let mut last_branch_id = self.last_finished_handled;
 
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
 
            // Turn the port mapping into a local solution
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
        todo!("write");
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        final_ports.clear();
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        for port in &branch.port_mapping {
 
            final_ports.push(port.port_id);
 
        }
 
    }
 

	
 
    // --- Handling messages
 
@@ -191,7 +230,7 @@ impl Consensus {
 
    /// This function is generally called for freshly received messages that
 
    /// should be matched against previously halted branches.
 
    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
0 comments (0 inline, 0 general)