Files
@ 0781cf1b7abf
Branch filter:
Location: CSY/reowolf/src/runtime2/component/consensus.rs - annotation
0781cf1b7abf
3.0 KiB
application/rls-services+xml
WIP: Adding debug logs, add sync test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | c04f7fea1a62 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 c04f7fea1a62 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 968e958c3286 | use crate::protocol::eval::ValueGroup;
use crate::runtime2::communication::*;
use super::component_pdl::*;
pub struct PortAnnotation {
id: PortId,
mapping: Option<u32>,
}
impl PortAnnotation {
fn new(id: PortId) -> Self {
return Self{ id, mapping: None }
}
}
/// Tracking consensus state
pub struct Consensus {
round: u32,
mapping_counter: u32,
ports: Vec<PortAnnotation>,
}
impl Consensus {
pub(crate) fn new() -> Self {
return Self{
round: 0,
mapping_counter: 0,
ports: Vec::new(),
}
}
pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
// Make sure we locally still have all of the same ports
self.transfer_ports(comp_ctx);
self.mapping_counter = 0;
}
pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage {
debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
let data_header = self.create_data_header(port_info);
let sync_header = self.create_sync_header();
return DataMessage{ data_header, sync_header, content };
}
pub(crate) fn notify_sync_end(&mut self) {
self.round = self.round.wrapping_add(1);
todo!("implement sync end")
}
pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
let mut needs_setting_ports = false;
if comp_ctx.ports.len() != self.ports.len() {
needs_setting_ports = true;
} else {
for idx in 0..comp_ctx.ports.len() {
let comp_port_id = comp_ctx.ports[idx].self_id;
let cons_port_id = self.ports[idx].id;
if comp_port_id != cons_port_id {
needs_setting_ports = true;
break;
}
}
}
if needs_setting_ports {
self.ports.clear();
self.ports.reserve(comp_ctx.ports.len());
for port in &comp_ctx.ports {
self.ports.push(PortAnnotation::new(port.self_id))
}
}
}
fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader {
let mut expected_mapping = Vec::with_capacity(self.ports.len());
for port in &self.ports {
if let Some(mapping) = port.mapping {
expected_mapping.push((port.id, mapping));
}
}
debug_assert_eq!(port_info.kind, PortKind::Putter);
return MessageDataHeader{
expected_mapping,
new_mapping: self.take_mapping(),
source_port: port_info.self_id,
target_port: port_info.peer_id,
};
}
fn create_sync_header(&self) -> MessageSyncHeader {
return MessageSyncHeader{
sync_round: self.round,
};
}
fn take_mapping(&mut self) -> u32 {
let mapping = self.mapping_counter;
self.mapping_counter += 1;
return mapping;
}
}
|