Files @ 88611f9fd179
Branch filter:

Location: CSY/reowolf/testdata/examples/01_reworked_consensus_select_01.pdl

Max Henger
Merge branch 'feat-examples' into 'master'

feat: examples

See merge request nl-cwi-csy/reowolf!12
// Previously the following example caused the inadvertent synchronization of
// all participating components

// As the name suggests, this helper exists only to keep the caller busy: it
// walks the inclusive range [min_val, max_val] one integer at a time and
// returns the sum of all visited values. When `min_val` is larger than
// `max_val` the loop body never runs and the result is 0.
func lots_of_unneccesary_work(u64 min_val, u64 max_val) -> u64 {
    u64 current = min_val;
    u64 total = 0;

    while (current <= max_val) {
        total = total + current;
        current = current + 1;
    }

    return total;
}

// Producer component. Loops forever; in every synchronous round it recomputes
// the sum over [min_val, max_val], prints `send_text`, and then sends the
// computed sum over `tx`.
comp data_producer(out<u64> tx, u64 min_val, u64 max_val, string send_text) {
    while (true) {
        sync {
            // The sum is recomputed inside each round, before the print and the send.
            u64 payload = lots_of_unneccesary_work(min_val, max_val);
            print(send_text);
            put(tx, payload);
        }
    }
}

// Receiver, first version. Makes `num_rounds` passes over the three input
// ports; within a pass it reads one message from rx_a, rx_b and rx_c in that
// order, using a separate sync block for every single `get`.
comp data_receiver_v1(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
    auto inputs = { rx_a, rx_b, rx_c };
    u32 rounds_done = 0;

    while (rounds_done < num_rounds) {
        auto peer_count = length(inputs);
        auto next_peer = 0;
        while (next_peer < peer_count) {
            // One synchronous round per peer: receive, report, advance.
            sync {
                auto received = get(inputs[next_peer]);
                print("received message (V1)");
                next_peer += 1;
            }
        }
        rounds_done += 1;
    }
}

// The reason was that a synchronous interaction checked *all* ports for a valid
// interaction. So for the round-robin receiver (`data_receiver_v1`) we have that
// it communicates with one peer per round, but it still requires the other
// peers to agree that they didn't send anything at all! Note that this already
// implies that all running components need to synchronize. We could fix this
// by writing:

// Receiver, second version. Makes `num_rounds` passes; each pass is a single
// sync block in which one message is read from rx_a, rx_b and rx_c in that
// order, so all three `get` calls share the same synchronous round.
comp data_receiver_v2(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
    auto inputs = { rx_a, rx_b, rx_c };
    u32 rounds_done = 0;

    while (rounds_done < num_rounds) {
        auto peer_count = length(inputs);
        auto next_peer = 0;
        // A single synchronous round covers the receives from every peer.
        sync {
            while (next_peer < peer_count) {
                auto received = get(inputs[next_peer]);
                print("received message (V2)");
                next_peer += 1;
            }
        }
        rounds_done += 1;
    }
}

// But this is not the intended behaviour. We want the producer components to
// be able to run independently of one another. This requires a change in the
// semantics of the language! We no longer have that each peer is automatically
// dragged into the synchronous round. Instead, only once the first message of
// a peer is received through a `get` call do we join each other's synchronous
// rounds.
//
// With such a change to the runtime, we now have that the first version
// (written above) produces the intended behaviour: the consumer accepts one
// value and synchronizes with its sender. Then it goes to the next round and
// synchronizes with the next sender.
//
// But what we would really like to do is to synchronize with any of the peers
// that happens to have its work ready for consumption. And so the 'select'
// statement is introduced into the language. This statement can be used to
// describe a set of possible behaviours we could execute. Each behaviour will
// have an associated set of ports. When that associated set of ports has a
// message ready to be read, then the corresponding behaviour will execute. So
// to complete the example above, we have:

// Receiver, third version. Runs `num_rounds * 3` synchronous rounds (the 3
// matches the number of input ports). Each round contains one `select` with
// one arm per input port; the arm that runs updates the bookkeeping counters
// for that port, after which the round prints a message.
//
// The counters are only written, never read back or reported; they show that
// each arm can run its own code.
comp data_receiver_v3(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
    u32 counter = 0;

    // Per-source tallies updated by the select arms below.
    u32 received_from_a = 0;
    u32 received_from_b_or_c = 0;
    u32 received_from_a_or_c = 0;
    u64 sum_received_from_c = 0;

    while (counter < num_rounds*3) {
        sync {
            select {
                // Message from A.
                auto value = get(rx_a) -> {
                    received_from_a += 1;
                    received_from_a_or_c += 1;
                }
                // Message from B.
                auto value = get(rx_b) -> {
                    received_from_b_or_c += 1;
                }
                // Message from C: the only arm that uses the received value.
                auto value = get(rx_c) -> {
                    received_from_a_or_c += 1;
                    received_from_b_or_c += 1;
                    sum_received_from_c += value;
                }
            }
            print("received message (V3)");
        }
        counter += 1;
    }
}

// Entry point. Creates three channels, starts one producer per channel (all
// summing over the same range), and then starts the receiver variant chosen by
// `version`. Any value of `version` other than 1, 2 or 3 only prints an error.
comp main() {
    u32 num_rounds = 3;
    u32 version = 3;

    // Range handed to every producer's summation work.
    u64 range_start = 0xBEEF;
    u64 range_end = 0xCAFE;

    channel tx_a -> rx_a;
    channel tx_b -> rx_b;
    channel tx_c -> rx_c;

    new data_producer(tx_a, range_start, range_end, "sent from A");
    new data_producer(tx_b, range_start, range_end, "sent from B");
    new data_producer(tx_c, range_start, range_end, "sent from C");

    // Exactly one branch runs; the version checks are mutually exclusive.
    if (version == 3) {
        new data_receiver_v3(rx_a, rx_b, rx_c, num_rounds);
    } else if (version == 2) {
        new data_receiver_v2(rx_a, rx_b, rx_c, num_rounds);
    } else if (version == 1) {
        new data_receiver_v1(rx_a, rx_b, rx_c, num_rounds);
    } else {
        print("ERROR: invalid version in source");
    }
}