use crate::protocol::eval::ValueGroup; use crate::runtime2::communication::*; use super::component_pdl::*; pub struct PortAnnotation { id: PortId, mapping: Option, } impl PortAnnotation { fn new(id: PortId) -> Self { return Self{ id, mapping: None } } } /// Tracking consensus state pub struct Consensus { round: u32, mapping_counter: u32, ports: Vec, } impl Consensus { pub(crate) fn new() -> Self { return Self{ round: 0, mapping_counter: 0, ports: Vec::new(), } } pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) { // Make sure we locally still have all of the same ports self.transfer_ports(comp_ctx); self.mapping_counter = 0; } pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage { debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id)); let data_header = self.create_data_header(port_info); let sync_header = self.create_sync_header(); return DataMessage{ data_header, sync_header, content }; } pub(crate) fn notify_sync_end(&mut self) { self.round = self.round.wrapping_add(1); todo!("implement sync end") } pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) { let mut needs_setting_ports = false; if comp_ctx.ports.len() != self.ports.len() { needs_setting_ports = true; } else { for idx in 0..comp_ctx.ports.len() { let comp_port_id = comp_ctx.ports[idx].self_id; let cons_port_id = self.ports[idx].id; if comp_port_id != cons_port_id { needs_setting_ports = true; break; } } } if needs_setting_ports { self.ports.clear(); self.ports.reserve(comp_ctx.ports.len()); for port in &comp_ctx.ports { self.ports.push(PortAnnotation::new(port.self_id)) } } } fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader { let mut expected_mapping = Vec::with_capacity(self.ports.len()); for port in &self.ports { if let Some(mapping) = port.mapping { expected_mapping.push((port.id, mapping)); } } debug_assert_eq!(port_info.kind, PortKind::Putter); return MessageDataHeader{ expected_mapping, new_mapping: self.take_mapping(), source_port: port_info.self_id, target_port: port_info.peer_id, }; } fn create_sync_header(&self) -> MessageSyncHeader { return MessageSyncHeader{ sync_round: self.round, }; } fn take_mapping(&mut self) -> u32 { let mapping = self.mapping_counter; self.mapping_counter += 1; return mapping; } }