// Of course we may do something a little more complicated than this. Suppose
// that we don't just send one port, but send a series of ports. i.e. we use
// an `Option` union type, to ...
union Option<T> {
    Some(T),
    None,
}

// ... turn an array of ports that we're going to transmit into a series of
// messages containing ports, each sent to a specific component.
//
// Parameters:
//   txs         - one output port per peer; every peer receives its share of
//                 the ports wrapped in `Option::Some`, followed by a single
//                 `Option::None` that terminates the transfer.
//   to_transmit - the input ports that are handed out to the peers.
//
// The ports are split evenly over the peers. When the number of ports is not
// a multiple of the number of peers, the leftover ports are handed out one at
// a time to the first peers, so those peers receive one port more.
//
// NOTE: `txs` is assumed to be non-empty, since the per-peer share is computed
// by dividing by `length(txs)`.
comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
    auto num_peers = length(txs);
    auto num_ports = length(to_transmit);

    // Every peer gets `num_per_peer` ports, `num_remaining` of them get one
    // additional port.
    auto num_per_peer = num_ports / num_peers;
    auto num_remaining = num_ports - (num_per_peer * num_peers);

    auto peer_index = 0;
    auto port_index = 0; // next entry of `to_transmit` that is handed out
    while (peer_index < num_peers) {
        auto peer_port = txs[peer_index];
        auto counter = 0;

        // Distribute part of the ports to one of the peers. Each peer is
        // served in its own synchronous round.
        sync {
            // Sending the main batch of ports for the peer
            while (counter < num_per_peer) {
                put(peer_port, Option::Some(to_transmit[port_index]));
                port_index += 1;
                counter += 1;
            }

            // Sending the remainder of ports, one per peer until they're gone
            if (num_remaining > 0) {
                put(peer_port, Option::Some(to_transmit[port_index]));
                port_index += 1;
                num_remaining -= 1;
            }

            // Finish the custom protocol by sending nothing, which indicates to
            // the peer that it has received all the ports we have to hand out.
            put(peer_port, Option::None);
        }

        peer_index += 1;
    }
}

// And here we have the component which will receive on that port. We can design
// the synchronous regions any way we want. In this case when we receive ports we
// just synchronize with `port_sender`, but the moment we receive messages we
// synchronize with everyone.
comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
    // `port_rxs` delivers the ports we should listen on, each wrapped in
    // `Option::Some`; anything else ends the transfer. `sum_tx` is where the
    // sum of all values received on those ports is sent.

    // Receive all ports
    auto value_rxs = {};

    sync {
        while (true) {
            auto maybe_port = get(port_rxs);
            if (let Option::Some(certainly_a_port) = maybe_port) {
                value_rxs @= { certainly_a_port };
            } else {
                // Anything other than `Some` marks the end of the ports.
                break;
            }
        }
    }

    // Receive all values: one value from every port we were handed, all in
    // the same synchronous round.
    auto received_sum = 0;

    sync {
        auto port_index = 0;
        auto num_ports = length(value_rxs);
        while (port_index < num_ports) {
            auto value = get(value_rxs[port_index]);
            received_sum += value;
            port_index += 1;
        }
    }

    // And send the sum
    sync put(sum_tx, received_sum);
}

// Now we need something to send the values, we'll make something incredibly
// simple. Namely: a component that sends `value_to_send` once over `tx`.
comp value_sender(out<u32> tx, u32 value_to_send) {
    sync put(tx, value_to_send);
}

// Receives one partial sum from every port in `partial_sum_rx`, in order and
// each in its own synchronous round, and then sends the total over
// `total_sum_tx`.
comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
    auto sum = 0;
    auto index = 0;
    while (index < length(partial_sum_rx)) {
        sync sum += get(partial_sum_rx[index]);
        index += 1;
    }

    sync put(total_sum_tx, sum);
}

// And we need the component to set this entire system of components up. So we
// write the following entry point.
comp main() {
    auto num_value_ports = 32;
    auto num_receivers = 3;

    // Construct the senders of values. The senders transmit the values
    // 1, 2, ..., num_value_ports, one value per sender.
    auto value_port_index = 1;
    auto value_rx_ports = {};
    while (value_port_index <= num_value_ports) {
        channel value_tx -> value_rx;
        new value_sender(value_tx, value_port_index);
        value_rx_ports @= { value_rx };
        value_port_index += 1;
    }

    // Construct the components that will receive groups of value-receiving
    // ports
    auto receiver_index = 0;
    auto sum_combine_rx_ports = {};
    auto port_tx_ports = {};

    while (receiver_index < num_receivers) {
        channel sum_tx -> sum_rx;
        channel port_tx -> port_rx;
        new port_receiver(port_rx, sum_tx);
        sum_combine_rx_ports @= { sum_rx };
        port_tx_ports @= { port_tx };
        receiver_index += 1;
    }

    // Construct the component that redistributes the total number of input
    // ports.
    new port_sender(port_tx_ports, value_rx_ports);

    // Construct the component that computes the sum of all sent values
    channel total_value_tx -> total_value_rx;
    new sum_collector(sum_combine_rx_ports, total_value_tx);

    // The senders transmit 1 up to and including `num_value_ports`, so the
    // total has to equal the sum of that range.
    auto expected = num_value_ports * (num_value_ports + 1) / 2;
    auto received = 0;

    sync received = get(total_value_rx);

    if (expected == received) {
        print("got the expected value!");
    } else {
        print("got something entirely different");
    }
}