From bc29d573b2db92f78cb33483b37c1010601d0fc4 2021-11-07 22:13:07 From: MH Date: 2021-11-07 22:13:07 Subject: [PATCH] WIP on revised consensus algorithm --- diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 2bc4b40970ebb3e4e47bb2cc73d876d2b9a0ef96..4b3e5a79740ff5ffd7da36ce8f791911057d3739 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -130,6 +130,7 @@ impl BranchQueue { const NUM_QUEUES: usize = 3; +#[derive(PartialEq, Eq)] pub(crate) enum QueueKind { Runnable, AwaitingMessage, @@ -185,6 +186,7 @@ impl ExecTree { /// Pops a branch (ID) from a queue. pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { + debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round let queue = &mut self.queues[kind.as_index()]; if queue.is_empty() { return None; @@ -219,10 +221,24 @@ impl ExecTree { return &mut self.branches[0]; } - /// Returns an iterator over all the elements in the queue of the given kind - pub fn iter_queue(&self, kind: QueueKind) -> BranchQueueIter { + /// Returns an iterator over all the elements in the queue of the given + /// kind. One can start the iteration at the branch *after* the provided + /// branch. Just make sure it actually is in the provided queue. + pub fn iter_queue(&self, kind: QueueKind, start_at: Option) -> BranchQueueIter { let queue = &self.queues[kind.as_index()]; - let index = queue.first as usize; + + let index = match start_at { + Some(branch_id) => { + debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id)); + let branch = &self.branches[branch_id.index as usize]; + + branch.next_in_queue.index as usize + }, + None => { + queue.first as usize + } + }; + return BranchQueueIter { branches: self.branches.as_slice(), index, diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 37ecca56cc66c87b0c6ec74d7295cc0d4bb1a350..14858c64491958cfd90137cd6b01b7c8e12740bd 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; /// connector.rs /// /// Represents a component. A component (and the scheduler that is running it) @@ -62,7 +63,7 @@ pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running - Exit, // Connector has exited + Exit, // Connector has exited } pub(crate) struct ConnectorPDL { @@ -70,28 +71,51 @@ pub(crate) struct ConnectorPDL { consensus: Consensus, } -struct ConnectorRunContext {} +struct ConnectorRunContext<'a> { + branch_id: BranchId, + consensus: &'a Consensus, + received: &'a HashMap, + scheduler: SchedulerCtx<'a>, + prepared_channel: Option<(Value, Value)>, +} + impl RunContext for ConnectorRunContext{ fn did_put(&mut self, port: PortId) -> bool { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + let annotation = self.consensus.get_annotation(self.branch_id, port_id); + return annotation.registered_id.is_some(); } fn get(&mut self, port: PortId) -> Option { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + match self.received.get(&port_id) { + Some(data) => Some(data.clone()), + None => None, + } } fn fires(&mut self, port: PortId) -> Option { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + let annotation = self.consensus.get_annotation(self.branch_id, port_id); + return annotation.expected_firing.map(|v| Value::Bool(v)); } fn get_channel(&mut self) -> Option<(Value, Value)> { - todo!() + return self.prepared_channel.take(); } } impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - todo!() + self.handle_new_messages(comp_ctx); + if self.tree.is_in_sync() { + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + self.consensus.handle_new_finished_sync_branches(); + return scheduling; + } else { + let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); + return scheduling; + } } } @@ -143,7 +167,7 @@ impl ConnectorPDL { // --- Running code - pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { // Check if we have any branch that needs running debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); @@ -155,7 +179,13 @@ impl ConnectorPDL { let branch_id = branch_id.unwrap(); let branch = &mut self.tree[branch_id]; - let mut run_context = ConnectorRunContext{}; + let mut run_context = ConnectorRunContext{ + branch_id, + consensus: &self.consensus, + received: &branch.inbox, + scheduler: *sched_ctx, + prepared_channel: branch.prepared_channel.take(), + }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); // Handle the returned result. Note that this match statement contains @@ -266,13 +296,19 @@ impl ConnectorPDL { } } - pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); let branch = self.tree.base_branch_mut(); debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); - let mut run_context = ConnectorRunContext{}; + let mut run_context = ConnectorRunContext{ + branch_id, + consensus: &self.consensus, + received: &branch.inbox, + scheduler: *sched_ctx, + prepared_channel: branch.prepared_channel.take(), + }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 110d934e910921dd947b8b2fd8a6e042dc285ace..69ca8e689d202ab1aae53a6991cc15503c604bc7 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,4 +1,4 @@ - +use std::path::Component; use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; @@ -20,8 +20,13 @@ struct BranchAnnotation { // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups pub(crate) struct Consensus { + // Local component's state highest_connector_id: ConnectorId, branch_annotations: Vec, + last_finished_handled: Option, + // Gathered state (in case we are currently the leader of the distributed + // consensus protocol) + // Workspaces workspace_ports: Vec, } @@ -36,6 +41,8 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), + last_finished_handled: None, + workspace_ports: Vec::new(), } } @@ -46,6 +53,13 @@ impl Consensus { return !self.branch_annotations.is_empty(); } + /// TODO: Remove this once multi-fire is in place + pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation { + let branch = &self.branch_annotations[branch_id.index as usize]; + let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap(); + return port; + } + /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. @@ -81,8 +95,9 @@ impl Consensus { /// Notifies the consensus algorithm that a branch has reached the end of /// the sync block. A final check for consistency will be performed that the - /// caller has to handle + /// caller has to handle. Note that pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { + debug_assert!(self.is_in_sync()); let branch = &self.branch_annotations[branch_id.index as usize]; for mapping in &branch.port_mapping { match mapping.expected_firing { @@ -103,6 +118,7 @@ impl Consensus { /// Notifies the consensus algorithm that a particular branch has assumed /// a speculative value for its port mapping. pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency { + debug_assert!(self.is_in_sync()); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == port_id { @@ -127,8 +143,31 @@ impl Consensus { unreachable!("notify_of_speculative_mapping called with unowned port"); } + /// Generates sync messages for any branches that are at the end of the + /// sync block. To find these branches, they should've been put in the + /// "finished" queue in the execution tree. + pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) { + debug_assert!(self.is_in_sync()); + + let mut last_branch_id = self.last_finished_handled; + for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { + // Turn the port mapping into a local solution + + last_branch_id = Some(branch.id); + } + + self.last_finished_handled = last_branch_id; + } + pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { - todo!("write"); + debug_assert!(self.is_in_sync()); + + // TODO: Handle sending and receiving ports + final_ports.clear(); + let branch = &self.branch_annotations[branch_id.index as usize]; + for port in &branch.port_mapping { + final_ports.push(port.port_id); + } } // --- Handling messages @@ -191,7 +230,7 @@ impl Consensus { /// This function is generally called for freshly received messages that /// should be matched against previously halted branches. pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { - for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage) { + for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { // Found a branch awaiting the message, but we need to make sure // the mapping is correct