diff --git a/Cargo.toml b/Cargo.toml index d6978deb8b50cbaffd0d11dd124dccbef5758d6e..e555a0d97ebe3d1cb46ebae6cdf79c9fd3a41ca2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ crate-type = [ ] [features] -default = ["ffi", "session_optimization"] +default = ["ffi"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs. endpoint_logging = [] # see src/macros.rs diff --git a/examples/bench_27/main.c b/examples/bench_27/main.c new file mode 100644 index 0000000000000000000000000000000000000000..8471d124fc401559b6fef1281c42e21292a2ca7e --- /dev/null +++ b/examples/bench_27/main.c @@ -0,0 +1,57 @@ +#include +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + int i, rounds; + char optimized = argv[1][0]; + rounds = atoi(argv[2]); + printf("optimized %c, rounds %d\n", optimized, rounds); + + unsigned char pdl[] = "\ + primitive xrouter(in a, out b, out c) {\ + while(true) synchronous {\ + if(fires(a)) {\ + if(fires(b)) put(b, get(a));\ + else put(c, get(a));\ + }\ + }\ + }\ + "; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + Connector * c = connector_new_with_id(pd, 0); + PortId ports[8]; + if(optimized=='y') { + connector_add_port_pair(c, &ports[0], &ports[1]); + connector_add_port_pair(c, &ports[2], &ports[7]); // 3,4,5,6 uninitialized + connector_add_component(c, "sync", 4, ports+1, 2); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } else { + for(i=0; i<4; i++) { + connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]); + } + connector_add_component(c, "xrouter", 7, (PortId[]) {ports[1],ports[2],ports[4]}, 3); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_component(c, "merger" , 6, (PortId[]) {ports[3],ports[5],ports[6]}, 3); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } + connector_connect(c, -1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + size_t msg_len = 1000; + char * msg = malloc(msg_len); + memset(msg, 42, msg_len); + + clock_t begin = clock(); + for (i=0; i +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + int i, rounds; + char optimized = argv[1][0]; + rounds = atoi(argv[2]); + printf("optimized %c, rounds %d\n", optimized, rounds); + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + Connector * c = connector_new_with_id(pd, 0); + PortId ports[6]; + for(i=0; i<3; i++) { + connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]); + } + connector_add_component(c, "sync", 4, ports+1, 2); + if(optimized=='y') { + connector_add_component(c, "forward", 7, ports+3, 2); + } else { + connector_add_component(c, "sync", 4, ports+3, 2); + } + connector_connect(c, -1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + size_t msg_len = 1000; + char * msg = malloc(msg_len); + memset(msg, 42, msg_len); + + clock_t begin = clock(); + for (i=0; i +#include "../../reowolf.h" +#include "../utility.c" +FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) { + FfiSocketAddr x; + x.port = port; + memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4); + return x; +} +int main(int argc, char** argv) { + int i, rounds; + char optimized = argv[1][0]; + char sender = argv[2][0]; + rounds = atoi(argv[3]); + uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) }; + size_t msg_len = atoi(argv[8]); + + printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n", + optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len); + + unsigned char pdl[] = "\ + primitive filter(in i, out o) {\ + while(true) synchronous() {\ + msg m = get(i);\ + if(m[0] == 0) put(o, m);\ + }\ + }\ + "; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + Connector * c = connector_new_with_id(pd, sender=='y'?1:0); + PortId ports[3]; // orientation: 0->1->2 (subsets may be initialized) sender puts on 0. !sender gets on 2. + char ident[] = "filter"; + FfiSocketAddr addr = addr_new(ipv4, 7000); + if(sender=='y') { + Polarity p = Polarity_Putter; + EndpointPolarity ep = EndpointPolarity_Active; + if(optimized=='y') { + // 3 ports: (native)0-->1(filter)2-->(NETWORK) + connector_add_port_pair(c, &ports[0], &ports[1]); + connector_add_net_port(c, &ports[2], addr, p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_component(c, ident, sizeof(ident)-1, ports+1, 2); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } else { + // 1 port + connector_add_net_port(c, &ports[0], addr, p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } + } else { + Polarity p = Polarity_Getter; + EndpointPolarity ep = EndpointPolarity_Passive; + if(optimized=='y') { + // 1 port + connector_add_net_port(c, &ports[2], addr, p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } else { + // 3 ports: (NETWORK)-->0(filter)1-->2(native) + connector_add_net_port(c, &ports[0], addr, p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_port_pair(c, &ports[1], &ports[2]); + connector_add_component(c, ident, sizeof(ident)-1, ports, 2); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } + } + connector_connect(c, -1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + char * msg = malloc(msg_len); + memset(msg, 42, msg_len); + + clock_t begin = clock(); + for (i=0; i +#include "../../reowolf.h" +#include "../utility.c" +FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) { + FfiSocketAddr x; + x.port = port; + memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4); + return x; +} +int main(int argc, char** argv) { + int i, rounds; + char optimized = argv[1][0]; + char sender = argv[2][0]; + rounds = atoi(argv[3]); + uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) }; + size_t msg_len = atoi(argv[8]); + + printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n", + optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len); + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + Connector * c = connector_new_with_id(pd, sender=='y'?1:0); + + PortId ports[5]; // sender always puts 0, receiver always gets 3, 4 + char ident[] = "replicator"; + FfiSocketAddr addrs[2] = { + addr_new(ipv4, 7000), + addr_new(ipv4, 7001) + }; + if(sender=='y') { + Polarity p = Polarity_Putter; + EndpointPolarity ep = EndpointPolarity_Active; + if(optimized=='y') { + // 1 port: (native)0-->(NETWORK) + connector_add_net_port(c, &ports[0], addrs[0], p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } else { + // 4 ports: (native)0-->1(replicator)2-->(NETWORK) + // 3-->(NETWORK) + connector_add_port_pair(c, &ports[0], &ports[1]); + connector_add_net_port(c, &ports[2], addrs[0], p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_net_port(c, &ports[3], addrs[1], p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_component(c, ident, sizeof(ident)-1, ports+1, 3); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } + } else { + Polarity p = Polarity_Getter; + EndpointPolarity ep = EndpointPolarity_Passive; + if(optimized=='y') { + // 5 ports: (NETWORK)-->0(replicator)1-->3(native) + // 2-->4 + connector_add_net_port(c, &ports[0], addrs[0], p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_port_pair(c, &ports[1], &ports[3]); + connector_add_port_pair(c, &ports[2], &ports[4]); + connector_add_component(c, ident, sizeof(ident)-1, ports, 3); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } else { + // 2 ports: (NETWORK)-->3(native) + // -->4 + connector_add_net_port(c, &ports[3], addrs[0], p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_net_port(c, &ports[4], addrs[1], p, ep); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } + } + connector_connect(c, -1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + char * msg = malloc(msg_len); + memset(msg, 42, msg_len); + + clock_t begin = clock(); + for (i=0; i Result)> { log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); + // `try_complete` is a helper function, which DOES NOT own `cu`, and returns ConnectError on err. + // This outer function takes its output and wraps it alongside `cu` (which it owns) + // as appropriate for Err(...) and OK(...) cases. let mut try_complete = || { // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = setup_endpoints_and_pair_ports( @@ -1030,12 +1033,12 @@ fn session_optimize( // and returning an optimized map. fn leader_session_map_optimize( logger: &mut dyn Logger, - unoptimized_map: HashMap, + mut m: HashMap, ) -> Result, ConnectError> { log!(logger, "Session map optimize START"); // currently, it's the identity function log!(logger, "Session map optimize END"); - Ok(unoptimized_map) + Ok(m) } // Modify the given connector's internals to reflect @@ -1052,7 +1055,7 @@ fn apply_my_optimizations( endpoint_incoming_to_getter, } = session_info; // simply overwrite the contents - // println!("BEFORE: {:#?}\n{:#?}", cu, comm); + println!("BEFORE: {:#?}\n{:#?}", cu, comm); cu.ips.port_info = port_info; assert!(cu.ips.port_info.invariant_preserved()); cu.proto_components = proto_components; diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 9e558be20b3d78599677f97bc3cd877c37dd1fa1..0a650114e5bc24e126c26fbc2b717ad25c50e09d 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1309,30 +1309,3 @@ fn for_msg_byte() { } c.sync(None).unwrap(); } - -#[test] -fn message_concat() { - // Note: PDL quirks: - // 1. declarations as first lines of a scope - // 2. var names cannot be prefixed by types. Eg `msg_concat` prohibited. - let test_log_path = Path::new("./logs/message_concat"); - let pdl = b" - primitive message_concat(out o) { - msg a = create(1); - msg b = create(1); - a[0] = 0; - b[0] = 1; - synchronous() put(o, a+b); - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - c.add_component(b"message_concat", &[p0]).unwrap(); - c.connect(None).unwrap(); - c.get(g0).unwrap(); - c.sync(None).unwrap(); - assert_eq!(&[0, 1], c.gotten(g0).unwrap().as_slice()); -}