From febea00022c749fd23ff72b2706ccf1f6516e23f 2022-03-04 16:35:47 From: mh Date: 2022-03-04 16:35:47 Subject: [PATCH] WIP: Start fixing consensus issues --- diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index e0241691fdf1da8af32096d323bfbe981f1032a7..6f4930fdcc955c5f88d4377e728fa50d125288b1 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -10,6 +10,7 @@ pub struct PortAnnotation { self_port_id: PortId, peer_comp_id: CompId, // only valid for getter ports peer_port_id: PortId, // only valid for getter ports + peer_discovered: bool, // only valid for getter ports mapping: Option, kind: PortKind, } @@ -21,6 +22,7 @@ impl PortAnnotation { self_port_id: port_id, peer_comp_id: CompId::new_invalid(), peer_port_id: PortId::new_invalid(), + peer_discovered: false, mapping: None, kind, } @@ -276,7 +278,37 @@ impl Consensus { self.highest_id = comp_ctx.id; self.mapping_counter = 0; self.mode = Mode::SyncBusy; - self.make_ports_consistent_with_ctx(comp_ctx); + + // Make the internally stored port annotation array consistent with the + // ports that the component currently owns. They should match by index + // (i.e. annotation at index `i` corresponds to port `i` in `comp_ctx`). 
+ let mut needs_setting_ports = false; + if comp_ctx.num_ports() != self.ports.len() { + needs_setting_ports = true; + } else { + for (idx, port) in comp_ctx.iter_ports().enumerate() { + let comp_port_id = port.self_id; + let cons_port_id = self.ports[idx].self_port_id; + if comp_port_id != cons_port_id { + needs_setting_ports = true; + break; + } + } + } + + if needs_setting_ports { + // Reset all ports + self.ports.clear(); + self.ports.reserve(comp_ctx.num_ports()); + for port in comp_ctx.iter_ports() { + self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind)); + } + } else { + // Make sure that we consider all peers as undiscovered again + for annotation in self.ports.iter_mut() { + annotation.peer_discovered = false; + } + } } /// Notifies the consensus management that the PDL code has reached the end @@ -330,34 +362,11 @@ impl Consensus { self.solution.clear(); } - fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) { - let mut needs_setting_ports = false; - if comp_ctx.num_ports() != self.ports.len() { - needs_setting_ports = true; - } else { - for (idx, port) in comp_ctx.iter_ports().enumerate() { - let comp_port_id = port.self_id; - let cons_port_id = self.ports[idx].self_port_id; - if comp_port_id != cons_port_id { - needs_setting_ports = true; - break; - } - } - } - - if needs_setting_ports { - self.ports.clear(); - self.ports.reserve(comp_ctx.num_ports()); - for port in comp_ctx.iter_ports() { - self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind)); - } - } - } - // ------------------------------------------------------------------------- // Handling inbound and outbound messages // ------------------------------------------------------------------------- + /// Prepares a set of values to be sent over a channel. 
pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage { debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id)); @@ -367,6 +376,23 @@ impl Consensus { return DataMessage{ data_header, sync_header, content }; } + /// Handles the arrival of a new data message (needs to be called for every + /// new data message, even though it might not end up being received). This + /// is used to determine peers of `get`ter ports. + pub(crate) fn handle_new_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: &DataMessage) -> bool { + let target_handle = comp_ctx.get_port_handle(message.data_header.target_port); + let target_index = comp_ctx.get_port_index(target_handle); + let annotation = &mut self.ports[target_index]; + debug_assert!( + !annotation.peer_discovered || ( + annotation.peer_comp_id == message.sync_header.sending_id && + annotation.peer_port_id == message.data_header.source_port + ) + ); + annotation.peer_comp_id = message.sync_header.sending_id; + annotation. + } + /// Checks if the data message can be received (due to port annotations), if /// it can then `true` is returned and the caller is responsible for handing /// the message of to the PDL code. Otherwise the message cannot be