Changeset - b87f53268b1b
[Not reviewed]
0 3 0
MH - 3 years ago 2022-03-04 18:50:51
contact@maxhenger.nl
Fix issue related to ambiguous port mapping
3 files changed with 90 insertions and 29 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -61,11 +61,13 @@ pub struct DataMessage {
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub enum PortAnnotationKind {
 
    Getter(PortAnnotationGetter),
 
    Putter(PortAnnotationPutter),
 
}
 

	
 
#[derive(Debug)]
 
pub struct PortAnnotationGetter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
@@ -73,6 +75,7 @@ pub struct PortAnnotationGetter {
 
    pub peer_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct PortAnnotationPutter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -103,6 +103,17 @@ pub(crate) enum Mode {
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl Mode {
 
    fn is_in_sync_block(&self) -> bool {
 
        use Mode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 
}
 

	
 
struct SelectCase {
 
    involved_ports: Vec<LocalPortHandle>,
 
}
 
@@ -453,6 +464,11 @@ impl CompPDL {
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        for message in self.inbox_main.iter() {
 
            if let Some(message) = message {
 
                self.consensus.handle_new_data_message(comp_ctx, message);
 
            }
 
        }
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 
@@ -534,6 +550,11 @@ impl CompPDL {
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        // Whatever we do, glean information from headers in message
 
        if self.mode.is_in_sync_block() {
 
            self.consensus.handle_new_data_message(comp_ctx, &message);
 
        }
 

	
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -379,7 +379,7 @@ impl Consensus {
 
    /// Handles the arrival of a new data message (needs to be called for every
 
    /// new data message, even though it might not end up being received). This
 
    /// is used to determine peers of `get`ter ports.
 
    pub(crate) fn handle_new_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: &DataMessage) -> bool {
 
    pub(crate) fn handle_new_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
        let annotation = &mut self.ports[target_index];
 
@@ -390,7 +390,8 @@ impl Consensus {
 
            )
 
        );
 
        annotation.peer_comp_id = message.sync_header.sending_id;
 
        annotation.
 
        annotation.peer_port_id = message.data_header.source_port;
 
        annotation.peer_discovered = true;
 
    }
 

	
 
    /// Checks if the data message can be received (due to port annotations), if
 
@@ -402,9 +403,41 @@ impl Consensus {
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));
 

	
 
        // Make sure the expected mapping matches the currently stored mapping
 
        for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
 
            let got_annotation = self.get_annotation(*expected_id);
 
            if got_annotation != *expected_annotation {
 
        for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping {
 
            // Determine our annotation, in order to do so we need to find the
 
            // port matching the peer ports
 
            let mut self_annotation = None;
 
            let mut self_annotation_found = false;
 
            match peer_port_kind {
 
                PortAnnotationKind::Putter(peer_port) => {
 
                    for self_port in &self.ports {
 
                        if self_port.kind == PortKind::Getter &&
 
                            self_port.peer_discovered &&
 
                            self_port.peer_comp_id == peer_port.self_comp_id &&
 
                            self_port.peer_port_id == peer_port.self_port_id
 
                        {
 
                            self_annotation = self_port.mapping;
 
                            self_annotation_found = true;
 
                            break;
 
                        }
 
                    }
 
                },
 
                PortAnnotationKind::Getter(peer_port) => {
 
                    if peer_port.peer_comp_id != comp_ctx.id {
 
                        // Peer indicates that we talked to it
 
                        let self_port_handle = comp_ctx.get_port_handle(peer_port.peer_port_id);
 
                        let self_port_index = comp_ctx.get_port_index(self_port_handle);
 
                        self_annotation = self.ports[self_port_index].mapping;
 
                        self_annotation_found = true;
 
                    }
 
                }
 
            }
 

	
 
            if !self_annotation_found {
 
                continue
 
            }
 

	
 
            if self_annotation != *expected_annotation {
 
                return false;
 
            }
 
        }
 
@@ -488,22 +521,16 @@ impl Consensus {
 
        } // else: exactly equal
 
    }
 

	
 
    fn get_annotation(&self, port_id: PortId) -> Option<u32> {
 
        for annotation in self.ports.iter() {
 
            if annotation.self_port_id == port_id {
 
                return annotation.mapping;
 
            }
 
        }
 

	
 
        debug_assert!(false);
 
        return None;
 
    }
 

	
 
    fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
 
        for annotation in self.ports.iter_mut() {
 
            if annotation.self_port_id == data_header.target_port {
 
                annotation.peer_comp_id = source_comp_id;
 
                annotation.peer_port_id = data_header.source_port;
 
                // Message should have already passed the `handle_new_data_message` function, so we
 
                // should have already annotated the peer of the port.
 
                debug_assert!(
 
                    annotation.peer_discovered &&
 
                    annotation.peer_comp_id == source_comp_id &&
 
                    annotation.peer_port_id == data_header.source_port
 
                );
 
                annotation.mapping = Some(data_header.new_mapping);
 
            }
 
        }
 
@@ -624,18 +651,28 @@ impl Consensus {
 
                port_index = index; // remember for later updating
 
            }
 

	
 
            // Add all of the
 
            let annotation_kind = match port.kind {
 
                PortKind::Putter => PortAnnotationKind::Putter(PortAnnotationPutter{
 
                    self_comp_id: port.self_comp_id,
 
                    self_port_id: port.self_port_id
 
                }),
 
                PortKind::Getter => PortAnnotationKind::Getter(PortAnnotationGetter{
 
                    self_comp_id: port.self_comp_id,
 
                    self_port_id: port.self_port_id,
 
                    peer_comp_id:
 
                })
 
            }
 
            expected_mapping.push((port.self_port_id, port.mapping));
 
                PortKind::Putter => {
 
                    PortAnnotationKind::Putter(PortAnnotationPutter{
 
                        self_comp_id: port.self_comp_id,
 
                        self_port_id: port.self_port_id
 
                    })
 
                },
 
                PortKind::Getter => {
 
                    if !port.peer_discovered {
 
                        continue;
 
                    }
 

	
 
                    PortAnnotationKind::Getter(PortAnnotationGetter{
 
                        self_comp_id: port.self_comp_id,
 
                        self_port_id: port.self_port_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                    })
 
                }
 
            };
 
            expected_mapping.push((annotation_kind, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
0 comments (0 inline, 0 general)