diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 666ecf3c637103d1c7646b01252373c244637536..b782ef0bf1d7ee20baaa589eb2b6aceaa3e1b2c6 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -10,17 +10,21 @@ pub struct PortAnnotation { self_port_id: PortId, peer_comp_id: CompId, // only valid for getter ports peer_port_id: PortId, // only valid for getter ports + peer_discovered: bool, // only valid for getter ports mapping: Option, + kind: PortKind, } impl PortAnnotation { - fn new(comp_id: CompId, port_id: PortId) -> Self { + fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self { return Self{ self_comp_id: comp_id, self_port_id: port_id, peer_comp_id: CompId::new_invalid(), peer_port_id: PortId::new_invalid(), - mapping: None + peer_discovered: false, + mapping: None, + kind, } } } @@ -75,7 +79,6 @@ impl SolutionCombiner { let channel_index = if entry.getter.is_some() && entry.putter.is_some() { let channel_index = self.solution.channel_mapping.len(); self.solution.channel_mapping.push(entry); - self.matched_channels += 1; channel_index } else if let Some(putter) = entry.putter { @@ -274,7 +277,37 @@ impl Consensus { self.highest_id = comp_ctx.id; self.mapping_counter = 0; self.mode = Mode::SyncBusy; - self.make_ports_consistent_with_ctx(comp_ctx); + + // Make the internally stored port annotation array consistent with the + // ports that the component currently owns. They should match by index + // (i.e. annotation at index `i` corresponds to port `i` in `comp_ctx`). + let mut needs_setting_ports = false; + if comp_ctx.num_ports() != self.ports.len() { + needs_setting_ports = true; + } else { + for (idx, port) in comp_ctx.iter_ports().enumerate() { + let comp_port_id = port.self_id; + let cons_port_id = self.ports[idx].self_port_id; + if comp_port_id != cons_port_id { + needs_setting_ports = true; + break; + } + } + } + + if needs_setting_ports { + // Reset all ports + self.ports.clear(); + self.ports.reserve(comp_ctx.num_ports()); + for port in comp_ctx.iter_ports() { + self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind)); + } + } else { + // Make sure that we consider all peers as undiscovered again + for annotation in self.ports.iter_mut() { + annotation.peer_discovered = false; + } + } } /// Notifies the consensus management that the PDL code has reached the end @@ -328,34 +361,11 @@ impl Consensus { self.solution.clear(); } - fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) { - let mut needs_setting_ports = false; - if comp_ctx.num_ports() != self.ports.len() { - needs_setting_ports = true; - } else { - for (idx, port) in comp_ctx.iter_ports().enumerate() { - let comp_port_id = port.self_id; - let cons_port_id = self.ports[idx].self_port_id; - if comp_port_id != cons_port_id { - needs_setting_ports = true; - break; - } - } - } - - if needs_setting_ports { - self.ports.clear(); - self.ports.reserve(comp_ctx.num_ports()); - for port in comp_ctx.iter_ports() { - self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id)) - } - } - } - // ------------------------------------------------------------------------- // Handling inbound and outbound messages // ------------------------------------------------------------------------- + /// Prepares a set of values to be sent of a channel. pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage { debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id)); @@ -365,6 +375,24 @@ impl Consensus { return DataMessage{ data_header, sync_header, content }; } + /// Handles the arrival of a new data message (needs to be called for every + /// new data message, even though it might not end up being received). This + /// is used to determine peers of `get`ter ports. + pub(crate) fn handle_new_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) { + let target_handle = comp_ctx.get_port_handle(message.data_header.target_port); + let target_index = comp_ctx.get_port_index(target_handle); + let annotation = &mut self.ports[target_index]; + debug_assert!( + !annotation.peer_discovered || ( + annotation.peer_comp_id == message.sync_header.sending_id && + annotation.peer_port_id == message.data_header.source_port + ) + ); + annotation.peer_comp_id = message.sync_header.sending_id; + annotation.peer_port_id = message.data_header.source_port; + annotation.peer_discovered = true; + } + /// Checks if the data message can be received (due to port annotations), if /// it can then `true` is returned and the caller is responsible for handing /// the message of to the PDL code. Otherwise the message cannot be @@ -374,9 +402,41 @@ impl Consensus { debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port)); // Make sure the expected mapping matches the currently stored mapping - for (expected_id, expected_annotation) in &message.data_header.expected_mapping { - let got_annotation = self.get_annotation(*expected_id); - if got_annotation != *expected_annotation { + for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping { + // Determine our annotation, in order to do so we need to find the + // port matching the peer ports + let mut self_annotation = None; + let mut self_annotation_found = false; + match peer_port_kind { + PortAnnotationKind::Putter(peer_port) => { + for self_port in &self.ports { + if self_port.kind == PortKind::Getter && + self_port.peer_discovered && + self_port.peer_comp_id == peer_port.self_comp_id && + self_port.peer_port_id == peer_port.self_port_id + { + self_annotation = self_port.mapping; + self_annotation_found = true; + break; + } + } + }, + PortAnnotationKind::Getter(peer_port) => { + if peer_port.peer_comp_id == comp_ctx.id { + // Peer indicates that we talked to it + let self_port_handle = comp_ctx.get_port_handle(peer_port.peer_port_id); + let self_port_index = comp_ctx.get_port_index(self_port_handle); + self_annotation = self.ports[self_port_index].mapping; + self_annotation_found = true; + } + } + } + + if !self_annotation_found { + continue + } + + if self_annotation != *expected_annotation { return false; } } @@ -460,22 +520,16 @@ impl Consensus { } // else: exactly equal } - fn get_annotation(&self, port_id: PortId) -> Option { - for annotation in self.ports.iter() { - if annotation.self_port_id == port_id { - return annotation.mapping; - } - } - - debug_assert!(false); - return None; - } - fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) { for annotation in self.ports.iter_mut() { if annotation.self_port_id == data_header.target_port { - annotation.peer_comp_id = source_comp_id; - annotation.peer_port_id = data_header.source_port; + // Message should have already passed the `handle_new_data_message` function, so we + // should have already annotated the peer of the port. + debug_assert!( + annotation.peer_discovered && + annotation.peer_comp_id == source_comp_id && + annotation.peer_port_id == data_header.source_port + ); annotation.mapping = Some(data_header.new_mapping); } } @@ -593,9 +647,31 @@ impl Consensus { let mut port_index = usize::MAX; for (index, port) in self.ports.iter().enumerate() { if port.self_port_id == port_info.self_id { - port_index = index; + port_index = index; // remember for later updating } - expected_mapping.push((port.self_port_id, port.mapping)); + + // Add all of the + let annotation_kind = match port.kind { + PortKind::Putter => { + PortAnnotationKind::Putter(PortAnnotationPutter{ + self_comp_id: port.self_comp_id, + self_port_id: port.self_port_id + }) + }, + PortKind::Getter => { + if !port.peer_discovered { + continue; + } + + PortAnnotationKind::Getter(PortAnnotationGetter{ + self_comp_id: port.self_comp_id, + self_port_id: port.self_port_id, + peer_comp_id: port.peer_comp_id, + peer_port_id: port.peer_port_id, + }) + } + }; + expected_mapping.push((annotation_kind, port.mapping)); } let new_mapping = self.take_mapping();