Changeset - af328ac5eadf
[Not reviewed]
0 3 0
mh - 3 years ago 2022-01-26 15:06:00
contact@maxhenger.nl
WIP: Only trust getter port mapping in consensus
3 files changed with 206 insertions and 155 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -36,12 +36,13 @@ pub enum PortState {
 
    Closed,
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_id: PortId,
 
    pub peer_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_comp_id: CompId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
}
 

	
 
pub struct Channel {
 
@@ -122,7 +123,6 @@ pub enum SyncRoundDecision {
 
#[derive(Debug)]
 
pub struct SyncPartialSolution {
 
    pub channel_mapping: Vec<SyncSolutionChannel>,
 
    pub matched_channels: usize,
 
    pub decision: SyncRoundDecision,
 
}
 

	
 
@@ -130,7 +130,6 @@ impl Default for SyncPartialSolution {
 
    fn default() -> Self {
 
        return Self{
 
            channel_mapping: Vec::new(),
 
            matched_channels: 0,
 
            decision: SyncRoundDecision::None,
 
        }
 
    }
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -63,7 +63,7 @@ impl CompCtx {
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Closed,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
        });
 

	
 
@@ -341,8 +341,8 @@ impl CompPDL {
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
                let scheduling = self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(scheduling.unwrap_or(CompScheduling::Immediate));
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
@@ -427,18 +427,44 @@ impl CompPDL {
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 

	
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option<CompScheduling> {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision)
 
    }
 

	
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option<CompScheduling> {
 
        debug_assert_eq!(self.mode, Mode::Sync);
 
        self.mode = Mode::SyncEnd;
 
        let is_success = match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return None;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a decision
 
        if is_success {
 
            self.mode = Mode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
            return None;
 
        } else {
 
            todo!("handle this better, show some kind of error");
 
            self.mode = Mode::Exit;
 
            self.handle_component_exit(sched_ctx, comp_ctx);
 
            return Some(CompScheduling::Exit);
 
        }
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync
 

	
 
        // Note: for now we have that the scheduler handles exiting. I don't
 
@@ -637,8 +663,11 @@ impl CompPDL {
 
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                let old_peer_comp_id = port_info.peer_comp_id;
 
                debug_assert!(port_info.state == PortState::Blocked);
 
                port_info.peer_comp_id = new_comp_id;
 
                comp_ctx.add_peer(sched_ctx, new_comp_id, None);
 
                comp_ctx.remove_peer(sched_ctx, old_peer_comp_id);
 
                self.unblock_local_port(sched_ctx, comp_ctx, port_id);
 
            }
 
        }
 
@@ -646,28 +675,7 @@ impl CompPDL {
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        let is_success = match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return None;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a conclusion
 
        debug_assert_eq!(self.mode, Mode::SyncEnd);
 
        self.mode = Mode::NonSync;
 

	
 
        if is_success {
 
            // We can simply continue executing. So we do nothing extra!
 
        } else {
 
            todo!("handle this better, show some kind of error");
 
            self.handle_component_exit(sched_ctx, comp_ctx);
 
            return Some(CompScheduling::Exit);
 
        }
 

	
 
        return None;
 
        return self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    // -------------------------------------------------------------------------
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -6,13 +6,22 @@ use crate::runtime2::communication::*;
 
use super::component_pdl::*;
 

	
 
pub struct PortAnnotation {
 
    id: PortId,
 
    self_comp_id: CompId,
 
    self_port_id: PortId,
 
    peer_comp_id: CompId, // only valid for getter ports
 
    peer_port_id: PortId, // only valid for getter ports
 
    mapping: Option<u32>,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(id: PortId) -> Self {
 
        return Self{ id, mapping: None }
 
    fn new(comp_id: CompId, port_id: PortId) -> Self {
 
        return Self{
 
            self_comp_id: comp_id,
 
            self_port_id: port_id,
 
            peer_comp_id: CompId::new_invalid(),
 
            peer_port_id: PortId::new_invalid(),
 
            mapping: None
 
        }
 
    }
 
}
 

	
 
@@ -25,14 +34,14 @@ enum Mode {
 

	
 
struct SolutionCombiner {
 
    solution: SyncPartialSolution,
 
    all_present: bool, // set if the `submissions_by` only contains (_, true) entries.
 
    matched_channels: usize,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self {
 
            solution: SyncPartialSolution::default(),
 
            all_present: false,
 
            matched_channels: 0,
 
        }
 
    }
 

	
 
@@ -44,7 +53,7 @@ impl SolutionCombiner {
 
    /// Returns a decision for the current round. If there is no decision (yet)
 
    /// then `RoundDecision::None` is returned.
 
    fn get_decision(&self) -> SyncRoundDecision {
 
        if self.all_present {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
 
            return self.solution.decision;
 
        }
 
@@ -56,83 +65,35 @@ impl SolutionCombiner {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 

	
 
        // Combine our partial solution with the provided partial solution.
 
        // This algorithm *could* allow overlap in the partial solutions, but
 
        // in practice this means something is going wrong (a component stored
 
        // a local solution *and* transmitted it to the leader, then later
 
        // submitted its partial solution), hence we will do some debug asserts
 
        // for now.
 
        for new_entry in partial.channel_mapping {
 
            let channel_index = if new_entry.putter.is_some() && new_entry.getter.is_some() {
 
                // Channel is completely specified
 
                debug_assert!(
 
                    self.find_channel_index_for_partial_entry(new_entry.putter.as_ref().unwrap()).is_none() &&
 
                    self.find_channel_index_for_partial_entry(new_entry.getter.as_ref().unwrap()).is_none()
 
                );
 
        if partial.decision == SyncRoundDecision::Failure {
 
            self.solution.decision = SyncRoundDecision::Failure;
 
        }
 

	
 
        for entry in partial.channel_mapping {
 
            let channel_index = if entry.getter.is_some() && entry.putter.is_some() {
 
                let channel_index = self.solution.channel_mapping.len();
 
                self.solution.channel_mapping.push(new_entry);
 
                self.solution.channel_mapping.push(entry);
 
                self.matched_channels += 1;
 

	
 
                channel_index
 
            } else if let Some(new_port) = new_entry.putter {
 
                // Only putter is present in new entry
 
                match self.find_channel_index_for_partial_entry(&new_port) {
 
                    Some(channel_index) => {
 
                        let entry = &mut self.solution.channel_mapping[channel_index];
 
                        debug_assert!(entry.putter.is_none());
 
                        entry.putter = Some(new_port);
 

	
 
                        channel_index
 
                    },
 
                    None => {
 
                        let channel_index = self.solution.channel_mapping.len();
 
                        self.solution.channel_mapping.push(SyncSolutionChannel{
 
                            putter: Some(new_port),
 
                            getter: None,
 
                        });
 

	
 
                        channel_index
 
                    }
 
                }
 
            } else if let Some(new_port) = new_entry.getter {
 
                // Only getter is present in new entry
 
                match self.find_channel_index_for_partial_entry(&new_port) {
 
                    Some(channel_index) => {
 
                        let entry = &mut self.solution.channel_mapping[channel_index];
 
                        debug_assert!(entry.getter.is_none());
 
                        entry.getter = Some(new_port);
 

	
 
                        channel_index
 
                    },
 
                    None => {
 
                        let channel_index = self.solution.channel_mapping.len();
 
                        self.solution.channel_mapping.push(SyncSolutionChannel{
 
                            putter: None,
 
                            getter: Some(new_port)
 
                        });
 

	
 
                        channel_index
 
                    }
 
                }
 
            } else if let Some(putter) = entry.putter {
 
                self.combine_with_putter_port(putter)
 
            } else if let Some(getter) = entry.getter {
 
                self.combine_with_getter_port(getter)
 
            } else {
 
                unreachable!()
 
                unreachable!(); // both putter and getter are None
 
            };
 

	
 
            // Make sure the new entry is consistent
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if !Self::channel_is_consistent(channel) {
 
                self.solution.decision = SyncRoundDecision::Failure;
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        // Check to see if we have a global solution already
 
        self.update_all_present();
 
        if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
 
            debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
 
            dbg_code!(for entry in &self.solution.channel_mapping {
 
                    debug_assert!(entry.putter.is_some() && entry.getter.is_some());
 
                });
 
            self.solution.decision = SyncRoundDecision::Solution;
 
        }
 
        self.update_solution();
 
    }
 

	
 
    /// Combines the currently stored global solution (if any) with the newly
 
@@ -143,29 +104,28 @@ impl SolutionCombiner {
 

	
 
        // Combine partial solution with the local solution entries
 
        for entry in solution {
 
            match entry {
 
            // Match the current entry up with its peer endpoint, or add a new
 
            // entry.
 
            let channel_index = match entry {
 
                SyncLocalSolutionEntry::Putter(putter) => {
 

	
 
                    self.combine_with_putter_port(putter)
 
                },
 
                SyncLocalSolutionEntry::Getter(getter) => {
 
                    self.combine_with_getter_port(getter)
 
                }
 
            };
 

	
 
            // Check if channel is now consistent
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        if !had_new_entry {
 
            self.update_all_present();
 
            if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
 
                // No new entries and every component is present. This implies that
 
                // every component successfully added their local solutions to the
 
                // global solution. Hence: we have a global solution
 
                debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
 
                dbg_code!(for entry in &self.solution.channel_mapping {
 
                    debug_assert!(entry.putter.is_some() && entry.getter.is_some());
 
                });
 
                self.solution.decision = SyncRoundDecision::Solution;
 
            }
 
        }
 
        self.update_solution();
 
    }
 

	
 
    /// Takes whatever partial solution is present in the solution combiner and
 
@@ -175,34 +135,102 @@ impl SolutionCombiner {
 
    fn take_partial_solution(&mut self) -> SyncPartialSolution {
 
        let mut partial_solution = SyncPartialSolution::default();
 
        std::mem::swap(&mut partial_solution, &mut self.solution);
 
        self.all_present = false;
 
        self.clear();
 

	
 
        return partial_solution;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.solution.submissions_by.clear();
 
        self.solution.channel_mapping.clear();
 
        self.solution.decision = SyncRoundDecision::None;
 
        self.matched_channels = 0;
 
    }
 

	
 
    // --- Small utilities for combining solutions
 

	
 
    fn update_all_present(&mut self) {
 
        debug_assert!(!self.all_present); // upheld by caller
 
        for (_, present) in self.solution.submissions_by.iter() {
 
            if !*present {
 
                return;
 
            }
 
    fn combine_with_putter_port(&mut self, putter: SyncSolutionPutterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_putter(putter.self_comp_id, putter.self_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.putter.is_none());
 
            channel.putter = Some(putter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: Some(putter),
 
                getter: None,
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    fn combine_with_getter_port(&mut self, getter: SyncSolutionGetterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_getter(getter.peer_comp_id, getter.peer_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.getter.is_none());
 
            channel.getter = Some(getter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: None,
 
                getter: Some(getter)
 
            });
 

	
 
        self.all_present = true;
 
            return channel_index;
 
        }
 
    }
 

	
 
    /// Retrieve channel index based on a putter port
 
    /// Retrieve index of the channel containing a getter port that has received
 
    /// from the specified putter port.
 
    fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option<usize> {
 
        for entry in self.solution.channel_mapping.iter() {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(getter) = &channel.getter {
 
                if getter.peer_comp_id == putter_comp_id && getter.peer_port_id == putter_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Retrieve index of the channel for a getter port. To find this channel
 
    /// the **peer** component/port IDs of the getter port are used.
 
    fn get_channel_index_for_getter(&self, peer_comp_id: CompId, peer_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(putter) = &channel.putter {
 
                if putter.self_comp_id == peer_comp_id && putter.self_port_id == peer_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn channel_is_consistent(channel: &SyncSolutionChannel) -> Option<bool> {
 
        if channel.putter.is_none() || channel.getter.is_none() {
 
            return None;
 
        }
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return Some(putter.mapping == getter.mapping);
 
    }
 

	
 
    /// Determines the global solution if all components have contributed their
 
    /// local solutions.
 
    fn update_solution(&mut self) {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            if self.solution.decision != SyncRoundDecision::Failure {
 
                self.solution.decision = SyncRoundDecision::Solution;
 
            }
 
        }
 
    }
 
}
 
@@ -258,14 +286,22 @@ impl Consensus {
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_info = comp_ctx.get_port(port.id);
 
                local_solution.push(SyncLocalSolutionEntry {
 
                    self_port_id: port.id,
 
                    peer_comp_id: port_info.peer_comp_id,
 
                    peer_port_id: port_info.peer_id,
 
                    mapping,
 
                    port_kind: port_info.kind,
 
                });
 
                let port_info = comp_ctx.get_port(port.self_port_id);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        }
 

	
 
@@ -296,7 +332,7 @@ impl Consensus {
 
        } else {
 
            for idx in 0..comp_ctx.ports.len() {
 
                let comp_port_id = comp_ctx.ports[idx].self_id;
 
                let cons_port_id = self.ports[idx].id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
@@ -308,7 +344,7 @@ impl Consensus {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.ports.len());
 
            for port in &comp_ctx.ports {
 
                self.ports.push(PortAnnotation::new(port.self_id))
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
 
            }
 
        }
 
    }
 
@@ -319,7 +355,7 @@ impl Consensus {
 

	
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
@@ -332,7 +368,7 @@ impl Consensus {
 
    /// received.
 
    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        debug_assert!(self.ports.iter().any(|v| v.id == message.data_header.target_port));
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));
 

	
 
        // Make sure the expected mapping matches the currently stored mapping
 
        for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
 
@@ -343,7 +379,7 @@ impl Consensus {
 
        }
 

	
 
        // Expected mapping matches current mapping, so we will receive the message
 
        self.set_annotation(message.data_header.target_port, message.data_header.new_mapping);
 
        self.set_annotation(message.sync_header.sending_id, &message.data_header);
 

	
 
        // Handle the sync header embedded within the data message
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 
@@ -369,7 +405,6 @@ impl Consensus {
 
            },
 
            SyncMessageContent::GlobalSolution => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
 
                todo!("clear port mapping or something");
 
                return SyncRoundDecision::Solution;
 
            },
 
            SyncMessageContent::GlobalFailure => {
 
@@ -410,7 +445,7 @@ impl Consensus {
 

	
 
    fn get_annotation(&self, port_id: PortId) -> Option<u32> {
 
        for annotation in self.ports.iter() {
 
            if annotation.id == port_id {
 
            if annotation.self_port_id == port_id {
 
                return annotation.mapping;
 
            }
 
        }
 
@@ -419,10 +454,12 @@ impl Consensus {
 
        return None;
 
    }
 

	
 
    fn set_annotation(&mut self, port_id: PortId, mapping: u32) {
 
    fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
 
        for annotation in self.ports.iter_mut() {
 
            if annotation.id == port_id {
 
                annotation.mapping = Some(mapping);
 
            if annotation.self_port_id == data_header.target_port {
 
                annotation.peer_comp_id = source_comp_id;
 
                annotation.peer_port_id = data_header.source_port;
 
                annotation.mapping = Some(data_header.new_mapping);
 
            }
 
        }
 
    }
 
@@ -496,12 +533,19 @@ impl Consensus {
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        for (peer, _) in self.solution.solution.submissions_by.iter().copied() {
 
            if peer == comp_ctx.id {
 
                // Do not send to ourselves
 
                continue;
 
        let mut peers = Vec::with_capacity(self.solution.solution.channel_mapping.len()); // TODO: @Performance
 

	
 
        for channel in self.solution.solution.channel_mapping.iter() {
 
            let getter = channel.getter.as_ref().unwrap();
 
            if getter.self_comp_id != comp_ctx.id && !peers.contains(&getter.self_comp_id) {
 
                peers.push(getter.self_comp_id);
 
            }
 
            if getter.peer_comp_id != comp_ctx.id && !peers.contains(&getter.peer_comp_id) {
 
                peers.push(getter.peer_comp_id);
 
            }
 
        }
 

	
 
        for peer in peers {
 
            let mut handle = sched_ctx.runtime.get_component_public(peer);
 
            let message = Message::Sync(SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
@@ -532,10 +576,10 @@ impl Consensus {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.id == port_info.self_id {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index;
 
            }
 
            expected_mapping.push((port.id, port.mapping));
 
            expected_mapping.push((port.self_port_id, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
0 comments (0 inline, 0 general)