diff --git a/testdata/examples/03_transmitting_ports_02.pdl b/testdata/examples/03_transmitting_ports_02.pdl index 50b34457d271b129f700383424ce18eadef8f183..cf6180be41376a54fbe37cb0ab63d9ac842d2227 100644 --- a/testdata/examples/03_transmitting_ports_02.pdl +++ b/testdata/examples/03_transmitting_ports_02.pdl @@ -1,16 +1,157 @@ // Ofcourse we may do something a little more complicated than this. Suppose -// that we don't just send one port, but send a series of ports. i.e. +// that we don't just send one port, but send a series of ports. i.e. we use +// an `Option` union type, to ... union Option { Some(T), None, } -comp port_sender(out>> tx, in[] to_transmit) { +// ... turn an array of ports that we're going to transmit into a series of +// messages containing ports, each sent to a specific component. NOTE(review): the even split divides by `length(txs)`, so this presumably expects at least one peer - confirm. + +comp port_sender(out>>[] txs, in[] to_transmit) { + auto num_peers = length(txs); auto num_ports = length(to_transmit); - auto index = 0; - while (index < num_ports) { + auto num_per_peer = num_ports / num_peers; + auto num_remaining = num_ports - (num_per_peer * num_peers); + + auto peer_index = 0; + auto port_index = 0; + while (peer_index < num_peers) { + auto peer_port = txs[peer_index]; + auto counter = 0; + + // Distribute part of the ports to one of the peers. + sync { + // Sending the main batch of ports for the peer + while (counter < num_per_peer) { + put(peer_port, Option::Some(to_transmit[port_index])); + port_index += 1; + counter += 1; + } + + // Sending the remainder of ports (the `num_remaining` left over after the even split), one per peer until they're gone + if (num_remaining > 0) { + put(peer_port, Option::Some(to_transmit[port_index])); + port_index += 1; + num_remaining -= 1; + } + + // Finish the custom protocol by sending nothing, which indicates to + // the peer that it has received all the ports we have to hand out. + put(peer_port, Option::None); + } + + peer_index += 1; + } +} + +// And here we have the component which will receive on that port. 
We can design +// the synchronous regions any way we want. In this case when we receive ports we +// just synchronize with `port_sender`, but the moment we receive messages we +// synchronize with everyone. + +comp port_receiver(in>> port_rxs, out sum_tx) { + // Receive all ports + auto value_rxs = {}; + + sync { + while (true) { + auto maybe_port = get(port_rxs); + if (let Option::Some(certainly_a_port) = maybe_port) { + value_rxs @= { certainly_a_port }; + } else { + break; + } + } + } + + // Receive all values + auto received_sum = 0; + + sync { + auto port_index = 0; + auto num_ports = length(value_rxs); + while (port_index < num_ports) { + auto value = get(value_rxs[port_index]); + received_sum += value; + port_index += 1; + } + } + + // And send the sum + sync put(sum_tx, received_sum); +} + +// Now we need something to send the values, we'll make something incredibly +// simple. Namely: + +comp value_sender(out tx, u32 value_to_send) { + sync put(tx, value_to_send); +} + +comp sum_collector(in[] partial_sum_rx, out total_sum_tx) { + auto sum = 0; + auto index = 0; + while (index < length(partial_sum_rx)) { + sync sum += get(partial_sum_rx[index]); index += 1; } + + sync put(total_sum_tx, sum); +} + +// And we need the component to set this entire system of components up. So we +// write the following entry point. 
+ +comp main() { + auto num_value_ports = 32; + auto num_receivers = 3; + + // Construct the senders of values; sender k transmits the value k, for k = 1 up to and including `num_value_ports` + auto value_port_index = 1; + auto value_rx_ports = {}; + while (value_port_index <= num_value_ports) { + channel value_tx -> value_rx; + new value_sender(value_tx, value_port_index); + value_rx_ports @= { value_rx }; + value_port_index += 1; + } + + // Construct the components that will receive groups of value-receiving + // ports + auto receiver_index = 0; + auto sum_combine_rx_ports = {}; + auto port_tx_ports = {}; + + while (receiver_index < num_receivers) { + channel sum_tx -> sum_rx; + channel port_tx -> port_rx; + new port_receiver(port_rx, sum_tx); + + sum_combine_rx_ports @= { sum_rx }; + port_tx_ports @= { port_tx }; + receiver_index += 1; + } + + // Construct the component that redistributes the total number of input + // ports. + new port_sender(port_tx_ports, value_rx_ports); + + // Construct the component that computes the sum of all sent values; that total is compared against `expected` (1 + 2 + ... + num_value_ports) below + channel total_value_tx -> total_value_rx; + new sum_collector(sum_combine_rx_ports, total_value_tx); + + auto expected = num_value_ports * (num_value_ports + 1) / 2; + auto received = 0; + + sync received = get(total_value_rx); + + if (expected == received) { + print("got the expected value!"); + } else { + print("got something entirely different"); + } } \ No newline at end of file