Files @ ce98be9707a6
Branch filter:

Location: CSY/reowolf/src/runtime2/consensus.rs

ce98be9707a6 8.1 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
wip on refactoring component

use crate::protocol::eval::ValueGroup;
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
use crate::runtime2::ConnectorId;
use crate::runtime2::inbox2::{DataHeader, SyncHeader};
use crate::runtime2::port::PortIdLocal;
use crate::runtime2::scheduler::ComponentCtxFancy;
use super::inbox2::PortAnnotation;

struct BranchAnnotation {
    port_mapping: Vec<PortAnnotation>,
}

/// The consensus algorithm. Currently only implemented to find the component
/// with the highest ID within the sync region and letting it handle all the
/// local solutions.
///
/// The type itself serves as an experiment to see how code should be organized.
// TODO: Flatten all datastructures
pub(crate) struct Consensus {
    highest_connector_id: ConnectorId,
    branch_annotations: Vec<BranchAnnotation>,
}

#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum Consistency {
    Valid,
    Inconsistent,
}

impl Consensus {
    pub fn new() -> Self {
        return Self {
            highest_connector_id: ConnectorId::new_invalid(),
            branch_annotations: Vec::new(),
        }
    }

    // --- Controlling sync round and branches

    /// Sets up the consensus algorithm for a new synchronous round. The
    /// provided ports should be the ports the component owns at the start of
    /// the sync round.
    pub fn start_sync(&mut self, ports: &[PortIdLocal]) {
        debug_assert!(self.branch_annotations.is_empty());
        debug_assert!(!self.highest_connector_id.is_valid());

        // We'll use the first "branch" (the non-sync one) to store our ports,
        // this allows cloning if we created a new branch.
        self.branch_annotations.push(BranchAnnotation{
            port_mapping: ports.iter()
                .map(|v| PortAnnotation{
                    port_id: *v,
                    registered_id: None,
                    expected_firing: None,
                })
                .collect(),
        });
    }

    /// Notifies the consensus algorithm that a new branch has appeared. Must be
    /// called for each forked branch in the execution tree.
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
        // If called correctly. Then each time we are notified the new branch's
        // index is the length in `branch_annotations`.
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
        let new_branch_annotations = BranchAnnotation{
            port_mapping: parent_branch_annotations.port_mapping.clone(),
        };
        self.branch_annotations.push(new_branch_annotations);
    }

    /// Notifies the consensus algorithm that a branch has reached the end of
    /// the sync block. A final check for consistency will be performed that the
    /// caller has to handle
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
        let branch = &self.branch_annotations[branch_id.index as usize];
        for mapping in &branch.port_mapping {
            match mapping.expected_firing {
                Some(expected) => {
                    if expected != mapping.registered_id.is_some() {
                        // Inconsistent speculative state and actual state
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
                        return Consistency::Inconsistent;
                    }
                },
                None => {},
            }
        }

        return Consistency::Valid;
    }

    /// Notifies the consensus algorithm that a particular branch has assumed
    /// a speculative value for its port mapping.
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
        let branch = &mut self.branch_annotations[branch_id.index as usize];
        for mapping in &mut branch.port_mapping {
            if mapping.port_id == port_id {
                match mapping.expected_firing {
                    None => {
                        // Not yet mapped, perform speculative mapping
                        mapping.expected_firing = Some(does_fire);
                        return Consistency::Valid;
                    },
                    Some(current) => {
                        // Already mapped
                        if current == does_fire {
                            return Consistency::Valid;
                        } else {
                            return Consistency::Inconsistent;
                        }
                    }
                }
            }
        }

        unreachable!("notify_of_speculative_mapping called with unowned port");
    }

    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
        todo!("write");
    }

    // --- Handling messages

    /// Prepares a message for sending. Caller should have made sure that
    /// sending the message is consistent with the speculative state.
    pub fn prepare_message(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, value: &ValueGroup) -> (SyncHeader, DataHeader) {
        if cfg!(debug_assertions) {
            let branch = &self.branch_annotations[branch_id.index as usize];
            let port = branch.port_mapping.iter()
                .find(|v| v.port_id == source_port_id)
                .unwrap();
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
        }

        
    }

    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
        todo!("should check IDs and maybe send sync messages");
    }

    /// Checks data header and consults the stored port mapping and the
    /// execution tree to see which branches may receive the data message's
    /// contents.
    ///
    /// This function is generally called for freshly received messages that
    /// should be matched against previously halted branches.
    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage) {
            if branch.awaiting_port == data_header.target_port {
                // Found a branch awaiting the message, but we need to make sure
                // the mapping is correct
                if self.branch_can_receive(branch.id, data_header) {
                    target_ids.push(branch.id);
                }
            }
        }
    }

    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader) {
        debug_assert!(self.branch_can_receive(branch_id, data_header));
        let branch = &mut self.branch_annotations[branch_id.index as usize];
        for mapping in &mut branch.port_mapping {
            if mapping.port_id == data_header.target_port {
                mapping.registered_id = Some(data_header.new_mapping);
                return;
            }
        }

        // If here, then the branch didn't actually own the port? Means the
        // caller made a mistake
        unreachable!("incorrect notify_of_received_message");
    }

    /// Matches the mapping between the branch and the data message. If they
    /// match then the branch can receive the message.
    pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
        let annotation = &self.branch_annotations[branch_id.index as usize];
        for expected in &data_header.expected_mapping {
            // If we own the port, then we have an entry in the
            // annotation, check if the current mapping matches
            for current in &annotation.port_mapping {
                if expected.port_id == current.port_id {
                    if expected.registered_id != current.registered_id {
                        // IDs do not match, we cannot receive the
                        // message in this branch
                        return false;
                    }
                }
            }
        }

        return true;
    }
}