diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs new file mode 100644 index 0000000000000000000000000000000000000000..68ea40f957186ec832cff45e89cb9bbd8a25be29 --- /dev/null +++ b/src/runtime2/component/consensus.rs @@ -0,0 +1,104 @@ +use crate::protocol::eval::*; +use crate::runtime2::communication::*; + +use super::component_pdl::*; + +pub struct PortAnnotation { + id: PortId, + mapping: Option, +} + +impl PortAnnotation { + fn new(id: PortId) -> Self { + return Self{ id, mapping: None } + } +} + +/// Tracking consensus state +pub struct Consensus { + round: u32, + mapping_counter: u32, + ports: Vec, +} + +impl Consensus { + pub(crate) fn new() -> Self { + return Self{ + round: 0, + mapping_counter: 0, + ports: Vec::new(), + } + } + + pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) { + // Make sure we locally still have all of the same ports + self.transfer_ports(comp_ctx); + self.mapping_counter = 0; + } + + pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage { + debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id)); + let data_header = self.create_data_header(port_info); + let sync_header = self.create_sync_header(); + + return DataMessage{ data_header, sync_header, content }; + } + + pub(crate) fn notify_sync_end(&mut self) { + self.round = self.round.wrapping_add(1); + todo!("implement sync end") + } + + pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) { + let mut needs_setting_ports = false; + if comp_ctx.ports.len() != self.ports.len() { + ports_same = true; + } else { + for idx in 0..comp_ctx.ports.len() { + let comp_port_id = comp_ctx.ports[idx].self_id; + let cons_port_id = self.ports[idx].id; + if comp_port_id != cons_port_id { + needs_setting_ports = true; + break; + } + } + } + + if needs_setting_ports { + self.ports.clear(); + self.ports.reserve(comp_ctx.ports.len()); + for port in &comp_ctx.ports { + self.ports.push(PortAnnotation::new(port.self_id)) + } + } + } + + fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader { + let mut expected_mapping = Vec::with_capacity(self.ports.len()); + for port in &self.ports { + if let Some(mapping) = port.mapping { + expected_mapping.push((port.id, mapping)); + } + } + + debug_assert_eq!(port_info.kind, PortKind::Putter); + return MessageDataHeader{ + expected_mapping, + new_mapping: self.take_mapping(), + source_port: port_info.self_id, + target_port: port_info.peer_id, + }; + } + + fn create_sync_header(&self) -> MessageSyncHeader { + return MessageSyncHeader{ + sync_round: self.round, + }; + } + + fn take_mapping(&mut self) -> u32 { + let mapping = self.mapping_counter; + self.mapping_counter += 1; + return mapping; + } +} \ No newline at end of file