From b87f53268b1bb433e4a8dd8126b2dde0f3050a76 2022-03-04 18:50:51 From: MH Date: 2022-03-04 18:50:51 Subject: [PATCH] Fix issue related to ambiguous port mapping --- diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index 4a50333075fba025712e1fc6790f61b93c6035de..c092fc99311d9c12835221b9707fe1e20b6827be 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -61,11 +61,13 @@ pub struct DataMessage { pub content: ValueGroup, } +#[derive(Debug)] pub enum PortAnnotationKind { Getter(PortAnnotationGetter), Putter(PortAnnotationPutter), } +#[derive(Debug)] pub struct PortAnnotationGetter { pub self_comp_id: CompId, pub self_port_id: PortId, @@ -73,6 +75,7 @@ pub struct PortAnnotationGetter { pub peer_port_id: PortId, } +#[derive(Debug)] pub struct PortAnnotationPutter { pub self_comp_id: CompId, pub self_port_id: PortId, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 14fd2fb0dd6670acda349042897d5b70743c1c47..a73582435ecaeecf86030c59d95f085e71cff9b1 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -103,6 +103,17 @@ pub(crate) enum Mode { Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } +impl Mode { + fn is_in_sync_block(&self) -> bool { + use Mode::*; + + match self { + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, + NonSync | StartExit | BusyExit | Exit => false, + } + } +} + struct SelectCase { involved_ports: Vec, } @@ -453,6 +464,11 @@ impl CompPDL { fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component starting sync mode"); self.consensus.notify_sync_start(comp_ctx); + for message in self.inbox_main.iter() { + if let Some(message) = message { + self.consensus.handle_new_data_message(comp_ctx, message); + } + } debug_assert_eq!(self.mode, Mode::NonSync); self.mode = Mode::Sync; } @@ -534,6 +550,11 @@ impl CompPDL { /// will handle putting it in the correct place, and potentially blocking /// the port in case too many messages are being received. fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + // Whatever we do, glean information from headers in message + if self.mode.is_in_sync_block() { + self.consensus.handle_new_data_message(comp_ctx, &message); + } + // Check if we can insert it directly into the storage associated with // the port let target_port_id = message.data_header.target_port; diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 6f4930fdcc955c5f88d4377e728fa50d125288b1..d38f63cb61243b5d8836110d88536dfdabf6d9b8 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -379,7 +379,7 @@ impl Consensus { /// Handles the arrival of a new data message (needs to be called for every /// new data message, even though it might not end up being received). This /// is used to determine peers of `get`ter ports. - pub(crate) fn handle_new_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: &DataMessage) -> bool { + pub(crate) fn handle_new_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) { let target_handle = comp_ctx.get_port_handle(message.data_header.target_port); let target_index = comp_ctx.get_port_index(target_handle); let annotation = &mut self.ports[target_index]; @@ -390,7 +390,8 @@ impl Consensus { ) ); annotation.peer_comp_id = message.sync_header.sending_id; - annotation. + annotation.peer_port_id = message.data_header.source_port; + annotation.peer_discovered = true; } /// Checks if the data message can be received (due to port annotations), if @@ -402,9 +403,41 @@ impl Consensus { debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port)); // Make sure the expected mapping matches the currently stored mapping - for (expected_id, expected_annotation) in &message.data_header.expected_mapping { - let got_annotation = self.get_annotation(*expected_id); - if got_annotation != *expected_annotation { + for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping { + // Determine our annotation, in order to do so we need to find the + // port matching the peer ports + let mut self_annotation = None; + let mut self_annotation_found = false; + match peer_port_kind { + PortAnnotationKind::Putter(peer_port) => { + for self_port in &self.ports { + if self_port.kind == PortKind::Getter && + self_port.peer_discovered && + self_port.peer_comp_id == peer_port.self_comp_id && + self_port.peer_port_id == peer_port.self_port_id + { + self_annotation = self_port.mapping; + self_annotation_found = true; + break; + } + } + }, + PortAnnotationKind::Getter(peer_port) => { + if peer_port.peer_comp_id != comp_ctx.id { + // Peer indicates that we talked to it + let self_port_handle = comp_ctx.get_port_handle(peer_port.peer_port_id); + let self_port_index = comp_ctx.get_port_index(self_port_handle); + self_annotation = self.ports[self_port_index].mapping; + self_annotation_found = true; + } + } + } + + if !self_annotation_found { + continue + } + + if self_annotation != *expected_annotation { return false; } } @@ -488,22 +521,16 @@ impl Consensus { } // else: exactly equal } - fn get_annotation(&self, port_id: PortId) -> Option { - for annotation in self.ports.iter() { - if annotation.self_port_id == port_id { - return annotation.mapping; - } - } - - debug_assert!(false); - return None; - } - fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) { for annotation in self.ports.iter_mut() { if annotation.self_port_id == data_header.target_port { - annotation.peer_comp_id = source_comp_id; - annotation.peer_port_id = data_header.source_port; + // Message should have already passed the `handle_new_data_message` function, so we + // should have already annotated the peer of the port. + debug_assert!( + annotation.peer_discovered && + annotation.peer_comp_id == source_comp_id && + annotation.peer_port_id == data_header.source_port + ); annotation.mapping = Some(data_header.new_mapping); } } @@ -624,18 +651,28 @@ impl Consensus { port_index = index; // remember for later updating } + // Add all of the let annotation_kind = match port.kind { - PortKind::Putter => PortAnnotationKind::Putter(PortAnnotationPutter{ - self_comp_id: port.self_comp_id, - self_port_id: port.self_port_id - }), - PortKind::Getter => PortAnnotationKind::Getter(PortAnnotationGetter{ - self_comp_id: port.self_comp_id, - self_port_id: port.self_port_id, - peer_comp_id: - }) - } - expected_mapping.push((port.self_port_id, port.mapping)); + PortKind::Putter => { + PortAnnotationKind::Putter(PortAnnotationPutter{ + self_comp_id: port.self_comp_id, + self_port_id: port.self_port_id + }) + }, + PortKind::Getter => { + if !port.peer_discovered { + continue; + } + + PortAnnotationKind::Getter(PortAnnotationGetter{ + self_comp_id: port.self_comp_id, + self_port_id: port.self_port_id, + peer_comp_id: port.peer_comp_id, + peer_port_id: port.peer_port_id, + }) + } + }; + expected_mapping.push((annotation_kind, port.mapping)); } let new_mapping = self.take_mapping();