Files
@ 97217a7b2d18
Branch filter:
Location: CSY/reowolf/testdata/examples/01_reworked_consensus_select_01.pdl
97217a7b2d18
4.6 KiB
text/plain
feat: examples
// Previously the following example caused the inadvertent synchronization of
// all participating components.
// Adds up every integer in the inclusive range [min_val, max_val] by stepping
// through it one value at a time. As the name suggests, the result matters
// less than the time it takes to compute it.
//
// Returns 0 when `min_val > max_val`, because the loop body never executes.
func lots_of_unneccesary_work(u64 min_val, u64 max_val) -> u64 {
    u64 current = min_val;
    u64 total = 0;
    while (current <= max_val) {
        total = total + current;
        current = current + 1;
    }
    return total;
}
// Never-ending producer. Each synchronous round it recomputes the sum over
// [min_val, max_val], prints `send_text`, and then puts that sum on `tx`.
comp data_producer(out<u64> tx, u64 min_val, u64 max_val, string send_text) {
    while (true) {
        sync {
            // The computation is performed inside the round, ahead of the
            // print and the put.
            u64 payload = lots_of_unneccesary_work(min_val, max_val);
            print(send_text);
            put(tx, payload);
        }
    }
}
// Receiver, version 1: visits the three input ports in a fixed order
// (a, b, c) and opens a separate sync block for every single `get`.
// This is repeated `num_rounds` times, so 3 * num_rounds messages are
// consumed in total.
comp data_receiver_v1(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
    auto inputs = { rx_a, rx_b, rx_c };
    u32 rounds_done = 0;
    while (rounds_done < num_rounds) {
        auto input_count = length(inputs);
        auto next_input = 0;
        while (next_input < input_count) {
            // One synchronous round per received message.
            sync {
                // The received value itself is not used any further.
                auto received = get(inputs[next_input]);
                print("received message (V1)");
                next_input += 1;
            }
        }
        rounds_done += 1;
    }
}
// The reason was that a synchronous interaction checked *all* ports for a valid
// interaction. So for `data_receiver_v1` we have that it communicates with one
// peer per round, but it still requires the other peers to agree that they
// didn't send anything at all! Note that this already implies that all running
// components need to synchronize. We could fix this by writing:
// Receiver, version 2: like version 1 it reads the ports in the fixed order
// a, b, c, but all three `get` calls of one outer iteration happen inside a
// single sync block. The outer loop runs `num_rounds` times.
comp data_receiver_v2(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
    auto inputs = { rx_a, rx_b, rx_c };
    u32 rounds_done = 0;
    while (rounds_done < num_rounds) {
        auto input_count = length(inputs);
        auto next_input = 0;
        // One synchronous round covers the messages of every peer.
        sync {
            while (next_input < input_count) {
                // The received value itself is not used any further.
                auto received = get(inputs[next_input]);
                print("received message (V2)");
                next_input += 1;
            }
        }
        rounds_done += 1;
    }
}
// But this is not the intended behaviour. We want the producer components to
// be able to run independently of one another. This requires a change in the
// semantics of the language! We no longer have that each peer is automatically
// dragged into the synchronous round. Instead, only once the first message of
// a peer has been received through a `get` call do we join each other's
// synchronous rounds.
//
// With such a change to the runtime, we now have that the first version
// (written above) produces the intended behaviour: the consumer accepts one
// value and synchronizes with its sender. It then goes to the next round and
// synchronizes with the next sender.
//
// But what we would really like to do is to synchronize with any of the peers
// that happens to have its work ready for consumption. And so the 'select'
// statement is introduced into the language. This statement can be used to
// describe a set of possible behaviours we could execute. Each behaviour has
// an associated set of ports. When every port in that set has a message ready
// to be read, the corresponding behaviour will execute. So to complete the
// example above, we have:
// Receiver, version 3: uses `select` so that each synchronous round consumes
// a message from whichever of the three ports the select statement picks,
// instead of visiting the ports in a fixed order.
//
// The loop performs `num_rounds * 3` rounds, with one message consumed per
// round. The factor 3 matches the number of input ports.
//
// The counters below only demonstrate that every select arm can run its own
// code; nothing in this component reads them back.
comp data_receiver_v3(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
    u32 counter = 0;

    u32 received_from_a = 0;
    u32 received_from_b_or_c = 0;
    u32 received_from_a_or_c = 0;
    u64 sum_received_from_c = 0;

    while (counter < num_rounds*3) {
        sync {
            select {
                // Message from producer A
                auto value = get(rx_a) -> {
                    received_from_a += 1;
                    received_from_a_or_c += 1;
                }
                // Message from producer B
                auto value = get(rx_b) -> {
                    received_from_b_or_c += 1;
                }
                // Message from producer C: the only arm that uses the payload
                auto value = get(rx_c) -> {
                    received_from_a_or_c += 1;
                    received_from_b_or_c += 1;
                    sum_received_from_c += value;
                }
            }
            // Printed once per round, whichever arm was taken.
            print("received message (V3)");
        }
        counter += 1;
    }
}
// Entry point: creates three channels, starts one producer on each sending
// end, and hands all three receiving ends to the receiver variant chosen by
// the hard-coded `selected_version`.
comp main() {
    u32 rounds = 3;
    u32 selected_version = 3;

    channel tx_a -> rx_a;
    channel tx_b -> rx_b;
    channel tx_c -> rx_c;

    // All producers are given the same range, so they perform equal work.
    new data_producer(tx_a, 0xBEEF, 0xCAFE, "sent from A");
    new data_producer(tx_b, 0xBEEF, 0xCAFE, "sent from B");
    new data_producer(tx_c, 0xBEEF, 0xCAFE, "sent from C");

    // Exactly one receiver is started. For an unknown version only an error
    // is printed, and no receiver is created for the producers started above.
    if (selected_version == 1) {
        new data_receiver_v1(rx_a, rx_b, rx_c, rounds);
    } else if (selected_version == 2) {
        new data_receiver_v2(rx_a, rx_b, rx_c, rounds);
    } else if (selected_version == 3) {
        new data_receiver_v3(rx_a, rx_b, rx_c, rounds);
    } else {
        print("ERROR: invalid version in source");
    }
}
|