Files
@ 97217a7b2d18
Branch filter:
Location: CSY/reowolf/testdata/examples/03_transmitting_ports_02.pdl
97217a7b2d18
4.6 KiB
text/plain
feat: examples
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | // Of course, we may do something a little more complicated than this. Suppose
// that we don't just send one port, but send a series of ports. i.e. we use
// an `Option` union type, to ...
// Generic optional value with two variants. In this file it wraps the ports
// that `port_sender` transmits; `None` is sent last to mark the end of a batch.
union Option<T> {
// Carries exactly one value of type `T`.
Some(T),
// Carries no value.
None,
}
// ... turn an array of ports that we're going to transmit into a series of
// messages containing ports, each sent to a specific component.
// Hands out the ports in `to_transmit` to the peers reachable through `txs`.
// Every peer gets `length(to_transmit) / length(txs)` ports; the ports left
// over after that even split are given out one extra per peer, starting at
// the first peer. Each peer is served in its own sync round, which is closed
// by an `Option::None` message telling the peer that no more ports follow.
// NOTE(review): the even split divides by `length(txs)`, so this presumably
// expects at least one peer - confirm against callers.
comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
    auto peer_count = length(txs);
    auto port_count = length(to_transmit);
    auto base_share = port_count / peer_count;
    auto leftover = port_count - (base_share * peer_count);

    auto next_port = 0;
    auto peer = 0;
    while (peer < peer_count) {
        auto tx = txs[peer];

        sync {
            // This peer's share: the even split, plus one of the leftover
            // ports for as long as there are leftovers to give away.
            auto share = base_share;
            if (leftover > 0) {
                share += 1;
                leftover -= 1;
            }

            // Transmit the share, taking ports in their original order.
            auto sent = 0;
            while (sent < share) {
                put(tx, Option::Some(to_transmit[next_port]));
                next_port += 1;
                sent += 1;
            }

            // End-of-batch marker for this peer.
            put(tx, Option::None);
        }

        peer += 1;
    }
}
// And here we have the component which will receive on that port. We can design
// the synchronous regions any way we want. In this case, when we receive ports we
// just synchronize with `port_sender`, but the moment we receive messages we
// synchronize with everyone.
// Works in three sync rounds:
//  1. reads `Option` messages from `port_rxs`, storing every port wrapped in
//     `Option::Some`, until a message that is not `Some` arrives;
//  2. gets one value from each stored port and adds them up;
//  3. puts the resulting sum on `sum_tx`.
comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
    // Round 1: gather the ports handed to us.
    auto collected = {};
    sync {
        auto more_ports = true;
        while (more_ports) {
            auto message = get(port_rxs);
            if (let Option::Some(rx) = message) {
                collected @= { rx };
            } else {
                // Anything other than `Some` ends the stream of ports.
                more_ports = false;
            }
        }
    }

    // Round 2: read one value per gathered port and accumulate.
    auto total = 0;
    sync {
        auto count = length(collected);
        auto i = 0;
        while (i < count) {
            auto received = get(collected[i]);
            total += received;
            i += 1;
        }
    }

    // Round 3: report the partial sum.
    sync {
        put(sum_tx, total);
    }
}
// Now we need something to send the values. We'll make something incredibly
// simple, namely:
// Puts `value_to_send` on `tx` once, inside a single sync round.
comp value_sender(out<u32> tx, u32 value_to_send) {
    sync {
        put(tx, value_to_send);
    }
}
// Gets one value from each port in `partial_sum_rx`, in array order and with
// one sync round per port, then puts the accumulated total on `total_sum_tx`
// in a final sync round.
comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
    auto input_count = length(partial_sum_rx);
    auto total = 0;
    auto i = 0;
    while (i < input_count) {
        sync {
            total += get(partial_sum_rx[i]);
        }
        i += 1;
    }

    sync {
        put(total_sum_tx, total);
    }
}
// And we need the component to set this entire system of components up. So we
// write the following entry point.
// Entry point that wires the whole example together:
//  - 32 `value_sender`s, sending the values 1 through 32;
//  - 3 `port_receiver`s, each with a port-delivery channel and a sum channel;
//  - one `port_sender` that spreads the 32 value-receiving ports over the
//    receivers;
//  - one `sum_collector` that combines the receivers' partial sums.
// Finally the collected total is compared with 1 + 2 + ... + 32 and the
// outcome is printed.
comp main() {
    auto value_count = 32;
    auto receiver_count = 3;

    // One value_sender per value; we keep the receiving end of each channel.
    auto value_inputs = {};
    auto next_value = 1;
    while (next_value <= value_count) {
        channel value_tx -> value_rx;
        new value_sender(value_tx, next_value);
        value_inputs @= { value_rx };
        next_value += 1;
    }

    // One port_receiver per receiver slot. We keep the sending end of its
    // port-delivery channel and the receiving end of its sum channel.
    auto port_outputs = {};
    auto partial_sum_inputs = {};
    auto receiver = 0;
    while (receiver < receiver_count) {
        channel sum_tx -> sum_rx;
        channel port_tx -> port_rx;
        new port_receiver(port_rx, sum_tx);
        port_outputs @= { port_tx };
        partial_sum_inputs @= { sum_rx };
        receiver += 1;
    }

    // The component that hands the value-receiving ports to the receivers.
    new port_sender(port_outputs, value_inputs);

    // The component that adds up all partial sums.
    channel total_value_tx -> total_value_rx;
    new sum_collector(partial_sum_inputs, total_value_tx);

    // Sum of 1 through value_count, to check the result against.
    auto expected = value_count * (value_count + 1) / 2;
    auto received = 0;
    sync {
        received = get(total_value_rx);
    }

    if (received != expected) {
        print("got something entirely different");
    } else {
        print("got the expected value!");
    }
}
|