Changeset - 8fa5df1e1626
[Not reviewed]
0 0 4
mh - 3 years ago 2022-05-18 14:13:58
contact@maxhenger.nl
Add more examples
4 files changed with 272 insertions and 0 deletions:
0 comments (0 inline, 0 general)
testdata/examples/01_reworked_consensus_select_01.pdl
Show inline comments
 
new file 100644
 
// Previously the following example caused the inadvertent synchronization of
 
// all participating components
 

	
 
func lots_of_unneccesary_work(u64 min_val, u64 max_val) -> u64 {
 
    u64 sum = 0;
 
    u64 index = min_val;
 
    while (index <= max_val) {
 
        sum += index;
 
        index += 1;
 
    }
 

	
 
    return sum;
 
}
 

	
 
comp data_producer(out<u64> tx, u64 min_val, u64 max_val, string send_text) {
 
    while (true) {
 
        sync {
 
            auto value = lots_of_unneccesary_work(min_val, max_val);
 
            print(send_text);
 
            put(tx, value);
 
        }
 
    }
 
}
 

	
 
comp data_receiver_v1(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    while (counter < num_rounds) {
 
        auto num_peers = length(rxs);
 
        auto peer_index = 0;
 
        while (peer_index < num_peers) {
 
            sync {
 
                auto result = get(rxs[peer_index]);
 
                print("received message (V1)");
 
                peer_index += 1;
 
            }
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
// The reason was that a synchronous interaction checked *all* ports for a valid
 
// interaction. So for the `round_robin_receiver` we have that it communicates
 
// with one peer per round, but it still requires the other peers to agree that
 
// they didn't send anything at all! Note that this already implies that all
 
// running components need to synchronize. We could fix this by writing:
 

	
 
comp data_receiver_v2(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    while (counter < num_rounds) {
 
        auto num_peers = length(rxs);
 
        auto peer_index = 0;
 
        sync {
 
            while (peer_index < num_peers) {
 
                auto result = get(rxs[peer_index]);
 
                print("received message (V2)");
 
                peer_index += 1;
 
            }
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
// But this is not the intended behaviour. We want the producer components to
 
// be able to run independently of one another. This requires a change in the
 
// semantics of the language! We no longer have that each peer is automatically
 
// dragged into the synchronous round. Instead, once the first message of the
 
// peer is received through a `get` call, will we join each other's synchronous
 
// rounds.
 
//
 
// With such a change to the runtime, we now have that the first version (
 
// written above) produces the intended behaviour: the consumer accepts one
 
// value and synchronizes with its sender. Then goes to the next round and 4
 
// synchronizes with the next sender.
 
//
 
// But what we would really like to do is to synchronize with any of the peers
 
// that happens to have its work ready for consumption. And so the 'select'
 
// statement is introduced into the language. This statement can be used to
 
// describe a set of possible behaviours we could execute. Each behaviour will
 
// have an associated set of ports. When those associated set of ports have a
 
// message ready to be read, then the corresponding behaviour will execute. So
 
// to complete the example above, we have:
 

	
 
comp data_receiver_v3(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    u32 received_from_a = 0;
 
    u32 received_from_b_or_c = 0;
 
    u32 received_from_a_or_c = 0;
 
    u64 sum_received_from_c = 0;
 

	
 
    while (counter < num_rounds*3) {
 
        sync {
 
            select {
 
                auto value = get(rx_a) -> {
 
                    received_from_a += 1;
 
                    received_from_a_or_c += 1;
 
                }
 
                auto value = get(rx_b) -> {
 
                    received_from_b_or_c += 1;
 
                }
 
                auto value = get(rx_c) -> {
 
                    received_from_a_or_c += 1;
 
                    received_from_b_or_c += 1;
 
                    sum_received_from_c += value;
 
                }
 
            }
 
            print("received message (V3)");
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
comp main() {
 
    u32 version = 3;
 
    u32 num_rounds = 3;
 

	
 
    channel tx_a -> rx_a;
 
    channel tx_b -> rx_b;
 
    channel tx_c -> rx_c;
 
    new data_producer(tx_a, 0xBEEF, 0xCAFE, "sent from A");
 
    new data_producer(tx_b, 0xBEEF, 0xCAFE, "sent from B");
 
    new data_producer(tx_c, 0xBEEF, 0xCAFE, "sent from C");
 

	
 
    if (version == 1) {
 
        new data_receiver_v1(rx_a, rx_b, rx_c, num_rounds);
 
    } else if (version == 2) {
 
        new data_receiver_v2(rx_a, rx_b, rx_c, num_rounds);
 
    } else if (version == 3) {
 
        new data_receiver_v3(rx_a, rx_b, rx_c, num_rounds);
 
    } else {
 
        print("ERROR: invalid version in source");
 
    }
 
}
 
\ No newline at end of file
testdata/examples/02_error_handling.pdl
Show inline comments
 
new file 100644
 
// Although in an unstable state, there is an initial implementation for error
 
// handling. Roughly speaking: if a component has failed then it cannot complete
 
// any current or future synchronous rounds anymore. Hence, apart from some edge
 
// cases, any received message by a peer should cause a failure at that peer as
 
// well. We may have a look at the various places where a component with respect
 
// to a peer that is receiving its messages
 

	
 
enum ErrorLocation {
 
    BeforeSync,
 
    DuringSyncBeforeFirstInteraction,
 
    DuringSyncBeforeSecondInteraction,
 
    DuringSyncAfterInteractions,
 
    AfterSync,
 
}
 

	
 
func error_location_to_string(ErrorLocation loc) -> string {
 
    if (let ErrorLocation::BeforeSync = loc) {
 
        return "before sync";
 
    } else if (let ErrorLocation::DuringSyncBeforeFirstInteraction = loc) {
 
        return "during sync before first interaction";
 
    } else if (let ErrorLocation::DuringSyncBeforeSecondInteraction = loc) {
 
        return "during sync before second interaction";
 
    } else if (let ErrorLocation::DuringSyncAfterInteractions = loc) {
 
        return "during sync after interactions";
 
    } else { return "after sync"; }
 
}
 

	
 
func crash() -> u8 {
 
    return {}[0]; // access index 1 of an empty array
 
}
 

	
 
comp sender_and_crasher(out<u32> value, ErrorLocation loc) {
 
    print("sender: will crash " @ error_location_to_string(loc));
 
    if (loc == ErrorLocation::BeforeSync) { crash(); }
 
    sync {
 
        if (loc == ErrorLocation::DuringSyncBeforeFirstInteraction) { crash(); }
 
        print("sender: sending first value");
 
        put(value, 0);
 
        if (loc == ErrorLocation::DuringSyncBeforeSecondInteraction) { crash(); }
 
        print("sender: sending second value");
 
        put(value, 1);
 
        if (loc == ErrorLocation::DuringSyncAfterInteractions) { crash(); }
 
    }
 
    if (loc == ErrorLocation::AfterSync) { crash(); }
 
}
 

	
 
comp receiver(in<u32> value) {
 
    sync {
 
        auto a = get(value);
 
        auto b = get(value);
 
    }
 
}
 

	
 
// Note that when we run the example with the error location before sync, or
 
// during sync, that the receiver always crashes. However the location where it
 
// will crash is somewhat random! Due to the asynchronous nature of the runtime
 
// a sender of messages will always just `put` the value onto the port and
 
// continue execution. So even though the sender component might already be done
 
// with its sync round, the receiver officially still has to receive its first
 
// message. In any case, a neat error message should be displayed in the
 
// console.
 
//
 
// Note especially, given the asynchronous nature of the runtime, that the
 
// receiver should figure out when the peer component has crashed, but it can
 
// still finish the current synchronous round. This might happen if the peer
 
// component crashes *just* after the synchronous round. There may be a case
 
// where the peer receives the information that the peer crashed *before* it
 
// receives the information that the synchronous round has succeeded.
 

	
 
comp main() {
 
    channel tx -> rx;
 

	
 
    new sender_and_crasher(tx, ErrorLocation::AfterSync);
 
    new receiver(rx);
 
}
 
\ No newline at end of file
testdata/examples/03_transmitting_ports_01.pdl
Show inline comments
 
new file 100644
 
// Since this release transmitting ports is possible. This means that we can
 
// send ports through ports. In fact, we can send ports that may send ports that
 
// may send ports. But don't be fooled by the apparent complexity. The inner
 
// type `T` of a port like `in<T>` simply states that that is the message type.
 
// Should the type `T` contain one or more ports, then we kick off a bit of code
 
// that takes care of the transfer of the port. Should the port inside of `T`
 
// itself, after being received, send a port, then we simply kick off that same
 
// procedure again.
 
//
 
// In the simplest case, we have someone transmitting the receiving end of a
 
// channel to another component, which then uses that receiving end to receive a
 
// value.
 

	
 
comp port_sender(out<in<u32>> tx, in<u32> to_transmit) {
 
    sync put(tx, to_transmit);
 
}
 

	
 
comp port_receiver_and_value_getter(in<in<u32>> rx, u32 expected_value) {
 
    u32 got_value = 0;
 
    sync {
 
        auto port = get(rx);
 
        got_value = get(port);
 
    }
 
    if (expected_value == got_value) {
 
        print("got the expected value :)");
 
    } else {
 
        print("got a different value :(");
 
    }
 
}
 

	
 
comp value_sender(out<u32> tx, u32 to_send) {
 
    sync put(tx, to_send);
 
}
 

	
 
comp main() {
 
    u32 value = 1337_2392;
 

	
 
    channel port_tx -> port_rx;
 
    channel value_tx -> value_rx;
 
    new port_sender(port_tx, value_rx);
 
    new port_receiver_and_value_getter(port_rx, value);
 
    new value_sender(value_tx, value);
 
}
 
\ No newline at end of file
testdata/examples/03_transmitting_ports_02.pdl
Show inline comments
 
new file 100644
 
// Ofcourse we may do something a little more complicated than this. Suppose
 
// that we don't just send one port, but send a series of ports. i.e.
 

	
 
union Option<T> {
 
    Some(T),
 
    None,
 
}
 

	
 
comp port_sender(out<Option<in<u32>>> tx, in<u32>[] to_transmit) {
 
    auto num_ports = length(to_transmit);
 
    auto index = 0;
 
    while (index < num_ports) {
 

	
 
        index += 1;
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)