Changeset - 63a4a437bc0a
[Not reviewed]
0 1 0
MH - 3 years ago 2022-05-19 19:14:22
contact@maxhenger.nl
Add more complicated port sending example
1 file changed with 145 insertions and 4 deletions:
0 comments (0 inline, 0 general)
testdata/examples/03_transmitting_ports_02.pdl
Show inline comments
 
// Ofcourse we may do something a little more complicated than this. Suppose
 
// that we don't just send one port, but send a series of ports. i.e.
 
// that we don't just send one port, but send a series of ports. i.e. we use
 
// an `Option` union type, to ...
 

	
 
union Option<T> {
 
    Some(T),
 
    None,
 
}
 

	
 
comp port_sender(out<Option<in<u32>>> tx, in<u32>[] to_transmit) {
 
// ... turn an array of ports that we're going to transmit into a series of
 
// messages containing ports, each sent to a specific component.
 

	
 
comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
 
    auto num_peers = length(txs);
 
    auto num_ports = length(to_transmit);
 
    auto index = 0;
 
    while (index < num_ports) {
 

	
 
    auto num_per_peer = num_ports / num_peers;
 
    auto num_remaining = num_ports - (num_per_peer * num_peers);
 

	
 
    auto peer_index = 0;
 
    auto port_index = 0;
 
    while (peer_index < num_peers) {
 
        auto peer_port = txs[peer_index];
 
        auto counter = 0;
 

	
 
        // Distribute part of the ports to one of the peers.
 
        sync {
 
            // Sending the main batch of ports for the peer
 
            while (counter < num_per_peer) {
 
                put(peer_port, Option::Some(to_transmit[port_index]));
 
                port_index += 1;
 
                counter += 1;
 
            }
 

	
 
            // Sending the remainder of ports, one per peer until they're gone
 
            if (num_remaining > 0) {
 
                put(peer_port, Option::Some(to_transmit[port_index]));
 
                port_index += 1;
 
                num_remaining -= 1;
 
            }
 

	
 
            // Finish the custom protocol by sending nothing, which indicates to
 
            // the peer that it has received all the ports we have to hand out.
 
            put(peer_port, Option::None);
 
        }
 

	
 
        peer_index += 1;
 
    }
 
}
 

	
 
// And here we have the component which will receive on that port. We can design
 
// the synchronous regions any we want. In this case when we receive ports we
 
// just synchronize `port_sender`, but the moment we receive messages we
 
// synchronize with everyone.
 

	
 
comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
 
    // Receive all ports
 
    auto value_rxs = {};
 

	
 
    sync {
 
        while (true) {
 
            auto maybe_port = get(port_rxs);
 
            if (let Option::Some(certainly_a_port) = maybe_port) {
 
                value_rxs @= { certainly_a_port };
 
            } else {
 
                break;
 
            }
 
        }
 
    }
 

	
 
    // Receive all values
 
    auto received_sum = 0;
 

	
 
    sync {
 
        auto port_index = 0;
 
        auto num_ports = length(value_rxs);
 
        while (port_index < num_ports) {
 
            auto value = get(value_rxs[port_index]);
 
            received_sum += value;
 
            port_index += 1;
 
        }
 
    }
 

	
 
    // And send the sum
 
    sync put(sum_tx, received_sum);
 
}
 

	
 
// Now we need something to send the values, we'll make something incredibly
 
// simple. Namely:
 

	
 
comp value_sender(out<u32> tx, u32 value_to_send) {
 
    sync put(tx, value_to_send);
 
}
 

	
 
comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
 
    auto sum = 0;
 
    auto index = 0;
 
    while (index < length(partial_sum_rx)) {
 
        sync sum += get(partial_sum_rx[index]);
 
        index += 1;
 
    }
 

	
 
    sync put(total_sum_tx, sum);
 
}
 

	
 
// And we need the component to set this entire system of components up. So we
 
// write the following entry point.
 

	
 
comp main() {
 
    auto num_value_ports = 32;
 
    auto num_receivers = 3;
 

	
 
    // Construct the senders of values
 
    auto value_port_index = 1;
 
    auto value_rx_ports = {};
 
    while (value_port_index <= num_value_ports) {
 
        channel value_tx -> value_rx;
 
        new value_sender(value_tx, value_port_index);
 
        value_rx_ports @= { value_rx };
 
        value_port_index += 1;
 
    }
 

	
 
    // Construct the components that will receive groups of value-receiving
 
    // ports
 
    auto receiver_index = 0;
 
    auto sum_combine_rx_ports = {};
 
    auto port_tx_ports = {};
 

	
 
    while (receiver_index < num_receivers) {
 
        channel sum_tx -> sum_rx;
 
        channel port_tx -> port_rx;
 
        new port_receiver(port_rx, sum_tx);
 

	
 
        sum_combine_rx_ports @= { sum_rx };
 
        port_tx_ports @= { port_tx };
 
        receiver_index += 1;
 
    }
 

	
 
    // Construct the component that redistributes the total number of input
 
    // ports.
 
    new port_sender(port_tx_ports, value_rx_ports);
 

	
 
    // Construct the component that computes the sum of all sent values
 
    channel total_value_tx -> total_value_rx;
 
    new sum_collector(sum_combine_rx_ports, total_value_tx);
 

	
 
    auto expected = num_value_ports * (num_value_ports + 1) / 2;
 
    auto received = 0;
 

	
 
    sync received = get(total_value_rx);
 

	
 
    if (expected == received) {
 
        print("got the expected value!");
 
    } else {
 
        print("got something entirely different");
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)