diff --git a/src/collections/mod.rs b/src/collections/mod.rs index 872f1834a59e38ea0060e22458248874ec3858cb..dedba6bf5a17af413d57f54ac8858eebf0b38fbe 100644 --- a/src/collections/mod.rs +++ b/src/collections/mod.rs @@ -8,5 +8,5 @@ mod raw_vec; pub(crate) use string_pool::{StringPool, StringRef}; pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection}; -pub(crate) use sets::DequeSet; +pub(crate) use sets::{DequeSet, VecSet}; pub(crate) use raw_vec::RawVec; \ No newline at end of file diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 69ca8e689d202ab1aae53a6991cc15503c604bc7..d9ae521f4c43e4a5c3d068a6dc8d0779c628dc1b 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,4 +1,5 @@ use std::path::Component; +use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; @@ -11,6 +12,16 @@ struct BranchAnnotation { port_mapping: Vec, } +pub(crate) struct LocalSolution { + component: ConnectorId, + final_branch_id: BranchId, + port_mapping: Vec<(PortIdLocal, BranchId)>, +} + +pub(crate) struct GlobalSolution { + +} + /// The consensus algorithm. Currently only implemented to find the component /// with the highest ID within the sync region and letting it handle all the /// local solutions. @@ -26,6 +37,7 @@ pub(crate) struct Consensus { last_finished_handled: Option, // Gathered state (in case we are currently the leader of the distributed // consensus protocol) + encountered_peers: VecSet, // Workspaces workspace_ports: Vec, } @@ -42,6 +54,7 @@ impl Consensus { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), last_finished_handled: None, + encountered_peers: VecSet::new(), workspace_ports: Vec::new(), } } @@ -220,7 +233,10 @@ impl Consensus { } pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { - todo!("should check IDs and maybe send sync messages"); + debug_assert!(sync_header.sending_component_id != ctx.id) + if sync_header.highest_component_id > self.highest_connector_id { + // Sender has higher component ID + } } /// Checks data header and consults the stored port mapping and the @@ -268,7 +284,7 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { + pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { let annotation = &self.branch_annotations[branch_id.index as usize]; for expected in &data_header.expected_mapping { // If we own the port, then we have an entry in the @@ -286,6 +302,12 @@ impl Consensus { return true; } + + // --- Internal helpers + + fn forward_solutions_to(&mut self, target: ConnectorId) { + todo!("write") + } } /// Recursively goes through the value group, attempting to find ports. diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs index ff2d65a34b542ca74445a72bbbb3499091ce8011..fdffe317a062dc1a0b20e61b0d22d2ae197fea79 100644 --- a/src/runtime2/inbox2.rs +++ b/src/runtime2/inbox2.rs @@ -29,7 +29,7 @@ pub(crate) struct DataHeader { } /// A data message is a message that is intended for the receiver's PDL code, -/// but will also be handled by the consensus algrorithm +/// but will also be handled by the consensus algorithm #[derive(Debug, Clone)] pub(crate) struct DataMessageFancy { pub sync_header: SyncHeader, @@ -39,7 +39,8 @@ pub(crate) struct DataMessageFancy { #[derive(Debug)] pub(crate) enum SyncContent { - + LocalSolution(), // sending a local solution to the leader + Notification, // just a notification (so message is about sending the SyncHeader) } /// A sync message is a message that is intended only for the consensus @@ -47,6 +48,7 @@ pub(crate) enum SyncContent { #[derive(Debug)] pub(crate) struct SyncMessageFancy { pub sync_header: SyncHeader, + pub target_component_id: ConnectorId, pub content: SyncContent, }