// Files
// @ 968e958c3286
// Branch filter:
// Location: CSY/reowolf/src/runtime2/component/consensus.rs
// 968e958c3286
// 2.9 KiB
// application/rls-services+xml
// WIP: Basic control/data message flow
use crate::protocol::eval::*;
use crate::runtime2::communication::*;
use super::component_pdl::*;
/// Per-port bookkeeping entry kept by [`Consensus`].
pub struct PortAnnotation {
    /// Identifier of the port this annotation belongs to.
    id: PortId,
    /// Mapping currently associated with the port, or `None` when no mapping
    /// has been set. Only ports with a mapping are listed in the
    /// `expected_mapping` of an outgoing data header.
    mapping: Option<u32>,
}
impl PortAnnotation {
    /// Creates the annotation for port `id`, starting out without a mapping.
    fn new(id: PortId) -> Self {
        Self { id, mapping: None }
    }
}
/// Tracking consensus state
pub struct Consensus {
    /// Current sync round. Advanced (with wrap-around) by `notify_sync_end`
    /// and copied into every sync header as `sync_round`.
    round: u32,
    /// Next mapping value to hand out. Reset to zero by `notify_sync_start`
    /// and incremented each time `take_mapping` is called.
    mapping_counter: u32,
    /// One annotation per port, refreshed from the component context by
    /// `transfer_ports` (same order as the component's port list).
    ports: Vec<PortAnnotation>,
}
impl Consensus {
/// Creates an empty consensus state: round zero, mapping counter zero and
/// no tracked ports.
pub(crate) fn new() -> Self {
    Self {
        ports: Vec::new(),
        mapping_counter: 0,
        round: 0,
    }
}
/// Prepares the consensus state for a new sync round.
///
/// Re-synchronizes the tracked port annotations with the ports found in
/// `comp_ctx` and restarts the mapping counter at zero.
pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
    // The locally tracked ports must match the component's ports before any
    // message gets annotated.
    self.transfer_ports(comp_ctx);

    // Mapping numbers start from zero again.
    self.mapping_counter = 0;
}
/// Wraps `content` into a [`DataMessage`] sent through the port described by
/// `port_info`, attaching a freshly built data header and sync header.
///
/// In debug builds this asserts that an annotation with the id
/// `port_info.self_id` is tracked by this consensus state.
pub(crate) fn annotate_message_data(&mut self, port_info: &Port, content: ValueGroup) -> DataMessage {
    debug_assert!(self
        .ports
        .iter()
        .any(|annotation| annotation.id == port_info.self_id));

    // Fields are evaluated in the order written: data header first (this
    // consumes a mapping number), then the sync header.
    DataMessage {
        data_header: self.create_data_header(port_info),
        sync_header: self.create_sync_header(),
        content,
    }
}
/// Marks the end of a sync round by advancing the round counter.
///
/// The counter wraps around on overflow instead of panicking.
///
/// # Panics
///
/// Currently always panics after the counter has been updated: the rest of
/// the sync-end handling is an unimplemented `todo!`.
pub(crate) fn notify_sync_end(&mut self) {
    let next_round = self.round.wrapping_add(1);
    self.round = next_round;
    todo!("implement sync end")
}
/// Brings the tracked port annotations in line with the ports of `comp_ctx`.
///
/// The annotations are considered up to date when there are exactly as many
/// of them as the component has ports and every annotation carries the same
/// id as the component port at the same index. Otherwise all annotations are
/// discarded and rebuilt from `comp_ctx.ports`, in order, each starting
/// without a mapping.
pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
    // A differing port count always requires a rebuild (the original code
    // assigned to an undeclared variable here, so this case was never
    // flagged). With equal counts, compare the ids pairwise.
    let needs_setting_ports = comp_ctx.ports.len() != self.ports.len()
        || self
            .ports
            .iter()
            .zip(&comp_ctx.ports)
            .any(|(annotation, port)| port.self_id != annotation.id);

    if !needs_setting_ports {
        return;
    }

    self.ports.clear();
    self.ports.reserve(comp_ctx.ports.len());
    for port in &comp_ctx.ports {
        self.ports.push(PortAnnotation::new(port.self_id));
    }
}
/// Builds the data header for a message sent through `port_info`.
///
/// `expected_mapping` lists `(port id, mapping)` for every tracked port that
/// currently has a mapping, `new_mapping` is the next value handed out by
/// `take_mapping`, and source/target are the port's own id and its peer id.
///
/// In debug builds this asserts that `port_info` is a putter port.
fn create_data_header(&mut self, port_info: &Port) -> MessageDataHeader {
    debug_assert_eq!(port_info.kind, PortKind::Putter);

    // Collect the mappings of all ports that have one, in port order.
    let mut expected_mapping = Vec::with_capacity(self.ports.len());
    expected_mapping.extend(
        self.ports
            .iter()
            .filter_map(|annotation| annotation.mapping.map(|mapping| (annotation.id, mapping))),
    );

    MessageDataHeader {
        expected_mapping,
        new_mapping: self.take_mapping(),
        source_port: port_info.self_id,
        target_port: port_info.peer_id,
    }
}
/// Builds the sync header for an outgoing message, stamped with the current
/// sync round.
fn create_sync_header(&self) -> MessageSyncHeader {
    let sync_round = self.round;
    MessageSyncHeader { sync_round }
}
/// Returns the current value of the mapping counter and advances the counter
/// by one for the next caller.
fn take_mapping(&mut self) -> u32 {
    let current = self.mapping_counter;
    self.mapping_counter = current + 1;
    current
}
}
|