Files @ 97217a7b2d18
Branch filter:

Location: CSY/reowolf/testdata/examples/03_transmitting_ports_02.pdl - annotation

Max Henger
feat: examples
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
97217a7b2d18
// Of course we may do something a little more complicated than this. Suppose
// that we don't just send one port, but send a series of ports, i.e. we use
// an `Option` union type, to ...

union Option<T> {
    Some(T), // carries a payload of type `T`
    None,    // carries no payload
}

// ... turn an array of ports that we're going to transmit into a series of
// messages containing ports, each sent to a specific component.

// Hands out the ports in `to_transmit` to the peers behind `txs`, in order.
// Every peer receives `length(to_transmit) / length(txs)` ports, and the first
// peers each receive one extra port until the remainder is used up. The batch
// for a peer is sent inside a single sync block and is always terminated by an
// `Option::None` message.
comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
    auto num_peers = length(txs);
    auto num_ports = length(to_transmit);

    // Only divide when there is at least one peer: an empty `txs` would
    // otherwise cause a division by zero. Without peers the loop below never
    // runs, so the component simply finishes without sending anything.
    auto num_per_peer = 0;
    auto num_remaining = 0;
    if (num_peers > 0) {
        num_per_peer = num_ports / num_peers;
        num_remaining = num_ports - (num_per_peer * num_peers);
    }

    auto peer_index = 0;
    auto port_index = 0; // next entry of `to_transmit` that has not been sent
    while (peer_index < num_peers) {
        auto peer_port = txs[peer_index];
        auto counter = 0;

        // Distribute part of the ports to one of the peers.
        sync {
            // Sending the main batch of ports for the peer
            while (counter < num_per_peer) {
                put(peer_port, Option::Some(to_transmit[port_index]));
                port_index += 1;
                counter += 1;
            }

            // Sending the remainder of ports, one per peer until they're gone
            if (num_remaining > 0) {
                put(peer_port, Option::Some(to_transmit[port_index]));
                port_index += 1;
                num_remaining -= 1;
            }

            // Finish the custom protocol by sending nothing, which indicates to
            // the peer that it has received all the ports we have to hand out.
            put(peer_port, Option::None);
        }

        peer_index += 1;
    }
}

// And here we have the component which will receive on that port. We can design
// the synchronous regions any way we want. In this case when we receive ports we
// just synchronize with `port_sender`, but the moment we receive messages we
// synchronize with everyone.

// Collects ports from `port_rxs` until a message that is not `Option::Some`
// arrives, then reads one value from each collected port and sends the sum of
// those values over `sum_tx`. Each of the three phases runs in its own sync
// block.
comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
    // Phase 1: gather the ports that are handed to us
    auto collected = {};

    sync {
        while (true) {
            auto incoming = get(port_rxs);
            if (let Option::Some(value_rx) = incoming) {
                collected @= { value_rx };
            } else {
                // Anything that is not `Some` ends the batch
                break;
            }
        }
    }

    // Phase 2: read one value from every gathered port and add them up
    auto total = 0;

    sync {
        auto cursor = 0;
        while (cursor < length(collected)) {
            total += get(collected[cursor]);
            cursor += 1;
        }
    }

    // Phase 3: report the result
    sync {
        put(sum_tx, total);
    }
}

// Now we need something to send the values. We'll make something incredibly
// simple, namely:

// Sends `value_to_send` over `tx` exactly once, inside a single sync block.
comp value_sender(out<u32> tx, u32 value_to_send) {
    sync {
        put(tx, value_to_send);
    }
}

// Reads one partial sum from each port in `partial_sum_rx`, in index order and
// with one sync block per port, then sends the accumulated total over
// `total_sum_tx`.
comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
    auto num_inputs = length(partial_sum_rx);
    auto total = 0;

    auto cursor = 0;
    while (cursor < num_inputs) {
        sync {
            total += get(partial_sum_rx[cursor]);
        }
        cursor += 1;
    }

    sync {
        put(total_sum_tx, total);
    }
}

// And we need the component to set this entire system of components up. So we
// write the following entry point.

// Entry point: wires up 32 `value_sender`s (sending 1 through 32), 3
// `port_receiver`s, one `port_sender` and one `sum_collector`, then compares
// the collected total against the closed-form sum and prints the outcome.
comp main() {
    auto num_value_ports = 32;
    auto num_receivers = 3;

    // One `value_sender` per value; we hold on to the receiving ends so they
    // can be handed to `port_sender` later.
    auto value_inputs = {};
    auto next_value = 1;
    while (next_value <= num_value_ports) {
        channel val_tx -> val_rx;
        new value_sender(val_tx, next_value);
        value_inputs @= { val_rx };
        next_value += 1;
    }

    // One `port_receiver` per group of value-receiving ports. Each gets a
    // channel to receive ports on and a channel to report its partial sum on.
    auto port_outputs = {};
    auto partial_sum_inputs = {};
    auto spawned = 0;

    while (spawned < num_receivers) {
        channel partial_tx -> partial_rx;
        channel ports_tx -> ports_rx;
        new port_receiver(ports_rx, partial_tx);

        port_outputs @= { ports_tx };
        partial_sum_inputs @= { partial_rx };
        spawned += 1;
    }

    // The component that spreads the value-receiving ports over the receivers
    new port_sender(port_outputs, value_inputs);

    // The component that adds up the partial sums of all receivers
    channel grand_total_tx -> grand_total_rx;
    new sum_collector(partial_sum_inputs, grand_total_tx);

    // The senders transmit 1, 2, ..., num_value_ports, so the total should be
    // n * (n + 1) / 2.
    auto expected = num_value_ports * (num_value_ports + 1) / 2;
    auto received = 0;

    sync {
        received = get(grand_total_rx);
    }

    if (received == expected) {
        print("got the expected value!");
    } else {
        print("got something entirely different");
    }
}