Files @ 63a4a437bc0a
Branch filter:

Location: CSY/reowolf/testdata/examples/03_transmitting_ports_02.pdl

MH
Add more complicated port sending example
// Ofcourse we may do something a little more complicated than this. Suppose
// that we don't just send one port, but send a series of ports. i.e. we use
// an `Option` union type, to ...

union Option<T> {
    Some(T),
    None,
}

// ... turn an array of ports that we're going to transmit into a series of
// messages containing ports, each sent to a specific component.

comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
    auto num_peers = length(txs);
    auto num_ports = length(to_transmit);

    auto num_per_peer = num_ports / num_peers;
    auto num_remaining = num_ports - (num_per_peer * num_peers);

    auto peer_index = 0;
    auto port_index = 0;
    while (peer_index < num_peers) {
        auto peer_port = txs[peer_index];
        auto counter = 0;

        // Distribute part of the ports to one of the peers.
        sync {
            // Sending the main batch of ports for the peer
            while (counter < num_per_peer) {
                put(peer_port, Option::Some(to_transmit[port_index]));
                port_index += 1;
                counter += 1;
            }

            // Sending the remainder of ports, one per peer until they're gone
            if (num_remaining > 0) {
                put(peer_port, Option::Some(to_transmit[port_index]));
                port_index += 1;
                num_remaining -= 1;
            }

            // Finish the custom protocol by sending nothing, which indicates to
            // the peer that it has received all the ports we have to hand out.
            put(peer_port, Option::None);
        }

        peer_index += 1;
    }
}

// And here we have the component which will receive on that port. We can design
// the synchronous regions any we want. In this case when we receive ports we
// just synchronize `port_sender`, but the moment we receive messages we
// synchronize with everyone.

comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
    // Receive all ports
    auto value_rxs = {};

    sync {
        while (true) {
            auto maybe_port = get(port_rxs);
            if (let Option::Some(certainly_a_port) = maybe_port) {
                value_rxs @= { certainly_a_port };
            } else {
                break;
            }
        }
    }

    // Receive all values
    auto received_sum = 0;

    sync {
        auto port_index = 0;
        auto num_ports = length(value_rxs);
        while (port_index < num_ports) {
            auto value = get(value_rxs[port_index]);
            received_sum += value;
            port_index += 1;
        }
    }

    // And send the sum
    sync put(sum_tx, received_sum);
}

// Now we need something to send the values, we'll make something incredibly
// simple. Namely:

comp value_sender(out<u32> tx, u32 value_to_send) {
    sync put(tx, value_to_send);
}

comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
    auto sum = 0;
    auto index = 0;
    while (index < length(partial_sum_rx)) {
        sync sum += get(partial_sum_rx[index]);
        index += 1;
    }

    sync put(total_sum_tx, sum);
}

// And we need the component to set this entire system of components up. So we
// write the following entry point.

comp main() {
    auto num_value_ports = 32;
    auto num_receivers = 3;

    // Construct the senders of values
    auto value_port_index = 1;
    auto value_rx_ports = {};
    while (value_port_index <= num_value_ports) {
        channel value_tx -> value_rx;
        new value_sender(value_tx, value_port_index);
        value_rx_ports @= { value_rx };
        value_port_index += 1;
    }

    // Construct the components that will receive groups of value-receiving
    // ports
    auto receiver_index = 0;
    auto sum_combine_rx_ports = {};
    auto port_tx_ports = {};

    while (receiver_index < num_receivers) {
        channel sum_tx -> sum_rx;
        channel port_tx -> port_rx;
        new port_receiver(port_rx, sum_tx);

        sum_combine_rx_ports @= { sum_rx };
        port_tx_ports @= { port_tx };
        receiver_index += 1;
    }

    // Construct the component that redistributes the total number of input
    // ports.
    new port_sender(port_tx_ports, value_rx_ports);

    // Construct the component that computes the sum of all sent values
    channel total_value_tx -> total_value_rx;
    new sum_collector(sum_combine_rx_ports, total_value_tx);

    auto expected = num_value_ports * (num_value_ports + 1) / 2;
    auto received = 0;

    sync received = get(total_value_rx);

    if (expected == received) {
        print("got the expected value!");
    } else {
        print("got something entirely different");
    }
}