Changeset - b972060a1f92
[Not reviewed]
0 4 4
Christopher Esterhuyse - 5 years ago 2020-10-09 15:07:15
christopher.esterhuyse@gmail.com
more optimization-stage benchmarking examples
8 files changed with 295 insertions and 32 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -43,7 +43,7 @@ crate-type = [
 
]
 

	
 
[features]
 
default = ["ffi", "session_optimization"]
 
default = ["ffi"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
examples/bench_27/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "\
 
    primitive xrouter(in a, out b, out c) {\
 
        while(true) synchronous {\
 
            if(fires(a)) {\
 
                if(fires(b)) put(b, get(a));\
 
                else         put(c, get(a));\
 
            }\
 
        }\
 
    }\
 
    ";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, 0);
 
    PortId ports[8];
 
    if(optimized=='y') {
 
        connector_add_port_pair(c, &ports[0], &ports[1]);
 
        connector_add_port_pair(c, &ports[2], &ports[7]); // 3,4,5,6 uninitialized
 
        connector_add_component(c, "sync", 4, ports+1, 2);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    } else {
 
        for(i=0; i<4; i++) {
 
            connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]);
 
        }
 
        connector_add_component(c, "xrouter", 7, (PortId[]) {ports[1],ports[2],ports[4]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        connector_add_component(c, "merger" , 6, (PortId[]) {ports[3],ports[5],ports[6]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        connector_put_bytes(c, ports[0], msg, msg_len);
 
        connector_get(c, ports[7]);
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_28/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, 0);
 
    PortId ports[6];
 
    for(i=0; i<3; i++) {
 
        connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]);
 
    }
 
    connector_add_component(c, "sync", 4, ports+1, 2);
 
    if(optimized=='y') {
 
        connector_add_component(c, "forward", 7, ports+3, 2);
 
    } else {
 
        connector_add_component(c, "sync",    4, ports+3, 2);
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        connector_put_bytes(c, ports[0], msg, msg_len);
 
        connector_get(c, ports[5]);
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_29/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) {
 
    FfiSocketAddr x;
 
    x.port = port;
 
    memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4);
 
    return x;
 
}
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    char sender = argv[2][0];
 
    rounds = atoi(argv[3]);
 
    uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) };
 
    size_t msg_len = atoi(argv[8]);
 

	
 
    printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n",
 
        optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len);
 

	
 
    unsigned char pdl[] = "\
 
    primitive filter(in i, out o) {\
 
        while(true) synchronous() {\
 
            msg m = get(i);\
 
            if(m[0] == 0) put(o, m);\
 
        }\
 
    }\
 
    ";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, sender=='y'?1:0);
 
    PortId ports[3]; // orientation: 0->1->2 (subsets may be initialized) sender puts on 0. !sender gets on 2. 
 
    char ident[] = "filter";
 
    FfiSocketAddr addr = addr_new(ipv4, 7000);
 
    if(sender=='y') {
 
        Polarity p = Polarity_Putter;
 
        EndpointPolarity ep = EndpointPolarity_Active;
 
        if(optimized=='y') {
 
            // 3 ports: (native)0-->1(filter)2-->(NETWORK)
 
            connector_add_port_pair(c, &ports[0], &ports[1]);
 
            connector_add_net_port(c, &ports[2], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_component(c, ident, sizeof(ident)-1, ports+1, 2);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 1 port
 
            connector_add_net_port(c, &ports[0], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    } else {
 
        Polarity p = Polarity_Getter;
 
        EndpointPolarity ep = EndpointPolarity_Passive;
 
        if(optimized=='y') {
 
            // 1 port
 
            connector_add_net_port(c, &ports[2], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 3 ports: (NETWORK)-->0(filter)1-->2(native)
 
            connector_add_net_port(c, &ports[0], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_port_pair(c, &ports[1], &ports[2]);
 
            connector_add_component(c, ident, sizeof(ident)-1, ports, 2);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        if(sender=='y') {
 
            msg[0] = (char) i%2;
 
            connector_put_bytes(c, ports[0], msg, msg_len);
 
            // always put
 
        } else {
 
            // no-get option
 
            connector_next_batch(c);
 
            // get option
 
            connector_get(c, ports[2]);
 
        }
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_30/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) {
 
    FfiSocketAddr x;
 
    x.port = port;
 
    memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4);
 
    return x;
 
}
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    char sender = argv[2][0];
 
    rounds = atoi(argv[3]);
 
    uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) };
 
    size_t msg_len = atoi(argv[8]);
 

	
 
    printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n",
 
        optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len);
 

	
 
    unsigned char pdl[] = "";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, sender=='y'?1:0);
 

	
 
    PortId ports[5]; // sender always puts 0, receiver always gets 3, 4
 
    char ident[] = "replicator";
 
    FfiSocketAddr addrs[2] = {
 
        addr_new(ipv4, 7000),
 
        addr_new(ipv4, 7001)
 
    };
 
    if(sender=='y') {
 
        Polarity p = Polarity_Putter;
 
        EndpointPolarity ep = EndpointPolarity_Active;
 
        if(optimized=='y') {
 
            // 1 port: (native)0-->(NETWORK)
 
            connector_add_net_port(c, &ports[0], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 4 ports: (native)0-->1(replicator)2-->(NETWORK)
 
            //                                   3-->(NETWORK)
 
            connector_add_port_pair(c, &ports[0], &ports[1]);
 
            connector_add_net_port(c, &ports[2], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_net_port(c, &ports[3], addrs[1], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_component(c, ident, sizeof(ident)-1, ports+1, 3);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    } else {
 
        Polarity p = Polarity_Getter;
 
        EndpointPolarity ep = EndpointPolarity_Passive;
 
        if(optimized=='y') {
 
            // 5 ports: (NETWORK)-->0(replicator)1-->3(native)
 
            //                                   2-->4
 
            connector_add_net_port(c, &ports[0], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_port_pair(c, &ports[1], &ports[3]);
 
            connector_add_port_pair(c, &ports[2], &ports[4]);
 
            connector_add_component(c, ident, sizeof(ident)-1, ports, 3);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 2 ports: (NETWORK)-->3(native)
 
            //                   -->4
 
            connector_add_net_port(c, &ports[3], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_net_port(c, &ports[4], addrs[1], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        if(sender=='y') {
 
            connector_put_bytes(c, ports[0], msg, msg_len);
 
        } else {
 
            connector_get(c, ports[3]);
 
            connector_get(c, ports[4]);
 
        }
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
src/protocol/inputsource.rs
Show inline comments
 
@@ -28,12 +28,14 @@ primitive alternator(in i, out l, out r) {
 
    }
 
}
 
primitive replicator(in i, out l, out r) {
 
    while(true) synchronous() if(fires(i)) {
 
    while(true) synchronous {
 
        if(fires(i)) {
 
            msg m = get(i);
 
            put(l, m);
 
            put(r, m);
 
        }
 
    }
 
}
 
primitive merger(in l, in r, out o) {
 
    while(true) synchronous {
 
        if(fires(l))      put(o, get(l));
src/runtime/setup.rs
Show inline comments
 
@@ -250,6 +250,9 @@ impl Connector {
 
    ) -> Result<Self, (ConnectError, Box<dyn Logger>)> {
 
        log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
        let deadline = timeout.map(|to| Instant::now() + to);
 
        // `try_complete` is a helper function, which DOES NOT own `cu`, and returns ConnectError on err.
 
        // This outer function takes its output and wraps it alongside `cu` (which it owns)
 
        // as appropriate for Err(...) and OK(...) cases.
 
        let mut try_complete = || {
 
            // connect all endpoints in parallel; send and receive peer ids through ports
 
            let mut endpoint_manager = setup_endpoints_and_pair_ports(
 
@@ -1030,12 +1033,12 @@ fn session_optimize(
 
// and returning an optimized map.
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
    mut m: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    // currently, it's the identity function
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
    Ok(m)
 
}
 

	
 
// Modify the given connector's internals to reflect
 
@@ -1052,7 +1055,7 @@ fn apply_my_optimizations(
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // simply overwrite the contents
 
    // println!("BEFORE: {:#?}\n{:#?}", cu, comm);
 
    println!("BEFORE: {:#?}\n{:#?}", cu, comm);
 
    cu.ips.port_info = port_info;
 
    assert!(cu.ips.port_info.invariant_preserved());
 
    cu.proto_components = proto_components;
src/runtime/tests.rs
Show inline comments
 
@@ -1309,30 +1309,3 @@ fn for_msg_byte() {
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn message_concat() {
 
    // Note: PDL quirks:
 
    // 1. declarations as first lines of a scope
 
    // 2. var names cannot be prefixed by types. Eg `msg_concat` prohibited.
 
    let test_log_path = Path::new("./logs/message_concat");
 
    let pdl = b"
 
    primitive message_concat(out o) {
 
        msg a = create(1);
 
        msg b = create(1);
 
        a[0] = 0;
 
        b[0] = 1;
 
        synchronous() put(o, a+b);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"message_concat", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g0).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(&[0, 1], c.gotten(g0).unwrap().as_slice());
 
}
0 comments (0 inline, 0 general)