diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index d9ae521f4c43e4a5c3d068a6dc8d0779c628dc1b..49cdab9fdcf2c70ea447a59b5699f7b1829d5b99 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -3,7 +3,8 @@ use crate::collections::VecSet;
 use crate::protocol::eval::ValueGroup;
 use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 use crate::runtime2::ConnectorId;
-use crate::runtime2::inbox2::{DataHeader, SyncHeader};
+use crate::runtime2::inbox2::{DataHeader, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
+use crate::runtime2::inbox::SyncMessage;
 use crate::runtime2::port::{Port, PortIdLocal};
 use crate::runtime2::scheduler::ComponentCtxFancy;
 use super::inbox2::PortAnnotation;
@@ -30,6 +31,8 @@ pub(crate) struct GlobalSolution {
 // TODO: Flatten all datastructures
 // TODO: Have a "branch+port position hint" in case multiple operations are
 // performed on the same port to prevent repeated lookups
+// TODO: A lot of stuff should be batched. Like checking all the sync headers
+// and sending "I have a higher ID" messages.
 pub(crate) struct Consensus {
     // Local component's state
     highest_connector_id: ConnectorId,
@@ -38,6 +41,7 @@ pub(crate) struct Consensus {
     // Gathered state (in case we are currently the leader of the distributed
     // consensus protocol)
     encountered_peers: VecSet<ConnectorId>,
+    local_solutions: Vec<LocalSolution>,
     // Workspaces
     workspace_ports: Vec<PortIdLocal>,
 }
@@ -55,6 +59,7 @@ impl Consensus {
             branch_annotations: Vec::new(),
             last_finished_handled: None,
             encountered_peers: VecSet::new(),
+            local_solutions: Vec::new(),
             workspace_ports: Vec::new(),
         }
     }
@@ -77,8 +82,9 @@ impl Consensus {
     /// provided ports should be the ports the component owns at the start of
     /// the sync round.
     pub fn start_sync(&mut self, ports: &[Port]) {
-        debug_assert!(self.branch_annotations.is_empty());
         debug_assert!(!self.highest_connector_id.is_valid());
+        debug_assert!(self.branch_annotations.is_empty());
+        debug_assert!(self.encountered_peers.is_empty());

         // We'll use the first "branch" (the non-sync one) to store our ports,
         // this allows cloning if we created a new branch.
@@ -165,6 +171,22 @@ impl Consensus {
         let mut last_branch_id = self.last_finished_handled;
         for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
             // Turn the port mapping into a local solution
+            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
+            let mut target_mapping = Vec::with_capacity(source_mapping.len());
+
+            for port in source_mapping {
+                target_mapping.push((
+                    port.port_id,
+                    port.registered_id.unwrap_or(BranchId::new_invalid())
+                ));
+            }
+
+            let local_solution = LocalSolution{
+                component: ctx.id,
+                final_branch_id: branch.id,
+                port_mapping: target_mapping,
+            };
+
             last_branch_id = Some(branch.id);
         }

@@ -206,14 +228,10 @@ impl Consensus {
             self.workspace_ports.clear();
         }

-        let sync_header = SyncHeader{
-            sending_component_id: ctx.id,
-            highest_component_id: self.highest_connector_id,
-        };
-
         // TODO: Handle multiple firings. Right now we just assign the current
         // branch to the `None` value because we know we can only send once.
         debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
+        let sync_header = self.create_sync_header(ctx);
         let port_info = ctx.get_port_by_id(source_port_id).unwrap();
         let data_header = DataHeader{
             expected_mapping: branch.port_mapping.clone(),
@@ -233,10 +251,40 @@ impl Consensus {
     }

     pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
-        debug_assert!(sync_header.sending_component_id != ctx.id)
+        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves
+
+        self.encountered_peers.push(sync_header.sending_component_id);
+
         if sync_header.highest_component_id > self.highest_connector_id {
-            // Sender has higher component ID
-        }
+            // Sender has higher component ID. So should be the target of our
+            // messages. We should also let all of our peers know
+            self.highest_connector_id = sync_header.highest_component_id;
+            for encountered_id in self.encountered_peers.iter() {
+                if encountered_id == sync_header.sending_component_id {
+                    // Don't need to send it to this one
+                    continue
+                }
+
+                let message = SyncMessageFancy{
+                    sync_header: self.create_sync_header(ctx),
+                    target_component_id: encountered_id,
+                    content: SyncContent::Notification,
+                };
+                ctx.submit_message(MessageFancy::Sync(message));
+            }
+
+            // But also send our locally combined solution
+            self.forward_local_solutions(ctx);
+        } else if sync_header.highest_component_id < self.highest_connector_id {
+            // Sender has lower leader ID, so it should know about our higher
+            // one.
+            let message = SyncMessageFancy{
+                sync_header: self.create_sync_header(ctx),
+                target_component_id: sync_header.sending_component_id,
+                content: SyncContent::Notification
+            };
+            ctx.submit_message(MessageFancy::Sync(message));
+        } // else: exactly equal, so do nothing
     }

     /// Checks data header and consults the stored port mapping and the
@@ -245,6 +293,7 @@ impl Consensus {
     ///
     /// This function is generally called for freshly received messages that
     /// should be matched against previously halted branches.
+    /// TODO: Rename, name confused me after a day
     pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
         for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
             if branch.awaiting_port == data_header.target_port {
@@ -304,9 +353,50 @@ impl Consensus {
     }

     // --- Internal helpers
+    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
+        if self.highest_connector_id == ctx.id {
+            // We are the leader
+            self.store_local_solution(solution, ctx);
+        } else {
+            // Someone else is the leader
+            let message = SyncMessageFancy{
+                sync_header: self.create_sync_header(ctx),
+                target_component_id: self.highest_connector_id,
+                content: SyncContent::LocalSolution(solution),
+            };
+            ctx.submit_message(MessageFancy::Sync(message));
+        }
+    }
+
+    /// Stores the local solution internally. This assumes that we are the
+    /// leader.
+    fn store_local_solution(&mut self, solution: LocalSolution, _ctx: &ComponentCtxFancy) {
+        debug_assert_eq!(self.highest_connector_id, _ctx.id);
+
+        self.local_solutions.push(solution);
+    }

-    fn forward_solutions_to(&mut self, target: ConnectorId) {
-        todo!("write")
+    #[inline]
+    fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader {
+        return SyncHeader{
+            sending_component_id: ctx.id,
+            highest_component_id: self.highest_connector_id,
+        }
+    }
+
+    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) {
+        debug_assert_ne!(self.highest_connector_id, ctx.id);
+
+        if !self.local_solutions.is_empty() {
+            for local_solution in std::mem::take(&mut self.local_solutions) {
+                let message = SyncMessageFancy{
+                    sync_header: self.create_sync_header(ctx),
+                    target_component_id: self.highest_connector_id,
+                    content: SyncContent::LocalSolution(local_solution),
+                };
+                ctx.submit_message(MessageFancy::Sync(message));
+            }
+        }
     }
 }
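
Note on the new sync logic: the patch implements leader election by component id (the highest ConnectorId seen in a sync round is treated as the leader) and makes non-leaders forward their local solutions to that leader. Below is a minimal standalone sketch of that rule, not part of the patch: ConnectorId and LocalSolution are simplified stand-ins for the runtime types, and Outgoing, handle_sync_header and submit_local_solution are hypothetical names that abstract away the real message plumbing (SyncMessageFancy, ComponentCtxFancy).

// Standalone sketch of the "highest id wins" leader election used above.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct ConnectorId(u32);

#[derive(Debug)]
struct LocalSolution(u32); // stand-in for the real branch/port-mapping payload

#[derive(Debug)]
enum Outgoing {
    // Tell a peer that a component with a higher id exists.
    Notification { to: ConnectorId, leader: ConnectorId },
    // Forward a local solution to the current leader.
    Solution { to: ConnectorId, solution: LocalSolution },
}

struct Consensus {
    own_id: ConnectorId,
    leader_id: ConnectorId,     // highest component id seen so far
    peers: Vec<ConnectorId>,    // peers encountered this sync round
    stored: Vec<LocalSolution>, // solutions held while we are the leader
}

impl Consensus {
    // Mirrors handle_received_sync_header: adopt a higher leader id,
    // notify peers that lag behind, and hand off stored solutions.
    fn handle_sync_header(&mut self, from: ConnectorId, claimed_leader: ConnectorId) -> Vec<Outgoing> {
        let mut out = Vec::new();
        if !self.peers.contains(&from) {
            self.peers.push(from);
        }

        if claimed_leader > self.leader_id {
            // Sender knows a higher leader: adopt it and tell every other peer.
            self.leader_id = claimed_leader;
            for &peer in &self.peers {
                if peer != from {
                    out.push(Outgoing::Notification { to: peer, leader: self.leader_id });
                }
            }
            // Solutions gathered so far go to the new leader, like
            // forward_local_solutions in the patch.
            for solution in std::mem::take(&mut self.stored) {
                out.push(Outgoing::Solution { to: self.leader_id, solution });
            }
        } else if claimed_leader < self.leader_id {
            // Sender lags behind: notify it of the higher leader we know about.
            out.push(Outgoing::Notification { to: from, leader: self.leader_id });
        } // else: same leader, nothing to do
        out
    }

    // Mirrors send_or_store_local_solution: keep the solution if we are the
    // leader, otherwise send it to the leader.
    fn submit_local_solution(&mut self, solution: LocalSolution) -> Option<Outgoing> {
        if self.leader_id == self.own_id {
            self.stored.push(solution);
            None
        } else {
            Some(Outgoing::Solution { to: self.leader_id, solution })
        }
    }
}

fn main() {
    let mut c = Consensus {
        own_id: ConnectorId(3),
        leader_id: ConnectorId(3),
        peers: Vec::new(),
        stored: Vec::new(),
    };
    assert!(c.submit_local_solution(LocalSolution(0)).is_none()); // we are the leader: stored
    // A sync header from peer 7 claiming leader 7: adopt it and forward the stored solution.
    let msgs = c.handle_sync_header(ConnectorId(7), ConnectorId(7));
    println!("{:?}", msgs);
}

The point of the sketch is the asymmetry also visible in the patch: a lower claimed leader id only triggers a notification back to the sender, while a higher one both rewires all known peers and flushes the locally gathered solutions to the new leader.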