diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..4e2be12a01bfddf3d22459770d76da2f91bfe2a1 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -0,0 +1,199 @@ + +use crate::protocol::eval::ValueGroup; +use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; +use crate::runtime2::ConnectorId; +use crate::runtime2::inbox2::{DataHeader, SyncHeader}; +use crate::runtime2::port::PortIdLocal; +use crate::runtime2::scheduler::ComponentCtxFancy; +use super::inbox2::PortAnnotation; + +struct BranchAnnotation { + port_mapping: Vec, +} + +/// The consensus algorithm. Currently only implemented to find the component +/// with the highest ID within the sync region and letting it handle all the +/// local solutions. +/// +/// The type itself serves as an experiment to see how code should be organized. +// TODO: Flatten all datastructures +pub(crate) struct Consensus { + highest_connector_id: ConnectorId, + branch_annotations: Vec, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +pub(crate) enum Consistency { + Valid, + Inconsistent, +} + +impl Consensus { + pub fn new() -> Self { + return Self { + highest_connector_id: ConnectorId::new_invalid(), + branch_annotations: Vec::new(), + } + } + + // --- Controlling sync round and branches + + /// Sets up the consensus algorithm for a new synchronous round. The + /// provided ports should be the ports the component owns at the start of + /// the sync round. + pub fn start_sync(&mut self, ports: &[PortIdLocal]) { + debug_assert!(self.branch_annotations.is_empty()); + debug_assert!(!self.highest_connector_id.is_valid()); + + // We'll use the first "branch" (the non-sync one) to store our ports, + // this allows cloning if we created a new branch. + self.branch_annotations.push(BranchAnnotation{ + port_mapping: ports.iter() + .map(|v| PortAnnotation{ + port_id: *v, + registered_id: None, + expected_firing: None, + }) + .collect(), + }); + } + + /// Notifies the consensus algorithm that a new branch has appeared. Must be + /// called for each forked branch in the execution tree. + pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) { + // If called correctly. Then each time we are notified the new branch's + // index is the length in `branch_annotations`. + debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); + let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; + let new_branch_annotations = BranchAnnotation{ + port_mapping: parent_branch_annotations.port_mapping.clone(), + }; + self.branch_annotations.push(new_branch_annotations); + } + + /// Notifies the consensus algorithm that a branch has reached the end of + /// the sync block. A final check for consistency will be performed that the + /// caller has to handle + pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { + let branch = &self.branch_annotations[branch_id.index as usize]; + for mapping in &branch.port_mapping { + match mapping.expected_firing { + Some(expected) => { + if expected != mapping.registered_id.is_some() { + // Inconsistent speculative state and actual state + debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier + return Consistency::Inconsistent; + } + }, + None => {}, + } + } + + return Consistency::Valid; + } + + /// Notifies the consensus algorithm that a particular branch has assumed + /// a speculative value for its port mapping. + pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency { + let branch = &mut self.branch_annotations[branch_id.index as usize]; + for mapping in &mut branch.port_mapping { + if mapping.port_id == port_id { + match mapping.expected_firing { + None => { + // Not yet mapped, perform speculative mapping + mapping.expected_firing = Some(does_fire); + return Consistency::Valid; + }, + Some(current) => { + // Already mapped + if current == does_fire { + return Consistency::Valid; + } else { + return Consistency::Inconsistent; + } + } + } + } + } + + unreachable!("notify_of_speculative_mapping called with unowned port"); + } + + pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { + todo!("write"); + } + + // --- Handling messages + + /// Prepares a message for sending. Caller should have made sure that + /// sending the message is consistent with the speculative state. + pub fn prepare_message(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, value: &ValueGroup) -> (SyncHeader, DataHeader) { + if cfg!(debug_assertions) { + let branch = &self.branch_annotations[branch_id.index as usize]; + let port = branch.port_mapping.iter() + .find(|v| v.port_id == source_port_id) + .unwrap(); + debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); + } + + + } + + pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { + todo!("should check IDs and maybe send sync messages"); + } + + /// Checks data header and consults the stored port mapping and the + /// execution tree to see which branches may receive the data message's + /// contents. + /// + /// This function is generally called for freshly received messages that + /// should be matched against previously halted branches. + pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { + for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage) { + if branch.awaiting_port == data_header.target_port { + // Found a branch awaiting the message, but we need to make sure + // the mapping is correct + if self.branch_can_receive(branch.id, data_header) { + target_ids.push(branch.id); + } + } + } + } + + pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader) { + debug_assert!(self.branch_can_receive(branch_id, data_header)); + let branch = &mut self.branch_annotations[branch_id.index as usize]; + for mapping in &mut branch.port_mapping { + if mapping.port_id == data_header.target_port { + mapping.registered_id = Some(data_header.new_mapping); + return; + } + } + + // If here, then the branch didn't actually own the port? Means the + // caller made a mistake + unreachable!("incorrect notify_of_received_message"); + } + + /// Matches the mapping between the branch and the data message. If they + /// match then the branch can receive the message. + pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { + let annotation = &self.branch_annotations[branch_id.index as usize]; + for expected in &data_header.expected_mapping { + // If we own the port, then we have an entry in the + // annotation, check if the current mapping matches + for current in &annotation.port_mapping { + if expected.port_id == current.port_id { + if expected.registered_id != current.registered_id { + // IDs do not match, we cannot receive the + // message in this branch + return false; + } + } + } + } + + return true; + } +} \ No newline at end of file