// Previously the following example caused the inadvertent synchronization of // all participating components func lots_of_unneccesary_work(u64 min_val, u64 max_val) -> u64 { u64 sum = 0; u64 index = min_val; while (index <= max_val) { sum += index; index += 1; } return sum; } comp data_producer(out tx, u64 min_val, u64 max_val, string send_text) { while (true) { sync { auto value = lots_of_unneccesary_work(min_val, max_val); print(send_text); put(tx, value); } } } comp data_receiver_v1(in rx_a, in rx_b, in rx_c, u32 num_rounds) { u32 counter = 0; auto rxs = { rx_a, rx_b, rx_c }; while (counter < num_rounds) { auto num_peers = length(rxs); auto peer_index = 0; while (peer_index < num_peers) { sync { auto result = get(rxs[peer_index]); print("received message (V1)"); peer_index += 1; } } counter += 1; } } // The reason was that a synchronous interaction checked *all* ports for a valid // interaction. So for the `round_robin_receiver` we have that it communicates // with one peer per round, but it still requires the other peers to agree that // they didn't send anything at all! Note that this already implies that all // running components need to synchronize. We could fix this by writing: comp data_receiver_v2(in rx_a, in rx_b, in rx_c, u32 num_rounds) { u32 counter = 0; auto rxs = { rx_a, rx_b, rx_c }; while (counter < num_rounds) { auto num_peers = length(rxs); auto peer_index = 0; sync { while (peer_index < num_peers) { auto result = get(rxs[peer_index]); print("received message (V2)"); peer_index += 1; } } counter += 1; } } // But this is not the intended behaviour. We want the producer components to // be able to run independently of one another. This requires a change in the // semantics of the language! We no longer have that each peer is automatically // dragged into the synchronous round. Instead, once the first message of the // peer is received through a `get` call, will we join each other's synchronous // rounds. // // With such a change to the runtime, we now have that the first version ( // written above) produces the intended behaviour: the consumer accepts one // value and synchronizes with its sender. Then goes to the next round and 4 // synchronizes with the next sender. // // But what we would really like to do is to synchronize with any of the peers // that happens to have its work ready for consumption. And so the 'select' // statement is introduced into the language. This statement can be used to // describe a set of possible behaviours we could execute. Each behaviour will // have an associated set of ports. When those associated set of ports have a // message ready to be read, then the corresponding behaviour will execute. So // to complete the example above, we have: comp data_receiver_v3(in rx_a, in rx_b, in rx_c, u32 num_rounds) { u32 counter = 0; auto rxs = { rx_a, rx_b, rx_c }; u32 received_from_a = 0; u32 received_from_b_or_c = 0; u32 received_from_a_or_c = 0; u64 sum_received_from_c = 0; while (counter < num_rounds*3) { sync { select { auto value = get(rx_a) -> { received_from_a += 1; received_from_a_or_c += 1; } auto value = get(rx_b) -> { received_from_b_or_c += 1; } auto value = get(rx_c) -> { received_from_a_or_c += 1; received_from_b_or_c += 1; sum_received_from_c += value; } } print("received message (V3)"); } counter += 1; } } comp main() { u32 version = 3; u32 num_rounds = 3; channel tx_a -> rx_a; channel tx_b -> rx_b; channel tx_c -> rx_c; new data_producer(tx_a, 0xBEEF, 0xCAFE, "sent from A"); new data_producer(tx_b, 0xBEEF, 0xCAFE, "sent from B"); new data_producer(tx_c, 0xBEEF, 0xCAFE, "sent from C"); if (version == 1) { new data_receiver_v1(rx_a, rx_b, rx_c, num_rounds); } else if (version == 2) { new data_receiver_v2(rx_a, rx_b, rx_c, num_rounds); } else if (version == 3) { new data_receiver_v3(rx_a, rx_b, rx_c, num_rounds); } else { print("ERROR: invalid version in source"); } }