Files @ 68411f4b8014
Branch filter:

Location: CSY/reowolf/src/runtime2/consensus.rs

68411f4b8014 40.5 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
Round of cleanup on temporary type names and old code
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
use crate::collections::VecSet;

use crate::protocol::eval::ValueGroup;

use super::branch::{BranchId, ExecTree, QueueKind};
use super::ConnectorId;
use super::port::{ChannelId, PortIdLocal};
use super::inbox::{
    Message, PortAnnotation,
    DataMessage, DataContent, DataHeader,
    SyncMessage, SyncContent, SyncHeader,
};
use super::scheduler::ComponentCtx;

struct BranchAnnotation {
    port_mapping: Vec<PortAnnotation>,
}

#[derive(Debug)]
pub(crate) struct LocalSolution {
    component: ConnectorId,
    final_branch_id: BranchId,
    port_mapping: Vec<(ChannelId, BranchId)>,
}

#[derive(Debug, Clone)]
pub(crate) struct GlobalSolution {
    component_branches: Vec<(ConnectorId, BranchId)>,
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
}

// -----------------------------------------------------------------------------
// Consensus
// -----------------------------------------------------------------------------

/// The consensus algorithm. Currently only implemented to find the component
/// with the highest ID within the sync region and letting it handle all the
/// local solutions.
///
/// The type itself serves as an experiment to see how code should be organized.
// TODO: Flatten all datastructures
// TODO: Have a "branch+port position hint" in case multiple operations are
//  performed on the same port to prevent repeated lookups
// TODO: A lot of stuff should be batched. Like checking all the sync headers
//  and sending "I have a higher ID" messages.
pub(crate) struct Consensus {
    // --- State that is cleared after each round
    // Local component's state
    highest_connector_id: ConnectorId,
    branch_annotations: Vec<BranchAnnotation>,
    last_finished_handled: Option<BranchId>,
    // Gathered state from communication
    encountered_peers: VecSet<ConnectorId>, // to determine when we should send "found a higher ID" messages.
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
    solution_combiner: SolutionCombiner,
    // --- Persistent state
    // TODO: Tracking sync round numbers
    // --- Workspaces
    workspace_ports: Vec<PortIdLocal>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Consistency {
    Valid,
    Inconsistent,
}

impl Consensus {
    pub fn new() -> Self {
        return Self {
            highest_connector_id: ConnectorId::new_invalid(),
            branch_annotations: Vec::new(),
            last_finished_handled: None,
            encountered_peers: VecSet::new(),
            encountered_ports: VecSet::new(),
            solution_combiner: SolutionCombiner::new(),
            workspace_ports: Vec::new(),
        }
    }

    // --- Controlling sync round and branches

    /// Returns whether the consensus algorithm is running in sync mode
    pub fn is_in_sync(&self) -> bool {
        return !self.branch_annotations.is_empty();
    }

    /// TODO: Remove this once multi-fire is in place
    pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation {
        let branch = &self.branch_annotations[branch_id.index as usize];
        let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap();
        return port;
    }

    /// Sets up the consensus algorithm for a new synchronous round. The
    /// provided ports should be the ports the component owns at the start of
    /// the sync round.
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
        debug_assert!(!self.highest_connector_id.is_valid());
        debug_assert!(self.branch_annotations.is_empty());
        debug_assert!(self.last_finished_handled.is_none());
        debug_assert!(self.encountered_peers.is_empty());
        debug_assert!(self.solution_combiner.local.is_empty());

        // We'll use the first "branch" (the non-sync one) to store our ports,
        // this allows cloning if we created a new branch.
        self.branch_annotations.push(BranchAnnotation{
            port_mapping: ctx.get_ports().iter()
                .map(|v| PortAnnotation{
                    port_id: v.self_id,
                    registered_id: None,
                    expected_firing: None,
                })
                .collect(),
        });

        self.highest_connector_id = ctx.id;

    }

    /// Notifies the consensus algorithm that a new branch has appeared. Must be
    /// called for each forked branch in the execution tree.
    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
        // If called correctly. Then each time we are notified the new branch's
        // index is the length in `branch_annotations`.
        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
        let new_branch_annotations = BranchAnnotation{
            port_mapping: parent_branch_annotations.port_mapping.clone(),
        };
        self.branch_annotations.push(new_branch_annotations);
    }

    /// Notifies the consensus algorithm that a branch has reached the end of
    /// the sync block. A final check for consistency will be performed that the
    /// caller has to handle. Note that
    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
        debug_assert!(self.is_in_sync());
        let branch = &self.branch_annotations[branch_id.index as usize];
        for mapping in &branch.port_mapping {
            match mapping.expected_firing {
                Some(expected) => {
                    if expected != mapping.registered_id.is_some() {
                        // Inconsistent speculative state and actual state
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
                        return Consistency::Inconsistent;
                    }
                },
                None => {},
            }
        }

        return Consistency::Valid;
    }

    /// Notifies the consensus algorithm that a particular branch has assumed
    /// a speculative value for its port mapping.
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
        debug_assert!(self.is_in_sync());
        let branch = &mut self.branch_annotations[branch_id.index as usize];
        for mapping in &mut branch.port_mapping {
            if mapping.port_id == port_id {
                match mapping.expected_firing {
                    None => {
                        // Not yet mapped, perform speculative mapping
                        mapping.expected_firing = Some(does_fire);
                        return Consistency::Valid;
                    },
                    Some(current) => {
                        // Already mapped
                        if current == does_fire {
                            return Consistency::Valid;
                        } else {
                            return Consistency::Inconsistent;
                        }
                    }
                }
            }
        }

        unreachable!("notify_of_speculative_mapping called with unowned port");
    }

    /// Generates sync messages for any branches that are at the end of the
    /// sync block. To find these branches, they should've been put in the
    /// "finished" queue in the execution tree.
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option<BranchId> {
        debug_assert!(self.is_in_sync());

        let mut last_branch_id = self.last_finished_handled;
        for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) {
            // Turn the port mapping into a local solution
            let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping;
            let mut target_mapping = Vec::with_capacity(source_mapping.len());

            for port in source_mapping {
                // Note: if the port is silent, and we've never communicated
                // over the port, then we need to do so now, to let the peer
                // component know about our sync leader state.
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
                let peer_port_id = port_desc.peer_id;
                let channel_id = port_desc.channel_id;

                if !self.encountered_ports.contains(&port.port_id) {
                    ctx.submit_message(Message::Data(DataMessage {
                        sync_header: SyncHeader{
                            sending_component_id: ctx.id,
                            highest_component_id: self.highest_connector_id,
                        },
                        data_header: DataHeader{
                            expected_mapping: source_mapping.clone(),
                            sending_port: port.port_id,
                            target_port: peer_port_id,
                            new_mapping: BranchId::new_invalid(),
                        },
                        content: DataContent::SilentPortNotification,
                    }));
                    self.encountered_ports.push(port.port_id);
                }

                target_mapping.push((
                    channel_id,
                    port.registered_id.unwrap_or(BranchId::new_invalid())
                ));
            }

            let local_solution = LocalSolution{
                component: ctx.id,
                final_branch_id: branch.id,
                port_mapping: target_mapping,
            };
            let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
            if solution_branch.is_some() {
                // No need to continue iterating, we've found the solution
                return solution_branch;
            }

            last_branch_id = Some(branch.id);
        }

        self.last_finished_handled = last_branch_id;
        return None;
    }

    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
        debug_assert!(self.is_in_sync());

        // TODO: Handle sending and receiving ports
        // Set final ports
        final_ports.clear();
        let branch = &self.branch_annotations[branch_id.index as usize];
        for port in &branch.port_mapping {
            final_ports.push(port.port_id);
        }

        // Clear out internal storage to defaults
        self.highest_connector_id = ConnectorId::new_invalid();
        self.branch_annotations.clear();
        self.last_finished_handled = None;
        self.encountered_peers.clear();
        self.encountered_ports.clear();
        self.solution_combiner.clear();
    }

    // --- Handling messages

    /// Prepares a message for sending. Caller should have made sure that
    /// sending the message is consistent with the speculative state.
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
        debug_assert!(self.is_in_sync());
        let branch = &mut self.branch_annotations[branch_id.index as usize];

        if cfg!(debug_assertions) {
            // Check for consistent mapping
            let port = branch.port_mapping.iter()
                .find(|v| v.port_id == source_port_id)
                .unwrap();
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
        }

        // Check for ports that are being sent
        debug_assert!(self.workspace_ports.is_empty());
        find_ports_in_value_group(content, &mut self.workspace_ports);
        if !self.workspace_ports.is_empty() {
            todo!("handle sending ports");
            self.workspace_ports.clear();
        }

        // Construct data header
        // TODO: Handle multiple firings. Right now we just assign the current
        //  branch to the `None` value because we know we can only send once.
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
        let data_header = DataHeader{
            expected_mapping: branch.port_mapping.clone(),
            sending_port: port_info.self_id,
            target_port: port_info.peer_id,
            new_mapping: branch_id
        };

        // Update port mapping
        for mapping in &mut branch.port_mapping {
            if mapping.port_id == source_port_id {
                mapping.expected_firing = Some(true);
                mapping.registered_id = Some(branch_id);
            }
        }

        self.encountered_ports.push(source_port_id);

        return (self.create_sync_header(ctx), data_header);
    }

    /// Handles a new data message by handling the data and sync header, and
    /// checking which *existing* branches *can* receive the message. So two
    /// cautionary notes:
    /// 1. A future branch might also be able to receive this message, see the
    ///     `branch_can_receive` function.
    /// 2. We return the branches that *can* receive the message, you still
    ///     have to explicitly call `notify_of_received_message`.
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec<BranchId>) {
        self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids);
        self.handle_received_sync_header(&message.sync_header, ctx);
    }

    /// Handles a new sync message by handling the sync header and the contents
    /// of the message. Returns `Some` with the branch ID of the global solution
    /// if the sync solution has been found.
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option<BranchId> {
        self.handle_received_sync_header(&message.sync_header, ctx);

        // And handle the contents
        debug_assert_eq!(message.target_component_id, ctx.id);
        match message.content {
            SyncContent::Notification => {
                // We were just interested in the header
                return None;
            },
            SyncContent::LocalSolution(solution) => {
                // We might be the leader, or earlier messages caused us to not
                // be the leader anymore.
                return self.send_or_store_local_solution(solution, ctx);
            },
            SyncContent::GlobalSolution(solution) => {
                // Take branch of interest and return it.
                let (_, branch_id) = solution.component_branches.iter()
                    .find(|(connector_id, _)| *connector_id == ctx.id)
                    .unwrap();
                return Some(*branch_id);
            }
        }
    }

    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) {
        debug_assert!(self.branch_can_receive(branch_id, data_header, content));

        let branch = &mut self.branch_annotations[branch_id.index as usize];
        for mapping in &mut branch.port_mapping {
            if mapping.port_id == data_header.target_port {
                // Found the port in which the message should be inserted
                mapping.registered_id = Some(data_header.new_mapping);

                // Check for sent ports
                debug_assert!(self.workspace_ports.is_empty());
                find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports);
                if !self.workspace_ports.is_empty() {
                    todo!("handle received ports");
                    self.workspace_ports.clear();
                }

                return;
            }
        }

        // If here, then the branch didn't actually own the port? Means the
        // caller made a mistake
        unreachable!("incorrect notify_of_received_message");
    }

    /// Matches the mapping between the branch and the data message. If they
    /// match then the branch can receive the message.
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool {
        if let DataContent::SilentPortNotification = content {
            // No port can receive a "silent" notification.
            return false;
        }

        let annotation = &self.branch_annotations[branch_id.index as usize];
        for expected in &data_header.expected_mapping {
            // If we own the port, then we have an entry in the
            // annotation, check if the current mapping matches
            for current in &annotation.port_mapping {
                if expected.port_id == current.port_id {
                    if expected.registered_id != current.registered_id {
                        // IDs do not match, we cannot receive the
                        // message in this branch
                        return false;
                    }
                }
            }
        }

        return true;
    }

    // --- Internal helpers

    /// Checks data header and consults the stored port mapping and the
    /// execution tree to see which branches may receive the data message's
    /// contents.
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
            if branch.awaiting_port == data_header.target_port {
                // Found a branch awaiting the message, but we need to make sure
                // the mapping is correct
                if self.branch_can_receive(branch.id, data_header, content) {
                    target_ids.push(branch.id);
                }
            }
        }
    }

    fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) {
        debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves

        self.encountered_peers.push(sync_header.sending_component_id);

        if sync_header.highest_component_id > self.highest_connector_id {
            // Sender has higher component ID. So should be the target of our
            // messages. We should also let all of our peers know
            self.highest_connector_id = sync_header.highest_component_id;
            for encountered_id in self.encountered_peers.iter() {
                if *encountered_id == sync_header.sending_component_id {
                    // Don't need to send it to this one
                    continue
                }

                let message = SyncMessage {
                    sync_header: self.create_sync_header(ctx),
                    target_component_id: *encountered_id,
                    content: SyncContent::Notification,
                };
                ctx.submit_message(Message::Sync(message));
            }

            // But also send our locally combined solution
            self.forward_local_solutions(ctx);
        } else if sync_header.highest_component_id < self.highest_connector_id {
            // Sender has lower leader ID, so it should know about our higher
            // one.
            let message = SyncMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: sync_header.sending_component_id,
                content: SyncContent::Notification
            };
            ctx.submit_message(Message::Sync(message));
        } // else: exactly equal, so do nothing
    }

    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option<BranchId> {
        println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index);

        if self.highest_connector_id == ctx.id {
            // We are the leader
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
                let mut my_final_branch_id = BranchId::new_invalid();
                for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
                    if connector_id == ctx.id {
                        // This is our solution branch
                        my_final_branch_id = branch_id;
                        continue;
                    }

                    let message = SyncMessage {
                        sync_header: self.create_sync_header(ctx),
                        target_component_id: connector_id,
                        content: SyncContent::GlobalSolution(global_solution.clone()),
                    };
                    ctx.submit_message(Message::Sync(message));
                }

                debug_assert!(my_final_branch_id.is_valid());
                return Some(my_final_branch_id);
            } else {
                return None;
            }
        } else {
            // Someone else is the leader
            let message = SyncMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: self.highest_connector_id,
                content: SyncContent::LocalSolution(solution),
            };
            ctx.submit_message(Message::Sync(message));
            return None;
        }
    }

    #[inline]
    fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
        return SyncHeader{
            sending_component_id: ctx.id,
            highest_component_id: self.highest_connector_id,
        }
    }

    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) {
        debug_assert_ne!(self.highest_connector_id, ctx.id);

        for local_solution in self.solution_combiner.drain() {
            let message = SyncMessage {
                sync_header: self.create_sync_header(ctx),
                target_component_id: self.highest_connector_id,
                content: SyncContent::LocalSolution(local_solution),
            };
            ctx.submit_message(Message::Sync(message));
        }
    }
}

// -----------------------------------------------------------------------------
// Solution storage and algorithms
// -----------------------------------------------------------------------------

struct MatchedLocalSolution {
    final_branch_id: BranchId,
    channel_mapping: Vec<(ChannelId, BranchId)>,
    matches: Vec<ComponentMatches>,
}

struct ComponentMatches {
    target_id: ConnectorId,
    target_index: usize,
    match_indices: Vec<usize>, // of local solution in connector
}

struct ComponentPeer {
    target_id: ConnectorId,
    target_index: usize, // in array of global solution components
    involved_channels: Vec<ChannelId>,
}

struct ComponentLocalSolutions {
    component: ConnectorId,
    peers: Vec<ComponentPeer>,
    solutions: Vec<MatchedLocalSolution>,
    all_peers_present: bool,
}

// TODO: Flatten? Flatten. Flatten everything.
pub(crate) struct SolutionCombiner {
    local: Vec<ComponentLocalSolutions>
}

impl SolutionCombiner {
    fn new() -> Self {
        return Self{
            local: Vec::new(),
        };
    }

    /// Adds a new local solution to the global solution storage. Will check the
    /// new local solutions for matching against already stored local solutions
    /// of peer connectors.
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
        let component_id = solution.component;
        let solution = MatchedLocalSolution{
            final_branch_id: solution.final_branch_id,
            channel_mapping: solution.port_mapping,
            matches: Vec::new(),
        };

        // Create an entry for the solution for the particular component
        let component_exists = self.local.iter_mut()
            .enumerate()
            .find(|(_, v)| v.component == component_id);
        let (component_index, solution_index, new_component) = match component_exists {
            Some((component_index, storage)) => {
                // Entry for component exists, so add to solutions
                let solution_index = storage.solutions.len();
                storage.solutions.push(solution);

                (component_index, solution_index, false)
            }
            None => {
                // Entry for component does not exist yet
                let component_index = self.local.len();
                self.local.push(ComponentLocalSolutions{
                    component: component_id,
                    peers: Vec::new(),
                    solutions: vec![solution],
                    all_peers_present: false,
                });
                (component_index, 0, true)
            }
        };

        // If this is a solution of a component that is new to us, then we check
        // in the stored solutions which other components are peers of the new
        // one.
        if new_component {
            let cur_ports = &self.local[component_index].solutions[0].channel_mapping;
            let mut component_peers = Vec::new();

            // Find the matching components
            for (other_index, other_component) in self.local.iter().enumerate() {
                if other_index == component_index {
                    // Don't match against ourselves
                    continue;
                }

                let mut matching_channels = Vec::new();
                for (cur_channel_id, _) in cur_ports {
                    for (other_channel_id, _) in &other_component.solutions[0].channel_mapping {
                        if cur_channel_id == other_channel_id {
                            // We have a shared port
                            matching_channels.push(*cur_channel_id);
                        }
                    }
                }

                if !matching_channels.is_empty() {
                    // We share some ports
                    component_peers.push(ComponentPeer{
                        target_id: other_component.component,
                        target_index: other_index,
                        involved_channels: matching_channels,
                    });
                }
            }

            let mut num_ports_in_peers = 0;
            for peer in &component_peers {
                num_ports_in_peers += peer.involved_channels.len();
            }

            if num_ports_in_peers == cur_ports.len() {
                // Newly added component has all required peers present
                self.local[component_index].all_peers_present = true;
            }

            // Add the found component pairing entries to the solution entries
            // for the two involved components
            for component_match in component_peers {
                // Check the other component for having all peers present
                let mut num_ports_in_peers = component_match.involved_channels.len();
                let other_component = &mut self.local[component_match.target_index];
                for existing_peer in &other_component.peers {
                    num_ports_in_peers += existing_peer.involved_channels.len();
                }

                if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() {
                    other_component.all_peers_present = true;
                }

                other_component.peers.push(ComponentPeer{
                    target_id: component_id,
                    target_index: component_index,
                    involved_channels: component_match.involved_channels.clone(),
                });

                let new_component = &mut self.local[component_index];
                new_component.peers.push(component_match);
            }
        }

        // We're now sure that we know which other components the currently
        // considered component is linked up to. Now we need to check those
        // entries (if any) to see if any pair of local solutions match
        let mut new_component_matches = Vec::new();
        let cur_component = &self.local[component_index];
        let cur_solution = &cur_component.solutions[solution_index];

        for peer in &cur_component.peers {
            let mut new_solution_matches = Vec::new();

            let other_component = &self.local[peer.target_index];
            for (other_solution_index, other_solution) in other_component.solutions.iter().enumerate() {
                // Check the port mappings between the pair of solutions.
                let mut all_matched = true;

                'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping {
                    for (other_port, other_branch) in &other_solution.channel_mapping {
                        if cur_port == other_port {
                            if cur_branch == other_branch {
                                // Same port mapping, go to next port
                                break;
                            } else {
                                // Different port mapping, not a match
                                all_matched = false;
                                break 'mapping_check_loop;
                            }
                        }
                    }
                }

                if !all_matched {
                    continue;
                }

                // Port mapping between the component pair is the same, so they
                // have agreeable local solutions
                new_solution_matches.push(other_solution_index);
            }

            new_component_matches.push(ComponentMatches{
                target_id: peer.target_id,
                target_index: peer.target_index,
                match_indices: new_solution_matches,
            });
        }

        // And now that we have the new solution-to-solution matches, we need to
        // add those in the appropriate storage.
        for new_component_match in new_component_matches {
            let other_component = &mut self.local[new_component_match.target_index];

            for other_solution_index in new_component_match.match_indices.iter().copied() {
                let other_solution = &mut other_component.solutions[other_solution_index];

                // Add a completely new entry for the component, or add it to
                // the existing component entry's matches
                match other_solution.matches.iter_mut()
                    .find(|v| v.target_id == component_id)
                {
                    Some(other_match) => {
                        other_match.match_indices.push(solution_index);
                    },
                    None => {
                        other_solution.matches.push(ComponentMatches{
                            target_id: component_id,
                            target_index: component_index,
                            match_indices: vec![solution_index],
                        })
                    }
                }
            }

            let cur_component = &mut self.local[component_index];
            let cur_solution = &mut cur_component.solutions[solution_index];

            match cur_solution.matches.iter_mut()
                .find(|v| v.target_id == new_component_match.target_id)
            {
                Some(other_match) => {
                    // Already have an entry
                    debug_assert_eq!(other_match.target_index, new_component_match.target_index);
                    other_match.match_indices.extend(&new_component_match.match_indices);
                },
                None => {
                    // Create a new entry
                    cur_solution.matches.push(new_component_match);
                }
            }
        }

        return self.check_new_solution(component_index, solution_index);
    }

    /// Checks if, starting at the provided local solution, a global solution
    /// can be formed.
    fn check_new_solution(&self, component_index: usize, solution_index: usize) -> Option<GlobalSolution> {
        if !self.can_have_solution() {
            return None;
        }

        // By now we're certain that all peers are present. So once our
        // backtracking solution stack is as long as the number of components,
        // then we have found a global solution.
        let mut check_stack = Vec::new();
        let mut check_from = 0;
        check_stack.push((component_index, solution_index));
        'checking_loop: while check_from < check_stack.len() {
            // Prepare for next iteration
            let new_check_from = check_stack.len();

            // Go through all entries on the checking stack. Each entry
            // corresponds to a component's solution. We check that one against
            // previously added ones on the stack, and if they're not already
            // added we push them onto the check stack.
            for check_idx in check_from..new_check_from {
                // Take the current solution
                let (component_index, solution_index) = check_stack[check_idx];
                debug_assert!(!self.local[component_index].solutions.is_empty());
                let cur_solution = &self.local[component_index].solutions[solution_index];

                // Go through the matches and check if they're on the stack or
                // should be added to the stack.
                for cur_match in &cur_solution.matches {
                    let mut is_already_on_stack = false;
                    let mut has_same_solution = false;
                    for existing_check_idx in 0..check_from {
                        let (existing_component_index, existing_solution_index) = check_stack[existing_check_idx];
                        if existing_component_index == cur_match.target_index {
                            // Already lives on the stack, so the match MUST
                            // contain the same solution index if the checked
                            // local solution is agreeable with the (partially
                            // determined) global solution.
                            is_already_on_stack = true;
                            if cur_match.match_indices.contains(&existing_solution_index) {
                                has_same_solution = true;
                                break;
                            }
                        }
                    }

                    if is_already_on_stack {
                        if !has_same_solution {
                            // We have an inconsistency, so we need to go back
                            // in our stack, and try the next solution
                            let (last_component_index, last_solution_index) = check_stack[check_from];
                            check_stack.truncate(check_from);
                            if check_stack.is_empty() {
                                // The starting point does not yield a valid
                                // solution
                                return None;
                            }

                            // Try the next one
                            let last_component = &self.local[last_component_index];
                            let new_solution_index = last_solution_index + 1;
                            if new_solution_index >= last_component.solutions.len() {
                                // No more things to try, again: no valid
                                // solution
                                return None;
                            }

                            check_stack.push((last_component_index, new_solution_index));
                            continue 'checking_loop;
                        } // else: we're fine, the solution is agreeable
                    } else {
                        check_stack.push((cur_match.target_index, 0))
                    }
                }
            }

            check_from = new_check_from;
        }

        // Because of our earlier checking if we can have a solution at
        // all (all components have their peers), and the exit condition of the
        // while loop: if we're here, then we have a global solution
        debug_assert_eq!(check_stack.len(), self.local.len());
        let mut final_branches = Vec::with_capacity(check_stack.len());
        for (component_index, solution_index) in check_stack.iter().copied() {
            let component = &self.local[component_index];
            let solution = &component.solutions[solution_index];
            final_branches.push((component.component, solution.final_branch_id));
        }

        // Just debugging here, TODO: @remove
        let mut total_num_channels = 0;
        for (component_index, _) in check_stack.iter().copied() {
            let component = &self.local[component_index];
            total_num_channels += component.solutions[0].channel_mapping.len();
        }

        total_num_channels /= 2;
        let mut final_mapping = Vec::with_capacity(total_num_channels);
        let mut total_num_checked = 0;

        for (component_index, solution_index) in check_stack.iter().copied() {
            let component = &self.local[component_index];
            let solution = &component.solutions[solution_index];

            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
                    Some((_, encountered_branch_id)) => {
                        debug_assert_eq!(*encountered_branch_id, branch_id);
                        total_num_checked += 1;
                    },
                    None => {
                        final_mapping.push((channel_id, branch_id));
                    }
                }
            }
        }

        debug_assert_eq!(total_num_checked, total_num_channels);

        return Some(GlobalSolution{
            component_branches: final_branches,
            channel_mapping: final_mapping,
        });
    }

    /// Simple test if a solution is at all possible. If this returns true it
    /// does not mean there actually is a solution.
    fn can_have_solution(&self) -> bool {
        for component in &self.local {
            if !component.all_peers_present {
                return false;
            }
        }

        return true;
    }

    /// Turns the entire (partially resolved) global solution back into local
    /// solutions to ship to another component.
    // TODO: Don't do this, kind of wasteful since a lot of processing has
    //  already been performed.
    fn drain(&mut self) -> Vec<LocalSolution> {
        let mut reserve_len = 0;
        for component in &self.local {
            reserve_len += component.solutions.len();
        }

        let mut solutions = Vec::with_capacity(reserve_len);
        for component in self.local.drain(..) {
            for solution in component.solutions {
                solutions.push(LocalSolution{
                    component: component.component,
                    final_branch_id: solution.final_branch_id,
                    port_mapping: solution.channel_mapping,
                });
            }
        }

        return solutions;
    }

    fn clear(&mut self) {
        self.local.clear();
    }
}

// -----------------------------------------------------------------------------
// Generic Helpers
// -----------------------------------------------------------------------------

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
    // Helper to check a value for a port and recurse if needed.
    use crate::protocol::eval::Value;

    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }

                ports.push(cur_port);
            },
            Value::Array(heap_pos) |
            Value::Message(heap_pos) |
            Value::String(heap_pos) |
            Value::Struct(heap_pos) |
            Value::Union(_, heap_pos) => {
                // Reference to some dynamic thing which might contain ports,
                // so recurse
                let heap_region = &group.regions[*heap_pos as usize];
                for embedded_value in heap_region {
                    find_port_in_value(group, embedded_value, ports);
                }
            },
            _ => {}, // values we don't care about
        }
    }

    // Clear the ports, then scan all the available values
    ports.clear();
    for value in &value_group.values {
        find_port_in_value(value_group, value, ports);
    }
}