From 649f3bb14317d6b8a66e5e0ee564360ca5f93eb3 2021-11-08 00:00:18 From: MH Date: 2021-11-08 00:00:18 Subject: [PATCH] storing and forwarding local consensus solutions --- diff --git a/src/collections/sets.rs b/src/collections/sets.rs index 634862349b71ee77c202954f62f467b900184ab3..bdcf09861f8de3d733e72e0c6022236111c5916e 100644 --- a/src/collections/sets.rs +++ b/src/collections/sets.rs @@ -88,6 +88,11 @@ impl VecSet { self.inner.clear(); } + #[inline] + pub fn iter(&self) -> impl Iterator { + return self.inner.iter(); + } + #[inline] pub fn is_empty(&self) -> bool { self.inner.is_empty() diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 4b3e5a79740ff5ffd7da36ce8f791911057d3739..cb605fd86e7df443c8f41bf660a1d411bebcd52c 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -15,7 +15,7 @@ pub struct BranchId { impl BranchId { #[inline] - fn new_invalid() -> Self { + pub(crate) fn new_invalid() -> Self { return Self{ index: 0 }; } diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index d9ae521f4c43e4a5c3d068a6dc8d0779c628dc1b..49cdab9fdcf2c70ea447a59b5699f7b1829d5b99 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -3,7 +3,8 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; -use crate::runtime2::inbox2::{DataHeader, SyncHeader}; +use crate::runtime2::inbox2::{DataHeader, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy}; +use crate::runtime2::inbox::SyncMessage; use crate::runtime2::port::{Port, PortIdLocal}; use crate::runtime2::scheduler::ComponentCtxFancy; use super::inbox2::PortAnnotation; @@ -30,6 +31,8 @@ pub(crate) struct GlobalSolution { // TODO: Flatten all datastructures // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups +// TODO: A lot of stuff should be batched. Like checking all the sync headers +// and sending "I have a higher ID" messages. pub(crate) struct Consensus { // Local component's state highest_connector_id: ConnectorId, @@ -38,6 +41,7 @@ pub(crate) struct Consensus { // Gathered state (in case we are currently the leader of the distributed // consensus protocol) encountered_peers: VecSet, + local_solutions: Vec, // Workspaces workspace_ports: Vec, } @@ -55,6 +59,7 @@ impl Consensus { branch_annotations: Vec::new(), last_finished_handled: None, encountered_peers: VecSet::new(), + local_solutions: Vec::new(), workspace_ports: Vec::new(), } } @@ -77,8 +82,9 @@ impl Consensus { /// provided ports should be the ports the component owns at the start of /// the sync round. pub fn start_sync(&mut self, ports: &[Port]) { - debug_assert!(self.branch_annotations.is_empty()); debug_assert!(!self.highest_connector_id.is_valid()); + debug_assert!(self.branch_annotations.is_empty()); + debug_assert!(self.encountered_peers.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. @@ -165,6 +171,22 @@ impl Consensus { let mut last_branch_id = self.last_finished_handled; for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { // Turn the port mapping into a local solution + let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping; + let mut target_mapping = Vec::with_capacity(source_mapping.len()); + + for port in source_mapping { + target_mapping.push(( + port.port_id, + port.registered_id.unwrap_or(BranchId::new_invalid()) + )); + } + + let local_solution = LocalSolution{ + component: ctx.id, + final_branch_id: branch.id, + port_mapping: target_mapping, + }; + last_branch_id = Some(branch.id); } @@ -206,14 +228,10 @@ impl Consensus { self.workspace_ports.clear(); } - let sync_header = SyncHeader{ - sending_component_id: ctx.id, - highest_component_id: self.highest_connector_id, - }; - // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); + let sync_header = self.create_sync_header(ctx); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), @@ -233,10 +251,40 @@ impl Consensus { } pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { - debug_assert!(sync_header.sending_component_id != ctx.id) + debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves + + self.encountered_peers.push(sync_header.sending_component_id); + if sync_header.highest_component_id > self.highest_connector_id { - // Sender has higher component ID - } + // Sender has higher component ID. So should be the target of our + // messages. We should also let all of our peers know + self.highest_connector_id = sync_header.highest_component_id; + for encountered_id in self.encountered_peers.iter() { + if encountered_id == sync_header.sending_component_id { + // Don't need to send it to this one + continue + } + + let message = SyncMessageFancy{ + sync_header: self.create_sync_header(ctx), + target_component_id: encountered_id, + content: SyncContent::Notification, + }; + ctx.submit_message(MessageFancy::Sync(message)); + } + + // But also send our locally combined solution + self.forward_local_solutions(); + } else if sync_header.highest_component_id < self.highest_connector_id { + // Sender has lower leader ID, so it should know about our higher + // one. + let message = SyncMessageFancy{ + sync_header: self.create_sync_header(ctx), + target_component_id: sync_header.sending_component_id, + content: SyncContent::Notification + }; + ctx.submit_message(MessageFancy::Sync(message)); + } // else: exactly equal, so do nothing } /// Checks data header and consults the stored port mapping and the @@ -245,6 +293,7 @@ impl Consensus { /// /// This function is generally called for freshly received messages that /// should be matched against previously halted branches. + /// TODO: Rename, name confused me after a day pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { @@ -304,9 +353,50 @@ impl Consensus { } // --- Internal helpers + fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) { + if self.highest_connector_id == ctx.id { + // We are the leader + self.store_local_solution(solution, ctx); + } else { + // Someone else is the leader + let message = SyncMessageFancy{ + sync_header: self.create_sync_header(ctx), + target_component_id: self.highest_connector_id, + content: SyncContent::LocalSolution(solution), + }; + ctx.submit_message(MessageFancy::Sync(message)); + } + } + + /// Stores the local solution internally. This assumes that we are the + /// leader. + fn store_local_solution(&mut self, solution: LocalSolution, _ctx: &ComponentCtxFancy) { + debug_assert_eq!(self.highest_connector_id, _ctx.id); + + self.local_solutions.push(solution); + } - fn forward_solutions_to(&mut self, target: ConnectorId) { - todo!("write") + #[inline] + fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader { + return SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + } + } + + fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) { + debug_assert_ne!(self.highest_connector_id, ctx.id); + + if !self.local_solutions.is_empty() { + for local_solution in self.local_solutions.drain() { + let message = SyncMessageFancy{ + sync_header: self.create_sync_header(ctx), + target_component_id: self.highest_connector_id, + content: SyncContent::LocalSolution(local_solution), + }; + ctx.submit_message(MessageFancy::Sync(message)); + } + } } } diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs index fdffe317a062dc1a0b20e61b0d22d2ae197fea79..9c4ad0a7b3d9ab9ca18e0e6c086615366e592264 100644 --- a/src/runtime2/inbox2.rs +++ b/src/runtime2/inbox2.rs @@ -1,6 +1,7 @@ use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::BranchId; use crate::runtime2::ConnectorId; +use crate::runtime2::consensus::LocalSolution; use crate::runtime2::port::PortIdLocal; // TODO: Remove Debug derive from all types @@ -39,7 +40,7 @@ pub(crate) struct DataMessageFancy { #[derive(Debug)] pub(crate) enum SyncContent { - LocalSolution(), // sending a local solution to the leader + LocalSolution(LocalSolution), // sending a local solution to the leader Notification, // just a notification (so message is about sending the SyncHeader) }