diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 69ca8e689d202ab1aae53a6991cc15503c604bc7..d9ae521f4c43e4a5c3d068a6dc8d0779c628dc1b 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,4 +1,5 @@ use std::path::Component; +use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; @@ -11,6 +12,16 @@ struct BranchAnnotation { port_mapping: Vec, } +pub(crate) struct LocalSolution { + component: ConnectorId, + final_branch_id: BranchId, + port_mapping: Vec<(PortIdLocal, BranchId)>, +} + +pub(crate) struct GlobalSolution { + +} + /// The consensus algorithm. Currently only implemented to find the component /// with the highest ID within the sync region and letting it handle all the /// local solutions. @@ -26,6 +37,7 @@ pub(crate) struct Consensus { last_finished_handled: Option, // Gathered state (in case we are currently the leader of the distributed // consensus protocol) + encountered_peers: VecSet, // Workspaces workspace_ports: Vec, } @@ -42,6 +54,7 @@ impl Consensus { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), last_finished_handled: None, + encountered_peers: VecSet::new(), workspace_ports: Vec::new(), } } @@ -220,7 +233,10 @@ impl Consensus { } pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { - todo!("should check IDs and maybe send sync messages"); + debug_assert!(sync_header.sending_component_id != ctx.id) + if sync_header.highest_component_id > self.highest_connector_id { + // Sender has higher component ID + } } /// Checks data header and consults the stored port mapping and the @@ -268,7 +284,7 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { + pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { let annotation = &self.branch_annotations[branch_id.index as usize]; for expected in &data_header.expected_mapping { // If we own the port, then we have an entry in the @@ -286,6 +302,12 @@ impl Consensus { return true; } + + // --- Internal helpers + + fn forward_solutions_to(&mut self, target: ConnectorId) { + todo!("write") + } } /// Recursively goes through the value group, attempting to find ports.