diff --git a/Cargo.toml b/Cargo.toml index 0156021e86e5caaea6a2f28f4d63a3883e3980e7..0b87c1b7d064cbab7cbaa1a5dfba4f21f38b11d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "reowolf_rs" -version = "1.1.0" +version = "1.2.0" authors = [ "Max Henger ", "Christopher Esterhuyse ", @@ -39,14 +39,5 @@ lazy_static = "1.4.0" [lib] crate-type = [ - "rlib", # for use as a Rust dependency. - "cdylib" # for FFI use, typically C. -] - -[features] -default = ["ffi"] -ffi = [] # see src/ffi/mod.rs -ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs. -endpoint_logging = [] # see src/macros.rs -session_optimization = [] # see src/runtime/setup.rs -no_logging = [] # see src/macros.rs + "rlib", # for use as a Rust dependency. +] \ No newline at end of file diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index cfb55b833d773498efb7e4a2d802feaea7eba2e0..0000000000000000000000000000000000000000 --- a/examples/README.md +++ /dev/null @@ -1,33 +0,0 @@ -# Examples -This directory contains a set of programs for demonstrating the use of connectors for communications over the internet. - -## Setting up and running -First, ensure that the Reowolf has been compiled, such that a dylib is present in `reowolf/target/release/`; see the parent directory for instructions on compiling the library. - -The examples are designed to be run with the examples folder as your working directory (`reowolf/examples`), containing a local copy of the dylib. Two convenience scripts are made available for copying the library: `cpy_dll.sh` and `cpy_so.sh` for platforms Linux and Windows respectively. - -Compiling the example programs is done using Python 3, with `python3 ./make.py`, which will crawl the example directory's children and compiling any C source files. - -## Groups of examples and takeaways -### Incremental Connector API examples -The examples contained within directories with names prefixed with `incremental_` demonstrate usage of the connector API. This set of examples is characterized by each example being self-contained, designed to be run in isolation. The examples are best understood when read in the order of their directories' names (from 1 onward), as they demonstrate the functionality of the connector API starting from the most (trivially) simple connectors, to more complex connectors incorporating multiple network channels and components. - -Each example source file is prefixed by a multi-line comment, explaining what a reader is intended to take away from the example. - -### Presentation examples -The examples contained within directories with names matching `pres_` are designed to accompany the Reowolf demonstration slides (inclusion here TODO). -Examples include interactions whose distributed sessions span multiple source files. What follows is a list of the sessions' consituent programs, along with what the session intends to demonstrate - -1. {pres_1/amy, pres_1/bob}: Connectors can be used to transmit messages over the internet in a fashion comparable to that of UDP sockets. - -2. {pres_1/amy, pres_2/bob}: The protocol descriptions used to configure components are loaded and parsed at runtime. Consequently, changing these descriptions can change the behavior of the system without recompiling any of the constituent programs. -2. {pres_3/amy, pres_3/bob}: *Atomicity*. Message exchange actions are grouped into synchronous interactions. Actions occur with observable effects IFF their interactions are well-formed, and complete successfully. For example, a put succeeds IFF its accompanying get succeeds. -2. {pres_3/amy, pres_4/bob}: *Nondeterminism*. Programs/components can express flexibility by providing mutually-exclusive firing patterns on their ports, as a nondeterministic choice. Which (if any) choice occurs can be determined after synchronization by inspecting the return value of `connector_sync`. Atomicity + Nondeterminism = Specialization of behavior. -2. {pres_5/amy, pres_5/bob}: When no synchronous interaction is found before some consituent program times out, the system RECOVERS to the synchronous state at the start of the round, allowing components to try again. - -### Interoperability examples -The examples contained within directories with names matching `interop_` demonstrate the use of different APIs for communication over UDP channels. The three given programs are intended to be run together, each as its own process. - -Each example source file is prefixed by a multi-line comment, explaining what a reader is intended to take away from the example. - -NOTE: These examples are designed to compile on Linux! \ No newline at end of file diff --git a/examples/bench_01/main.c b/examples/bench_01/main.c deleted file mode 100644 index 8aff611dcaeb6fefc288fdd6714e67aaed426547..0000000000000000000000000000000000000000 --- a/examples/bench_01/main.c +++ /dev/null @@ -1,20 +0,0 @@ -#include -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./bench_1.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - clock_t begin = clock(); - int i; - for (i=0; i<1000000; i++) { - connector_sync(c, -1); - } - clock_t end = clock(); - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("Time taken: %f\n", time_spent); - return 0; -} \ No newline at end of file diff --git a/examples/bench_02/main.c b/examples/bench_02/main.c deleted file mode 100644 index 6973eaa6ab092e48cf3877da2623c47d80912587..0000000000000000000000000000000000000000 --- a/examples/bench_02/main.c +++ /dev/null @@ -1,25 +0,0 @@ -#include -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i; - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./bench_2.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - int port_pairs = atoi(argv[1]); - printf("Port pairs: %d\n", port_pairs); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, port_pairs; - port_pairs = atoi(argv[1]); - printf("Port pairs: %d\n", port_pairs); - - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./bench_3.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, proto_components; - proto_components = atoi(argv[1]); - printf("proto_components: %d\n", proto_components); - - const unsigned char pdl[] = - "primitive trivial_loop() { " - " while(true) sync {}" - "} " - ; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - char logpath[] = "./bench_4.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, port_pairs, proto_components; - port_pairs = atoi(argv[1]); - proto_components = atoi(argv[2]); - printf("port_pairs %d, proto_components: %d\n", port_pairs, proto_components); - - const unsigned char pdl[] = - "primitive trivial_loop() { " - " while(true) sync {}" - "} " - ; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - char logpath[] = "./bench_5.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, self_syncs; - self_syncs = atoi(argv[1]); - printf("self_syncs %d\n", self_syncs); - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - char logpath[] = "./bench_6.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, forwards; - forwards = atoi(argv[1]); - printf("forwards %d\n", forwards); - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_7.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - PortId native_putter, native_getter; - connector_add_port_pair(c, &native_putter, &native_getter); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, forwards, msglen; - forwards = atoi(argv[1]); - msglen = atoi(argv[2]); - printf("forwards %d, msglen %d\n", forwards, msglen); - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_8.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - PortId native_putter, native_getter; - connector_add_port_pair(c, &native_putter, &native_getter); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, proto_components; - proto_components = atoi(argv[1]); - printf("proto_components: %d\n", proto_components); - - const unsigned char pdl[] = - "primitive presync_work() { " - " int i = 0; " - " while(true) { " - " i = 0; " - " while(i < 2) i++; " - " sync {} " - " } " - "} " - ; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - char logpath[] = "./bench_4.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, inside, total; - inside = atoi(argv[1]); - total = atoi(argv[2]); - printf("inside %d, total %d\n", inside, total); - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_10.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - PortId native_putter, native_getter; - char ident[] = "sync"; // defined in reowolf's stdlib - connector_add_port_pair(c, &native_putter, &native_getter); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, j, forwards, num_options, correct_index; - forwards = atoi(argv[1]); - num_options = atoi(argv[2]); - printf("forwards %d, num_options %d\n", - forwards, num_options); - unsigned char pdl[] = - "primitive recv_zero(in a) { " - " while(true) sync {" - " msg m = get(a); " - " assert(m[0] == 0); " - " } " - "} " - ; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_11.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - PortId native_putter, native_getter; - connector_add_port_pair(c, &native_putter, &native_getter); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, j, batches; - batches = atoi(argv[1]); - printf("batches %d\n", batches); - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_12.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - clock_t begin = clock(); - char msg = 0; - for (i=0; i<1000000; i++) { - for(j=1; j -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, msglen, inside, total; - char * transport; - transport = argv[1]; - msglen = atoi(argv[2]); - inside = atoi(argv[3]); - total = atoi(argv[4]); - printf("transport `%s`, msglen %d, inside %d, total %d\n", - transport, msglen, inside, total); - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_13.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - PortId native_putter, native_getter; - char ident[] = "sync"; // defined in reowolf's stdlib - connector_add_port_pair(c, &native_putter, &native_getter); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, msglen; - msglen = atoi(argv[1]); - printf("msglen %d\n", msglen); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char * msg = malloc(msglen); - memset(msg, msglen, 42); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_14_amy.txt"; - Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 0); - - PortId putter, getter; - connector_add_net_port( - c, - &putter, - (FfiSocketAddr) {{127, 0, 0, 1}, 7000}, - Polarity_Putter, - EndpointPolarity_Active); - connector_add_net_port( - c, - &getter, - (FfiSocketAddr) {{127, 0, 0, 1}, 7001}, - Polarity_Getter, - EndpointPolarity_Passive); - connector_connect(c, -1); - - clock_t begin = clock(); - for (i=0; i<10000; i++) { - connector_put_bytes(c, putter, msg, msglen); - connector_get(c, getter); - connector_sync(c, -1); - } - clock_t end = clock(); - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("Time taken: %f\n", time_spent); - - free(msg); - return 0; -} \ No newline at end of file diff --git a/examples/bench_14/bob.c b/examples/bench_14/bob.c deleted file mode 100644 index c878b3516030560f4a762e370420135236db7fc8..0000000000000000000000000000000000000000 --- a/examples/bench_14/bob.c +++ /dev/null @@ -1,39 +0,0 @@ -#include -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i; - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_14_bob.txt"; - Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 1); - - PortId putter, getter; - connector_add_net_port( - c, - &putter, - (FfiSocketAddr) {{127, 0, 0, 1}, 7001}, - Polarity_Putter, - EndpointPolarity_Active); - connector_add_net_port( - c, - &getter, - (FfiSocketAddr) {{127, 0, 0, 1}, 7000}, - Polarity_Getter, - EndpointPolarity_Passive); - connector_add_component(c, "forward", 7, (PortId[]){getter, putter}, 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - clock_t begin = clock(); - for (i=0; i<10000; i++) { - connector_sync(c, -1); - } - clock_t end = clock(); - - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("Time taken: %f\n", time_spent); - return 0; -} \ No newline at end of file diff --git a/examples/bench_15/main.c b/examples/bench_15/main.c deleted file mode 100644 index b1db4ed36ca51834a7dd8382ceffe7d571be5adb..0000000000000000000000000000000000000000 --- a/examples/bench_15/main.c +++ /dev/null @@ -1,46 +0,0 @@ -#include -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, cid; - cid = atoi(argv[1]); - printf("cid %d\n", cid); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - - bool seen_delim = false; - for(i=2; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, cid; - cid = atoi(argv[1]); - printf("cid %d\n", cid); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - - bool seen_delim = false; - for(i=2; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, j, cid, min_putter, min_getter, ports_tot, ports_used; - char do_puts, do_gets; - cid = atoi(argv[1]); - min_putter = atoi(argv[2]); - min_getter = atoi(argv[3]); - ports_tot = atoi(argv[4]); - ports_used = atoi(argv[5]); - do_puts = argv[6][0]; // 't' or 'f' - do_gets = argv[7][0]; - printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c\n", - cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - PortId putters[ports_tot], getters[ports_tot]; - for(i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - // all outward connections are ACTIVE to localhost - // use tcp_rendezvous - int i, j, cid, min_putter, min_getter, ports_tot, ports_used; - char do_puts, do_gets; - cid = atoi(argv[1]); - min_putter = atoi(argv[2]); - min_getter = atoi(argv[3]); - ports_tot = atoi(argv[4]); - ports_used = atoi(argv[5]); - do_puts = argv[6][0]; // 't' or 'f' - do_gets = argv[7][0]; - printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c\n", - cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - PortId putters[ports_tot], getters[ports_tot]; - for(i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - // bounce off tokyo to my public IP - int i, j, cid, min_putter, min_getter, ports_tot, ports_used, n_rounds; - char do_puts, do_gets; - cid = atoi(argv[1]); - min_putter = atoi(argv[2]); - min_getter = atoi(argv[3]); - ports_tot = atoi(argv[4]); - ports_used = atoi(argv[5]); - do_puts = argv[6][0]; // 't' or 'f' - do_gets = argv[7][0]; - n_rounds = atoi(argv[12]); - - // argv 8..12 is PEER_IP - - printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c, n_rounds %d\n", - cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets, n_rounds); - printf("peer_ip %d.%d.%d.%d\n", - atoi(argv[8]), - atoi(argv[9]), - atoi(argv[10]), - atoi(argv[11])); - - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - PortId putters[ports_tot], getters[ports_tot]; - for(i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - // same as bench 15 but connecting to 87.210.104.102 and getting at 0.0.0.0 - // also, doing 10k reps (down from 100k) to save time - int i, cid; - cid = atoi(argv[1]); - printf("cid %d\n", cid); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - - bool seen_delim = false; - for(i=2; i -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // one of two processes: {leader, follower} - // where a set of `par_msgs` messages are sent leader->follower after - // looping follower->leader->follower `msg_loops` times. - int i, j, cid, msg_loops, par_msgs; - char is_leader; - is_leader = argv[1][0]; - msg_loops = atoi(argv[2]); - par_msgs = atoi(argv[3]); - // argv[4..8] encodes peer IP - printf("is_leader %c, msg_loops %d, par_msgs %d\n", is_leader, msg_loops, par_msgs); - cid = is_leader=='y'; // cid := { leader:1, follower:0 } - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - PortId native_ports[par_msgs]; - FfiSocketAddr peer_addr = { - { - atoi(argv[4]), - atoi(argv[5]), - atoi(argv[6]), - atoi(argv[7]) - }, 0/*dummy value*/}; - int port = 7000; - - // for each parallel message - for(i=0; i -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // same as bench 21 but with parametric message length - int i, j, cid, msg_loops, par_msgs, msg_len; - char is_leader; - is_leader = argv[1][0]; - msg_loops = atoi(argv[2]); - par_msgs = atoi(argv[3]); - msg_len = atoi(argv[8]); - - // argv[4..8] encodes peer IP - printf("is_leader %c, msg_loops %d, par_msgs %d, msg_len %d\n", is_leader, msg_loops, par_msgs, msg_len); - cid = is_leader=='y'; // cid := { leader:1, follower:0 } - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, cid); - PortId native_ports[par_msgs]; - FfiSocketAddr peer_addr = { - { - atoi(argv[4]), - atoi(argv[5]), - atoi(argv[6]), - atoi(argv[7]) - }, 0/*dummy value*/}; - int port = 7000; - - // for each parallel message - for(i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i; - - // unsigned char pdl[] = "\ - // primitive xrouter(in a, out b, out c) {\ - // while(true) sync {\ - // if(fires(a)) {\ - // if(fires(b)) put(b, get(a));\ - // else put(c, get(a));\ - // }\ - // }\ - // }" - // ; - unsigned char pdl[] = "\ - primitive lossy(in a, out b) {\ - while(true) sync {\ - if(fires(a)) {\ - msg m = get(a);\ - if(fires(b)) put(b, m);\ - }\ - }\ - }\ - primitive sync_drain(in a, in b) {\ - while(true) sync {\ - if(fires(a)) {\ - get(a);\ - get(b);\ - }\ - }\ - }\ - composite xrouter(in a, out b, out c) {\ - channel d -> e;\ - channel f -> g;\ - channel h -> i;\ - channel j -> k;\ - channel l -> m;\ - channel n -> o;\ - channel p -> q;\ - channel r -> s;\ - channel t -> u;\ - new replicator(a, d, f);\ - new replicator(g, t, h);\ - new lossy(e, l);\ - new lossy(i, j);\ - new replicator(m, b, p);\ - new replicator(k, n, c);\ - new merger(q, o, r);\ - new sync_drain(u, s);\ - }" - ; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, 0); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - PortId ports[6]; - for(i=0; i<3; i++) { - connector_add_port_pair(c, &ports[2*i], &ports[2*i+1]); - } - // [native~~~~~~~~~~] - // 0 1 2 3 4 5 - // | ^ | ^ | ^ - // `--` `--` `--` - char ident[] = "xrouter"; - connector_add_component( - c, - ident, - sizeof(ident)-1, - (PortId[]) { ports[1], ports[2], ports[4] }, - 3); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - // [native~~~~~~~~~~] - // 0 3 5 - // V ^ ^ - // 1 2 4 - // [xrouter~~~~~~~~~] - connector_connect(c, -1); - printf("Connect OK!\n"); - - int msg_len = 1000; - char * msg = malloc(msg_len); - memset(msg, 42, msg_len); - - { - clock_t begin = clock(); - for (i=0; i<100000; i++) { - connector_put_bytes(c, ports[0], msg, msg_len); - connector_get(c, ports[3]); - connector_sync(c, -1); - } - clock_t end = clock(); - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("First: %f\n", time_spent); - } - { - clock_t begin = clock(); - for (i=0; i<100000; i++) { - connector_put_bytes(c, ports[0], msg, msg_len); - connector_get(c, ports[5]); - connector_sync(c, -1); - } - clock_t end = clock(); - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("Second: %f\n", time_spent); - } - { - clock_t begin = clock(); - for (i=0; i<100000; i++) { - connector_put_bytes(c, ports[0], msg, msg_len); - connector_get(c, ports[3 + (i%2)*2]); - connector_sync(c, -1); - } - clock_t end = clock(); - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("Alternating: %f\n", time_spent); - } - free(msg); - return 0; -} \ No newline at end of file diff --git a/examples/bench_24/main.c b/examples/bench_24/main.c deleted file mode 100644 index 1cfde7b7270f7e2ef4da425b28b33d7720ee83a9..0000000000000000000000000000000000000000 --- a/examples/bench_24/main.c +++ /dev/null @@ -1,78 +0,0 @@ -#include -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, j; - - unsigned char pdl[] = "\ - primitive fifo1_init(msg m, in a, out b) {\ - while(true) sync {\ - if(m != null && fires(b)) {\ - put(b, m);\ - m = null;\ - } else if (m == null && fires(a)) {\ - m = get(a);\ - }\ - }\ - }\ - composite fifo1_full(in a, out b) {\ - new fifo1_init(create(0), a, b);\ - }\ - composite fifo1(in a, out b) {\ - new fifo1_init(null, a, b);\ - }\ - composite sequencer3(out a, out b, out c) {\ - channel d -> e;\ - channel f -> g;\ - channel h -> i;\ - channel j -> k;\ - channel l -> m;\ - channel n -> o;\ - new fifo1_full(o, d);\ - new replicator(e, f, a);\ - new fifo1(g, h);\ - new replicator(i, j, b);\ - new fifo1(k, l);\ - new replicator(m, n, c);\ - }" - ; - // unsigned char pdl[] = "\ - // primitive sequencer3(out a, out b, out c) {\ - // int i = 0;\ - // while(true) sync {\ - // out to = a;\ - // if (i==1) to = b;\ - // else if(i==2) to = c;\ - // if(fires(to)) {\ - // put(to, create(0));\ - // i = (i + 1)%3;\ - // }\ - // }\ - // }" - ; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - Connector * c = connector_new_with_id(pd, 0); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - PortId putters[3], getters[3]; - for(i=0; i<3; i++) { - connector_add_port_pair(c, &putters[i], &getters[i]); - } - char ident[] = "sequencer3"; - connector_add_component(c, ident, sizeof(ident)-1, putters, 3); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_connect(c, -1); - printf("Connect OK!\n"); - - clock_t begin = clock(); - for (i=0; i<1000000/3; i++) { - for (j=0; j<3; j++) { - connector_get(c, getters[j]); - connector_sync(c, -1); - } - } - clock_t end = clock(); - double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; - printf("Time taken: %f\n", time_spent); - return 0; -} \ No newline at end of file diff --git a/examples/bench_25/main.c b/examples/bench_25/main.c deleted file mode 100644 index fff5774e8558f366b1d23c3ab6cd58b936b11ddc..0000000000000000000000000000000000000000 --- a/examples/bench_25/main.c +++ /dev/null @@ -1,60 +0,0 @@ -#include -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, j, syncs; - syncs = atoi(argv[1]); - printf("syncs %d\n", syncs); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - char logpath[] = "./bench_11.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - - PortId native_putter, native_getter; - connector_add_port_pair(c, &native_putter, &native_getter); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) { - FfiSocketAddr x; - x.port = port; - int i; - for(i=0; i<4; i++) { - x.ipv4[i] = ipv4[i]; - } - return x; -} -int main(int argc, char** argv) { - int i, rounds; - char leader; - uint8_t ipv4[4] = { atoi(argv[1]), atoi(argv[2]), atoi(argv[3]), atoi(argv[4]) }; - leader = argv[5][0]; - rounds = atoi(argv[6]); - printf("leader %c, rounds %d, peer at %d.%d.%d.%d\n", - leader, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3]); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - Connector * c = connector_new_with_id(pd, leader=='y'?1:0); - PortId ports[2]; - if(leader=='y') { - EndpointPolarity ep = EndpointPolarity_Active; - connector_add_net_port(c, &ports[0], addr_new(ipv4, 7005), Polarity_Putter, ep); - connector_add_net_port(c, &ports[1], addr_new(ipv4, 7006), Polarity_Getter, ep); - } else { - EndpointPolarity ep = EndpointPolarity_Passive; - connector_add_net_port(c, &ports[0], addr_new(ipv4, 7005), Polarity_Getter, ep); - connector_add_net_port(c, &ports[1], addr_new(ipv4, 7006), Polarity_Putter, ep); - char ident[] = "sync"; - connector_add_component(c, ident, sizeof(ident)-1, ports, 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - size_t msg_len = 1000; - char * msg = malloc(msg_len); - memset(msg, 42, msg_len); - - clock_t begin = clock(); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, rounds; - char optimized = argv[1][0]; - rounds = atoi(argv[2]); - printf("optimized %c, rounds %d\n", optimized, rounds); - - unsigned char pdl[] = "\ - primitive xrouter(in a, out b, out c) {\ - while(true) sync {\ - if(fires(a)) {\ - if(fires(b)) put(b, get(a));\ - else put(c, get(a));\ - }\ - }\ - }\ - "; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - Connector * c = connector_new_with_id(pd, 0); - PortId ports[8]; - if(optimized=='y') { - connector_add_port_pair(c, &ports[0], &ports[1]); - connector_add_port_pair(c, &ports[2], &ports[7]); // 3,4,5,6 uninitialized - connector_add_component(c, "sync", 4, ports+1, 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } else { - for(i=0; i<4; i++) { - connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]); - } - connector_add_component(c, "xrouter", 7, (PortId[]) {ports[1],ports[2],ports[4]}, 3); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_component(c, "merger" , 6, (PortId[]) {ports[3],ports[5],ports[6]}, 3); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - size_t msg_len = 1000; - char * msg = malloc(msg_len); - memset(msg, 42, msg_len); - - clock_t begin = clock(); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - int i, rounds; - char optimized = argv[1][0]; - rounds = atoi(argv[2]); - printf("optimized %c, rounds %d\n", optimized, rounds); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - Connector * c = connector_new_with_id(pd, 0); - PortId ports[6]; - for(i=0; i<3; i++) { - connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]); - } - connector_add_component(c, "sync", 4, ports+1, 2); - if(optimized=='y') { - connector_add_component(c, "forward", 7, ports+3, 2); - } else { - connector_add_component(c, "sync", 4, ports+3, 2); - } - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - size_t msg_len = 1000; - char * msg = malloc(msg_len); - memset(msg, 42, msg_len); - - clock_t begin = clock(); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) { - FfiSocketAddr x; - x.port = port; - memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4); - return x; -} -int main(int argc, char** argv) { - int i, rounds; - char optimized = argv[1][0]; - char sender = argv[2][0]; - rounds = atoi(argv[3]); - uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) }; - size_t msg_len = atoi(argv[8]); - - printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n", - optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len); - - unsigned char pdl[] = "\ - primitive filter(in i, out o) {\ - while(true) synchronous() {\ - msg m = get(i);\ - if(m[0] == 0) put(o, m);\ - }\ - }\ - "; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - Connector * c = connector_new_with_id(pd, sender=='y'?1:0); - PortId ports[3]; // orientation: 0->1->2 (subsets may be initialized) sender puts on 0. !sender gets on 2. - char ident[] = "filter"; - FfiSocketAddr addr = addr_new(ipv4, 7000); - if(sender=='y') { - Polarity p = Polarity_Putter; - EndpointPolarity ep = EndpointPolarity_Active; - if(optimized=='y') { - // 3 ports: (native)0-->1(filter)2-->(NETWORK) - connector_add_port_pair(c, &ports[0], &ports[1]); - connector_add_net_port(c, &ports[2], addr, p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_component(c, ident, sizeof(ident)-1, ports+1, 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } else { - // 1 port - connector_add_net_port(c, &ports[0], addr, p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } - } else { - Polarity p = Polarity_Getter; - EndpointPolarity ep = EndpointPolarity_Passive; - if(optimized=='y') { - // 1 port - connector_add_net_port(c, &ports[2], addr, p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } else { - // 3 ports: (NETWORK)-->0(filter)1-->2(native) - connector_add_net_port(c, &ports[0], addr, p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_port_pair(c, &ports[1], &ports[2]); - connector_add_component(c, ident, sizeof(ident)-1, ports, 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } - } - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - char * msg = malloc(msg_len); - memset(msg, 42, msg_len); - - clock_t begin = clock(); - for (i=0; i -#include "../../reowolf.h" -#include "../utility.c" -FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) { - FfiSocketAddr x; - x.port = port; - memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4); - return x; -} -int main(int argc, char** argv) { - int i, rounds; - char optimized = argv[1][0]; - char sender = argv[2][0]; - rounds = atoi(argv[3]); - uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) }; - size_t msg_len = atoi(argv[8]); - - printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n", - optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len); - - unsigned char pdl[] = ""; - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - Connector * c = connector_new_with_id(pd, sender=='y'?1:0); - - PortId ports[5]; // sender always puts 0, receiver always gets 3, 4 - char ident[] = "replicator"; - FfiSocketAddr addrs[2] = { - addr_new(ipv4, 7000), - addr_new(ipv4, 7001) - }; - if(sender=='y') { - Polarity p = Polarity_Putter; - EndpointPolarity ep = EndpointPolarity_Active; - if(optimized=='y') { - // 1 port: (native)0-->(NETWORK) - connector_add_net_port(c, &ports[0], addrs[0], p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } else { - // 4 ports: (native)0-->1(replicator)2-->(NETWORK) - // 3-->(NETWORK) - connector_add_port_pair(c, &ports[0], &ports[1]); - connector_add_net_port(c, &ports[2], addrs[0], p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &ports[3], addrs[1], p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_component(c, ident, sizeof(ident)-1, ports+1, 3); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } - } else { - Polarity p = Polarity_Getter; - EndpointPolarity ep = EndpointPolarity_Passive; - if(optimized=='y') { - // 5 ports: (NETWORK)-->0(replicator)1-->3(native) - // 2-->4 - connector_add_net_port(c, &ports[0], addrs[0], p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_port_pair(c, &ports[1], &ports[3]); - connector_add_port_pair(c, &ports[2], &ports[4]); - connector_add_component(c, ident, sizeof(ident)-1, ports, 3); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } else { - // 2 ports: (NETWORK)-->3(native) - // -->4 - connector_add_net_port(c, &ports[3], addrs[0], p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &ports[4], addrs[1], p, ep); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - } - } - connector_connect(c, -1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - char * msg = malloc(msg_len); - memset(msg, 42, msg_len); - - clock_t begin = clock(); - for (i=0; i -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - Connector * c = connector_new(pd); - connector_print_debug(c); - - protocol_description_destroy(pd); - connector_print_debug(c); - connector_destroy(c); - return 0; -} \ No newline at end of file diff --git a/examples/incr_2/amy.c b/examples/incr_2/amy.c deleted file mode 100644 index 4bc181796aae7f3a3ac579bb8daac97b87264876..0000000000000000000000000000000000000000 --- a/examples/incr_2/amy.c +++ /dev/null @@ -1,21 +0,0 @@ -/* This example demonstrates: -- protocol descriptions are parsed from ascii text expressed in Reowolf's protocol language, PDL -- protocol descriptions load their PDL at runtime; component's definitions can be changed without recompiling the main program -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - Connector * c = connector_new(pd); - connector_print_debug(c); - - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl_ptr); - return 0; -} \ No newline at end of file diff --git a/examples/incr_3/amy.c b/examples/incr_3/amy.c deleted file mode 100644 index 6402835e81f8df08fde75955767c911aa48a8046..0000000000000000000000000000000000000000 --- a/examples/incr_3/amy.c +++ /dev/null @@ -1,22 +0,0 @@ -/* This example demonstrates: -- Connectors begin in a "setup" state, which is existing with `connector_connect` -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - Connector * c = connector_new(pd); - connector_connect(c, 100); - - connector_print_debug(c); - - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl_ptr); - return 0; -} \ No newline at end of file diff --git a/examples/incr_4/amy.c b/examples/incr_4/amy.c deleted file mode 100644 index bbb5fe9e0e822d416ffe1797365e37c0cca88d6a..0000000000000000000000000000000000000000 --- a/examples/incr_4/amy.c +++ /dev/null @@ -1,23 +0,0 @@ -/* This example demonstrates: -- Connectors in the setup state can add new channels by creating port pairs -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - Connector * c = connector_new(pd); - - PortId putter, getter; - connector_add_port_pair(c, &putter, &getter); - connector_print_debug(c); - - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl_ptr); - return 0; -} \ No newline at end of file diff --git a/examples/incr_5/amy.c b/examples/incr_5/amy.c deleted file mode 100644 index ce05abf932c9363866ec72973e5ba66e9af2a50b..0000000000000000000000000000000000000000 --- a/examples/incr_5/amy.c +++ /dev/null @@ -1,39 +0,0 @@ -/* This example demonstrates: -- After connecting, ports can exchange messages -- Message exchange is conducted in two phases: - 1. preparing with `connector_put`, `connector_get`, and - 2. completing with `connector_sync`. - This paradigm is similar to that for sockets in non-blocking mode. -- The connector stores messages received during sync. they can be inspected using `connector_gotten_bytes`. -- Ports created using `connector_add_port_pair` behave as a synchronous channel; messages sent in one end are received at the other. -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - Connector * c = connector_new(pd); - - PortId putter, getter; - connector_add_port_pair(c, &putter, &getter); - connector_connect(c, -1); - connector_print_debug(c); - - connector_put_bytes(c, putter, "hello", 5); - connector_get(c, getter); - - connector_sync(c, -1); // -1 means infinite timeout duration - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len); - printf("Got msg `%.*s`\n", (int) msg_len, msg_ptr); - - - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl_ptr); - return 0; -} diff --git a/examples/incr_6/amy.c b/examples/incr_6/amy.c deleted file mode 100644 index cdddc76f60d33fbab115f9a3484982a1b28071b1..0000000000000000000000000000000000000000 --- a/examples/incr_6/amy.c +++ /dev/null @@ -1,33 +0,0 @@ -/* This example demonstrates: -- Synchronized PUTs and GETs always succeed together. -- Attemping to put without a corresponding GET results in `connector_sync` failing at runtime. -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - char logpath[] = "./6_amy_log.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Err %s\n", reowolf_error_peek(NULL)); - - PortId putter, getter; - connector_add_port_pair(c, &putter, &getter); - connector_connect(c, -1); - connector_print_debug(c); - - printf("Let's try to put without get\n"); - connector_put_bytes(c, putter, "hello", 5); - // connector_get(c, getter); - int err = connector_sync(c, 5000); // 5000 means 5000millisec timeout duration - printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL)); - - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl_ptr); - return 0; -} \ No newline at end of file diff --git a/examples/incr_7/amy.c b/examples/incr_7/amy.c deleted file mode 100644 index c2d4ed302e458849792268ef0abcf9d91dab710b..0000000000000000000000000000000000000000 --- a/examples/incr_7/amy.c +++ /dev/null @@ -1,42 +0,0 @@ -/* This example demonstrates: -- Synchronous rounds that fail result in RECOVERY; no messages are sent or received. -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - char logpath[] = "./7_amy_log.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Err %s\n", reowolf_error_peek(NULL)); - - PortId putter, getter; - connector_add_port_pair(c, &putter, &getter); - connector_connect(c, -1); - connector_print_debug(c); - - printf("Let's try to put without get\n"); - connector_put_bytes(c, putter, "hello", 5); - // connector_get(c, getter); - - int err = connector_sync(c, 5000); - printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL)); - - printf("\nLet's try again, doing both\n"); - connector_put_bytes(c, putter, "hello", 5); - connector_get(c, getter); - err = connector_sync(c, 5000); - printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL)); - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len); - printf("Got msg `%.*s`\n", (int) msg_len, msg_ptr); - - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl_ptr); - return 0; -} diff --git a/examples/incr_8/amy.c b/examples/incr_8/amy.c deleted file mode 100644 index 56faa210bf906fb264faed5a56a266965ffd6eee..0000000000000000000000000000000000000000 --- a/examples/incr_8/amy.c +++ /dev/null @@ -1,37 +0,0 @@ -/* This example demonstrates: -- Synchronous channels can span the network if created by pairs of `connector_add_net_port` calls, - each binding a port to the same address, with complementary polarities. -- Ports created this way have two notions of polarity: - - (port) Polarity in {Putter, Getter}: - - Putter => resulting port can `connector_put`, - - Getter => resulting port can `connector_get`, - - Endpoint Polarity in {Active, Passive}: - - Passive => underlying transport socket will `bind` to the given address, - - Active => underlying transport socket will `connect` to the given address, -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./8_amy_log.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - PortId putter, getter; - FfiSocketAddr addr = {{127, 0, 0, 1}, 8000}; // ipv4 127.0.0.1 (localhost) transport port 8000 - connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_connect(c, 4000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - protocol_description_destroy(pd); - connector_destroy(c); - return 0; -} \ No newline at end of file diff --git a/examples/incr_9/amy.c b/examples/incr_9/amy.c deleted file mode 100644 index 1e92f51de8903fb7691d9710f434537fc9e87521..0000000000000000000000000000000000000000 --- a/examples/incr_9/amy.c +++ /dev/null @@ -1,38 +0,0 @@ -/* This example demonstrates: -- Once created, ports transport messages regardless of whether - they were created with `connector_add_port_pair` or `connector_add_net_port`. -*/ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./9_amy_log.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - PortId putter, getter; - FfiSocketAddr addr = {{127, 0, 0, 1}, 8000}; - connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_connect(c, 4000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_put_bytes(c, putter, "hi", 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_get(c, getter); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_sync(c, 4000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - - protocol_description_destroy(pd); - connector_destroy(c); - return 0; -} \ No newline at end of file diff --git a/examples/interop_1_socket/main.c b/examples/interop_1_socket/main.c deleted file mode 100644 index eaa341e83dfeb3fd70ecb053dd46d5028c0652ac..0000000000000000000000000000000000000000 --- a/examples/interop_1_socket/main.c +++ /dev/null @@ -1,35 +0,0 @@ -/* This example demonstrates: -- conventional UDP socket API can be used in a connection-oriented fashion - - first setup with `bind` and `connect` - - henceforth communicating with connected peer using `send` and `recv` in blocking mode. -*/ -#include // definies socketaddr_in -#include // defines printf -#include // defines malloc, free -#include // defines close -#include // defines inet_addr -#define BUFSIZE 512 -int main() { - // --- setup --- - struct sockaddr_in addrs[2]; - addrs[0].sin_family = AF_INET; - addrs[0].sin_port = htons(8000); - inet_pton(AF_INET, "127.0.0.1", &addrs[0].sin_addr.s_addr); - addrs[1].sin_family = AF_INET; - addrs[1].sin_port = htons(8001); - inet_pton(AF_INET, "127.0.0.1", &addrs[1].sin_addr.s_addr); - int fd = socket(AF_INET, SOCK_DGRAM, 0); - bind(fd, (const struct sockaddr *)&addrs[0], sizeof(addrs[0])); - connect(fd, (const struct sockaddr *)&addrs[1], sizeof(addrs[1])); - // --- communication --- - char * buffer = malloc(BUFSIZE); - size_t msglen, i; - msglen = recv(fd, (void *)buffer, BUFSIZE, 0); - for(i=0; i // definies socketaddr_in -#include // defines printf -#include // defines malloc, free -#include // defines close -#include // defines inet_addr -#include "../../pseudo_socket.h" -#define BUFSIZE 512 -int main() { - // --- setup --- - struct sockaddr_in addrs[2]; - addrs[0].sin_family = AF_INET; - addrs[0].sin_port = htons(8000); - inet_pton(AF_INET, "127.0.0.1", &addrs[0].sin_addr.s_addr); - addrs[1].sin_family = AF_INET; - addrs[1].sin_port = htons(8001); - inet_pton(AF_INET, "127.0.0.1", &addrs[1].sin_addr.s_addr); - int fd = rw_socket(AF_INET, SOCK_DGRAM, 0); - rw_bind(fd, (const struct sockaddr *)&addrs[0], sizeof(addrs[0])); - rw_connect(fd, (const struct sockaddr *)&addrs[1], sizeof(addrs[1])); - // --- communication --- - char * buffer = malloc(BUFSIZE); - size_t msglen, i; - msglen = rw_recv(fd, (void *)buffer, BUFSIZE, 0); - for(i=0; i -#include -#include "../../reowolf.h" - -int main(int argc, char** argv) { - // --- setup --- - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - Connector * c = connector_new(pd); - PortId putter_a, putter_b; - FfiSocketAddr addresses[4] = { - {{127, 0, 0, 1}, 8000}, - {{127, 0, 0, 1}, 8001}, - {{127, 0, 0, 1}, 8002}, - {{127, 0, 0, 1}, 8003}, - }; - - // putter_a to UDP mediator (getter id discarded) - // with local addresses[0] and peer addresses[1] - connector_add_udp_mediator_component(c, &putter_a, NULL, addresses[1], addresses[0]); - connector_add_udp_mediator_component(c, &putter_b, NULL, addresses[3], addresses[2]); - connector_connect(c, -1); - - // --- communication --- - connector_put_bytes(c, putter_a, "hello, socket!", 14); - connector_put_bytes(c, putter_b, "hello, pseudo-socket!", 21); - connector_sync(c, -1); - - // --- cleanup --- - protocol_description_destroy(pd); - connector_destroy(c); - return 0; -} diff --git a/examples/make.py b/examples/make.py deleted file mode 100755 index db5e3c140b7f3e454680fe6ca835ee575b2528da..0000000000000000000000000000000000000000 --- a/examples/make.py +++ /dev/null @@ -1,20 +0,0 @@ -import os, glob, subprocess, time, sys -script_path = os.path.dirname(os.path.realpath(__file__)); -for c_file in glob.glob(script_path + "/*/*.c", recursive=False): - if sys.platform != "linux" and sys.platform != "linux2" and "interop" in c_file: - print("Not Linux! skipping", c_file) - continue - print("compiling", c_file) - args = [ - "gcc", # compiler - "-std=c11", # C11 mode - "-Wl,-R./", # pass -R flag to linker: produce relocatable object - c_file, # input source file - "-o", # output flag - c_file[:-2], # output filename - "-L", # lib path flag - "./", # where to look for libs - "-lreowolf_rs" # add lib called "reowolf_rs" - ]; - subprocess.run(args) -input("Blocking until newline..."); diff --git a/examples/pres_1/amy.c b/examples/pres_1/amy.c deleted file mode 100644 index c779ea7934b128e862a5a824c229d205a57ac9ab..0000000000000000000000000000000000000000 --- a/examples/pres_1/amy.c +++ /dev/null @@ -1,37 +0,0 @@ -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - char msgbuf[64]; - // ask user what message to send - size_t msglen = get_user_msg(msgbuf, sizeof(msgbuf)); - - // Create a connector, configured with our (trivial) protocol. - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./pres_1_amy.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 1 outgoing network connection - PortId p0; - FfiSocketAddr addr = {{127, 0, 0, 1}, 8000}; - connector_add_net_port(c, &p0, addr, Polarity_Putter, EndpointPolarity_Passive); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - // Prepare a message to send - connector_put_bytes(c, p0, msgbuf, msglen); - rw_err_peek(c); - - // ... reach new consistent state within 1000ms deadline. - connector_sync(c, 1000); - rw_err_peek(c); - - printf("Exiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - sleep(1.0); - return 0; -} \ No newline at end of file diff --git a/examples/pres_1/bob.c b/examples/pres_1/bob.c deleted file mode 100644 index c6b7ef3c365e4db687e727db336b9c0812ec258e..0000000000000000000000000000000000000000 --- a/examples/pres_1/bob.c +++ /dev/null @@ -1,38 +0,0 @@ -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - // Create a connector, configured with our (trivial) protocol. - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./pres_1_bob.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 1 outgoing network connection - PortId p0; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &p0, addr, Polarity_Getter, EndpointPolarity_Active); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - // Prepare to receive a message. - connector_get(c, p0); - rw_err_peek(c); - - // ... reach new consistent state within 1000ms deadline. - connector_sync(c, 1000); - rw_err_peek(c); - - // Read our received message - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, p0, &msg_len); - printf("Got msg `%.*s`\n", (int) msg_len, msg_ptr); - - printf("Exiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - sleep(1.0); - return 0; -} diff --git a/examples/pres_2/bob.c b/examples/pres_2/bob.c deleted file mode 100644 index 5051d58602f30c9aca9a4e3ee0ccde8d064c511a..0000000000000000000000000000000000000000 --- a/examples/pres_2/bob.c +++ /dev/null @@ -1,42 +0,0 @@ -#include "../../reowolf.h" -#include "../utility.c" -int main(int argc, char** argv) { - // Create a connector, configured with a protocol defined in a file - char * pdl = buffer_pdl("./eg_protocols.pdl"); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl)); - char logpath[] = "./pres_2_bob.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 1 outgoing network connection - PortId ports[3]; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); - connector_add_port_pair(c, &ports[1], &ports[2]); - connector_add_component(c, "pres_2", 6, ports, 2); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - // Prepare to receive a message. - connector_get(c, ports[2]); - rw_err_peek(c); - - // ... reach new consistent state within 1000ms deadline. - connector_sync(c, 1000); - rw_err_peek(c); - - // Read our received message - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, ports[2], &msg_len); - printf("Got msg `%.*s`\n", (int) msg_len, msg_ptr); - - printf("Exiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl); - sleep(1.0); - return 0; -} diff --git a/examples/pres_3/amy.c b/examples/pres_3/amy.c deleted file mode 100644 index a8e7ecb3e80226cc37ad235af8db18649a0ef042..0000000000000000000000000000000000000000 --- a/examples/pres_3/amy.c +++ /dev/null @@ -1,47 +0,0 @@ - -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // Create a connector, configured with our (trivial) protocol. - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./pres_3_amy.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 2 outgoing network connections - PortId ports[2]; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &ports[0], addr, Polarity_Putter, EndpointPolarity_Passive); - rw_err_peek(c); - addr.port = 8001; - connector_add_net_port(c, &ports[1], addr, Polarity_Putter, EndpointPolarity_Passive); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - printf("\nputting {A}...\n"); - connector_put_bytes(c, ports[0], "A", 1); - connector_sync(c, 1000); - rw_err_peek(c); - - printf("\nputting {B}...\n"); - connector_put_bytes(c, ports[1], "B", 1); - connector_sync(c, 1000); - rw_err_peek(c); - - printf("\nputting {A, B}...\n"); - connector_put_bytes(c, ports[0], "A", 1); - connector_put_bytes(c, ports[1], "B", 1); - connector_sync(c, 1000); - rw_err_peek(c); - - printf("\nExiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - sleep(1.0); - return 0; -} \ No newline at end of file diff --git a/examples/pres_3/bob.c b/examples/pres_3/bob.c deleted file mode 100644 index 78306e23095285c7338530b1fd26379c9f21182c..0000000000000000000000000000000000000000 --- a/examples/pres_3/bob.c +++ /dev/null @@ -1,41 +0,0 @@ - -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // Create a connector, configured with our (trivial) protocol. - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./pres_3_bob.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 2 outgoing network connections - PortId ports[2]; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); - rw_err_peek(c); - addr.port = 8001; - connector_add_net_port(c, &ports[1], addr, Polarity_Getter, EndpointPolarity_Active); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - for(int i=0; i<3; i++) { - printf("\nGetting from both...\n"); - connector_get(c, ports[0]); - rw_err_peek(c); - connector_get(c, ports[1]); - rw_err_peek(c); - connector_sync(c, 1000); - rw_err_peek(c); - } - - printf("Exiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - sleep(1.0); - return 0; -} \ No newline at end of file diff --git a/examples/pres_4/bob.c b/examples/pres_4/bob.c deleted file mode 100644 index b4a68f6aa105657d190919587f15850e8e536fcd..0000000000000000000000000000000000000000 --- a/examples/pres_4/bob.c +++ /dev/null @@ -1,51 +0,0 @@ - -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // Create a connector, configured with our (trivial) protocol. - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./pres_3_bob.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 2 outgoing network connections - PortId ports[2]; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); - rw_err_peek(c); - addr.port = 8001; - connector_add_net_port(c, &ports[1], addr, Polarity_Getter, EndpointPolarity_Active); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - for(int i=0; i<3; i++) { - printf("\nNext round...\n"); - printf("\nOption 0: Get {A}\n"); - connector_get(c, ports[0]); - connector_next_batch(c); - rw_err_peek(c); - - printf("\nOption 1: Get {B}\n"); - connector_get(c, ports[1]); - connector_next_batch(c); - rw_err_peek(c); - - printf("\nOption 2: Get {A, B}\n"); - connector_get(c, ports[0]); - connector_get(c, ports[1]); - int code = connector_sync(c, 1000); - printf("Outcome: %d\n", code); - rw_err_peek(c); - } - - printf("Exiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - sleep(1.0); - return 0; -} \ No newline at end of file diff --git a/examples/pres_5/amy.c b/examples/pres_5/amy.c deleted file mode 100644 index 9267f73c15ea446c7f8601c011bd86cfa88aa268..0000000000000000000000000000000000000000 --- a/examples/pres_5/amy.c +++ /dev/null @@ -1,47 +0,0 @@ - -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // Create a connector, configured with our (trivial) protocol. - Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./pres_3_amy.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 2 outgoing network connections - PortId ports[2]; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &ports[0], addr, Polarity_Putter, EndpointPolarity_Passive); - rw_err_peek(c); - addr.port = 8001; - connector_add_net_port(c, &ports[1], addr, Polarity_Putter, EndpointPolarity_Passive); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - printf("Round 0. Putting {ports[0]=\"r0p0\", ports[1]=\"r0p1\"}\n"); - connector_put_bytes(c, ports[0], "r0p0", 4); - connector_put_bytes(c, ports[1], "r0p1", 4); - connector_sync(c, 1000); - rw_err_peek(c); - - printf("Round 1. Putting {ports[1]=\"r1p1\"}\n"); - connector_put_bytes(c, ports[1], "r1p1", 4); - connector_sync(c, 1000); - rw_err_peek(c); - - printf("Round 2. Putting {ports[0]=\"r2p0\"}\n"); - connector_put_bytes(c, ports[0], "r2p0", 4); - connector_sync(c, 1000); - rw_err_peek(c); - - printf("\nExiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - sleep(1.0); - return 0; -} \ No newline at end of file diff --git a/examples/pres_5/bob.c b/examples/pres_5/bob.c deleted file mode 100644 index d652c1884f39845c397ef8e490652f5d921c57ab..0000000000000000000000000000000000000000 --- a/examples/pres_5/bob.c +++ /dev/null @@ -1,47 +0,0 @@ - -#include "../../reowolf.h" -#include "../utility.c" - - -int main(int argc, char** argv) { - // Create a connector, configured with a protocol defined in a file - char * pdl = buffer_pdl("./eg_protocols.pdl"); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl)); - char logpath[] = "./pres_3_bob.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - rw_err_peek(c); - - // ... with 2 outgoing network connections - PortId ports[4]; - FfiSocketAddr addr = {{127,0,0,1}, 8000}; - connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); - rw_err_peek(c); - addr.port = 8001; - connector_add_net_port(c, &ports[1], addr, Polarity_Getter, EndpointPolarity_Active); - connector_add_port_pair(c, &ports[2], &ports[3]); - connector_add_component(c, "alt_round_merger", 16, ports, 3); - rw_err_peek(c); - - // Connect with peers (5000ms timeout). - connector_connect(c, 5000); - rw_err_peek(c); - - for(int round=0; round<3; round++) { - printf("----------Round %d\n", round); - connector_get(c, ports[3]); - rw_err_peek(c); - connector_sync(c, 1000); - rw_err_peek(c); - - size_t msg_len = 0; - const char * msg_ptr = connector_gotten_bytes(c, ports[3], &msg_len); - printf("Got msg `%.*s`\n", (int) msg_len, msg_ptr); - } - - printf("Exiting\n"); - protocol_description_destroy(pd); - connector_destroy(c); - free(pdl); - sleep(1.0); - return 0; -} diff --git a/examples/utility.c b/examples/utility.c deleted file mode 100644 index c089450f4321dca20dea003fa7e879e9c7354e37..0000000000000000000000000000000000000000 --- a/examples/utility.c +++ /dev/null @@ -1,36 +0,0 @@ -#include -#include -#include -#include -#include -#include "../reowolf.h" - -size_t get_user_msg(char * buf, size_t cap) { - memset(buf, 0, cap); - printf("Insert a msg of max len %zu: ", cap); - fgets(buf, cap, stdin); - for(size_t len = 0; len for FfiSocketAddr { - fn into(self) -> SocketAddr { - (self.ipv4, self.port).into() - } -} - -/////////////////////////////////////////////// -#[derive(Default)] -struct StoredError { - // invariant: len is zero IFF its occupied - // contents are 1+ bytes because we also store the NULL TERMINATOR - buf: Vec, -} -impl StoredError { - const NULL_TERMINATOR: u8 = 0; - fn clear(&mut self) { - // no null terminator either! - self.buf.clear(); - } - fn debug_store(&mut self, error: &E) { - let _ = write!(&mut self.buf, "{:?}", error); - self.buf.push(Self::NULL_TERMINATOR); - } - fn tl_debug_store(error: &E) { - STORED_ERROR.with(|stored_error| { - let mut stored_error = stored_error.borrow_mut(); - stored_error.clear(); - stored_error.debug_store(error); - }) - } - fn bytes_store(&mut self, bytes: &[u8]) { - let _ = self.buf.write_all(bytes); - self.buf.push(Self::NULL_TERMINATOR); - } - fn tl_bytes_store(bytes: &[u8]) { - STORED_ERROR.with(|stored_error| { - let mut stored_error = stored_error.borrow_mut(); - stored_error.clear(); - stored_error.bytes_store(bytes); - }) - } - fn tl_clear() { - STORED_ERROR.with(|stored_error| { - let mut stored_error = stored_error.borrow_mut(); - stored_error.clear(); - }) - } - fn tl_bytes_peek() -> (*const u8, usize) { - STORED_ERROR.with(|stored_error| { - let stored_error = stored_error.borrow(); - match stored_error.buf.len() { - 0 => (core::ptr::null(), 0), // no error! - n => { - // stores an error of length n-1 AND a NULL TERMINATOR - (stored_error.buf.as_ptr(), n - 1) - } - } - }) - } -} -thread_local! { - static STORED_ERROR: RefCell = RefCell::new(StoredError::default()); -} - -pub const RW_OK: c_int = 0; -pub const RW_TL_ERR: c_int = -1; -pub const RW_WRONG_STATE: c_int = -2; -pub const RW_LOCK_POISONED: c_int = -3; -pub const RW_CLOSE_FAIL: c_int = -4; -pub const RW_BAD_FD: c_int = -5; -pub const RW_CONNECT_FAILED: c_int = -6; -pub const RW_WOULD_BLOCK: c_int = -7; -pub const RW_BAD_SOCKADDR: c_int = -8; - -///////////////////// REOWOLF ////////////////////////// - -/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error. -/// - pointer is NULL iff there was no last error -/// - data at pointer is null-delimited -/// - len does NOT include the length of the null-delimiter -/// If len is NULL, it will not written to. -#[no_mangle] -pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 { - let (err_ptr, err_len) = StoredError::tl_bytes_peek(); - if !len.is_null() { - len.write(err_len); - } - err_ptr -} - -///////////////////// PROTOCOL DESCRIPTION ////////////////////////// - -/// Parses the utf8-encoded string slice to initialize a new protocol description object. -/// - On success, initializes `out` and returns 0 -/// - On failure, stores an error string (see `reowolf_error_peek`) and returns -1 -#[no_mangle] -pub unsafe extern "C" fn protocol_description_parse( - pdl: *const u8, - pdl_len: usize, -) -> *mut Arc { - StoredError::tl_clear(); - match ProtocolDescription::parse(&*slice_from_raw_parts(pdl, pdl_len)) { - Ok(new) => Box::into_raw(Box::new(Arc::new(new))), - Err(err) => { - StoredError::tl_bytes_store(err.as_bytes()); - std::ptr::null_mut() - } - } -} - -/// Destroys the given initialized protocol description and frees its resources. -#[no_mangle] -pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc) { - drop(Box::from_raw(pd)) -} - -/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed. -#[no_mangle] -pub unsafe extern "C" fn protocol_description_clone( - pd: &Arc, -) -> *mut Arc { - Box::into_raw(Box::new(pd.clone())) -} - -///////////////////// CONNECTOR ////////////////////////// - -#[no_mangle] -pub unsafe extern "C" fn connector_new_logging_with_id( - pd: &Arc, - path_ptr: *const u8, - path_len: usize, - connector_id: ConnectorId, -) -> *mut Connector { - StoredError::tl_clear(); - let path_bytes = &*slice_from_raw_parts(path_ptr, path_len); - let path_str = match std::str::from_utf8(path_bytes) { - Ok(path_str) => path_str, - Err(err) => { - StoredError::tl_debug_store(&err); - return std::ptr::null_mut(); - } - }; - match std::fs::File::create(path_str) { - Ok(file) => { - let file_logger = Box::new(FileLogger::new(connector_id, file)); - let c = Connector::new(file_logger, pd.clone(), connector_id); - Box::into_raw(Box::new(c)) - } - Err(err) => { - StoredError::tl_debug_store(&err); - std::ptr::null_mut() - } - } -} -#[no_mangle] -pub unsafe extern "C" fn connector_new_with_id( - pd: &Arc, - connector_id: ConnectorId, -) -> *mut Connector { - let c = Connector::new(Box::new(DummyLogger), pd.clone(), connector_id); - Box::into_raw(Box::new(c)) -} -#[no_mangle] -pub unsafe extern "C" fn connector_new_logging( - pd: &Arc, - path_ptr: *const u8, - path_len: usize, -) -> *mut Connector { - connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id()) -} - -/// Initializes `out` with a new connector using the given protocol description as its configuration. -/// The connector uses the given (internal) connector ID. -#[no_mangle] -pub unsafe extern "C" fn connector_new(pd: &Arc) -> *mut Connector { - connector_new_with_id(pd, Connector::random_id()) -} - -/// Destroys the given a pointer to the connector on the heap, freeing its resources. -/// Usable in {setup, communication} states. -#[no_mangle] -pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) { - drop(Box::from_raw(connector)) -} - -#[no_mangle] -pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) { - println!("Debug print dump {:#?}", connector); -} - -/// Given an initialized connector in setup or connecting state, -/// - Creates a new directed port pair with logical channel putter->getter, -/// - adds the ports to the native component's interface, -/// - and returns them using the given out pointers. -/// Usable in {setup, communication} states. -#[no_mangle] -pub unsafe extern "C" fn connector_add_port_pair( - connector: &mut Connector, - out_putter: *mut PortId, - out_getter: *mut PortId, -) { - let [o, i] = connector.new_port_pair(); - if !out_putter.is_null() { - out_putter.write(o); - } - if !out_getter.is_null() { - out_getter.write(i); - } -} - -/// Given -/// - an initialized connector in setup or connecting state, -/// - a string slice for the component's identifier in the connector's configured protocol description, -/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface, -/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C. -/// Usable in {setup, communication} states. -#[no_mangle] -pub unsafe extern "C" fn connector_add_component( - connector: &mut Connector, - module_ptr: *const u8, - module_len: usize, - ident_ptr: *const u8, - ident_len: usize, - ports_ptr: *const PortId, - ports_len: usize, -) -> c_int { - StoredError::tl_clear(); - match connector.add_component( - &*slice_from_raw_parts(module_ptr, module_len), - &*slice_from_raw_parts(ident_ptr, ident_len), - &*slice_from_raw_parts(ports_ptr, ports_len), - ) { - Ok(()) => RW_OK, - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR - } - } -} - -/// Given -/// - an initialized connector in setup or connecting state, -/// - a utf-8 encoded socket address, -/// - the logical polarity of P, -/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered, -/// returns P, a port newly added to the native interface. -#[no_mangle] -pub unsafe extern "C" fn connector_add_net_port( - connector: &mut Connector, - port: *mut PortId, - addr: FfiSocketAddr, - port_polarity: Polarity, - endpoint_polarity: EndpointPolarity, -) -> c_int { - StoredError::tl_clear(); - match connector.new_net_port(port_polarity, addr.into(), endpoint_polarity) { - Ok(p) => { - if !port.is_null() { - port.write(p); - } - RW_OK - } - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR - } - } -} - -/// Given -/// - an initialized connector in setup or connecting state, -/// - a utf-8 encoded BIND socket addresses (i.e., "local"), -/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"), -/// returns [P, G] via out pointers [putter, getter], -/// - where P is a Putter port that sends messages into the socket -/// - where G is a Getter port that recvs messages from the socket -#[no_mangle] -pub unsafe extern "C" fn connector_add_udp_mediator_component( - connector: &mut Connector, - putter: *mut PortId, - getter: *mut PortId, - local_addr: FfiSocketAddr, - peer_addr: FfiSocketAddr, -) -> c_int { - StoredError::tl_clear(); - match connector.new_udp_mediator_component(local_addr.into(), peer_addr.into()) { - Ok([p, g]) => { - if !putter.is_null() { - putter.write(p); - } - if !getter.is_null() { - getter.write(g); - } - RW_OK - } - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR - } - } -} - -/// Connects this connector to the distributed system of connectors reachable through endpoints, -/// Usable in setup state, and changes the state to communication. -#[no_mangle] -pub unsafe extern "C" fn connector_connect( - connector: &mut Connector, - timeout_millis: i64, -) -> c_int { - StoredError::tl_clear(); - let option_timeout_millis: Option = TryFrom::try_from(timeout_millis).ok(); - let timeout = option_timeout_millis.map(Duration::from_millis); - match connector.connect(timeout) { - Ok(()) => RW_OK, - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR - } - } -} - -// #[no_mangle] -// pub unsafe extern "C" fn connector_put_payload( -// connector: &mut Connector, -// port: PortId, -// payload: *mut Payload, -// ) -> c_int { -// match connector.put(port, payload.read()) { -// Ok(()) => 0, -// Err(err) => { -// StoredError::tl_debug_store(&err); -// -1 -// } -// } -// } - -// #[no_mangle] -// pub unsafe extern "C" fn connector_put_payload_cloning( -// connector: &mut Connector, -// port: PortId, -// payload: &Payload, -// ) -> c_int { -// match connector.put(port, payload.clone()) { -// Ok(()) => 0, -// Err(err) => { -// StoredError::tl_debug_store(&err); -// -1 -// } -// } -// } - -/// Convenience function combining the functionalities of -/// "payload_new" with "connector_put_payload". -#[no_mangle] -pub unsafe extern "C" fn connector_put_bytes( - connector: &mut Connector, - port: PortId, - bytes_ptr: *const u8, - bytes_len: usize, -) -> c_int { - StoredError::tl_clear(); - let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); - match connector.put(port, Payload::from(bytes)) { - Ok(()) => RW_OK, - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> c_int { - StoredError::tl_clear(); - match connector.get(port) { - Ok(()) => RW_OK, - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize { - StoredError::tl_clear(); - match connector.next_batch() { - Ok(n) => n as isize, - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR as isize - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_millis: i64) -> isize { - StoredError::tl_clear(); - let option_timeout_millis: Option = TryFrom::try_from(timeout_millis).ok(); - let timeout = option_timeout_millis.map(Duration::from_millis); - match connector.sync(timeout) { - Ok(n) => n as isize, - Err(err) => { - StoredError::tl_debug_store(&err); - RW_TL_ERR as isize - } - } -} - -#[no_mangle] -pub unsafe extern "C" fn connector_gotten_bytes( - connector: &mut Connector, - port: PortId, - out_len: *mut usize, -) -> *const u8 { - StoredError::tl_clear(); - match connector.gotten(port) { - Ok(payload_borrow) => { - let slice = payload_borrow.as_slice(); - if !out_len.is_null() { - out_len.write(slice.len()); - } - slice.as_ptr() - } - Err(err) => { - StoredError::tl_debug_store(&err); - std::ptr::null() - } - } -} - -// #[no_mangle] -// unsafe extern "C" fn connector_gotten_payload( -// connector: &mut Connector, -// port: PortId, -// ) -> *const Payload { -// StoredError::tl_clear(); -// match connector.gotten(port) { -// Ok(payload_borrow) => payload_borrow, -// Err(err) => { -// StoredError::tl_debug_store(&err); -// std::ptr::null() -// } -// } -// } - -///////////////////// PAYLOAD ////////////////////////// -// #[no_mangle] -// unsafe extern "C" fn payload_new( -// bytes_ptr: *const u8, -// bytes_len: usize, -// out_payload: *mut Payload, -// ) { -// let bytes: &[u8] = &*slice_from_raw_parts(bytes_ptr, bytes_len); -// out_payload.write(Payload::from(bytes)); -// } - -// #[no_mangle] -// unsafe extern "C" fn payload_destroy(payload: *mut Payload) { -// drop(Box::from_raw(payload)) -// } - -// #[no_mangle] -// unsafe extern "C" fn payload_clone(payload: &Payload, out_payload: *mut Payload) { -// out_payload.write(payload.clone()) -// } - -// #[no_mangle] -// unsafe extern "C" fn payload_peek_bytes(payload: &Payload, bytes_len: *mut usize) -> *const u8 { -// let slice = payload.as_slice(); -// bytes_len.write(slice.len()); -// slice.as_ptr() -// } diff --git a/src/ffi/pseudo_socket_api.rs b/src/ffi/pseudo_socket_api.rs deleted file mode 100644 index 6ec03a89ff0c7cfa1f9e082bf1f37f34e4d8d2b2..0000000000000000000000000000000000000000 --- a/src/ffi/pseudo_socket_api.rs +++ /dev/null @@ -1,205 +0,0 @@ -use super::*; - -use core::ops::DerefMut; -use libc::{sockaddr, socklen_t}; -use std::{ - collections::HashMap, - ffi::c_void, - net::SocketAddr, - os::raw::c_int, - sync::{Mutex, RwLock}, -}; -/////////////////////////////////////////////////////////////////// - -struct FdAllocator { - next: Option, - freed: Vec, -} -enum ConnectorComplexPhased { - Setup { local: Option, peer: Option }, - Communication { putter: PortId, getter: PortId }, -} -struct ConnectorComplex { - // invariant: .connector.phased and .phased are variants Setup/Communication in lockstep. - connector: Connector, - phased: ConnectorComplexPhased, -} -#[derive(Default)] -struct CcMap { - fd_to_cc: HashMap>, - fd_allocator: FdAllocator, -} -/////////////////////////////////////////////////////////////////// -unsafe fn payload_from_raw(bytes_ptr: *const c_void, bytes_len: usize) -> Payload { - let bytes_ptr = std::mem::transmute(bytes_ptr); - let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); - Payload::from(bytes) -} -unsafe fn libc_to_std_sockaddr(addr: *const sockaddr, addr_len: socklen_t) -> Option { - os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr() -} -impl Default for FdAllocator { - fn default() -> Self { - // negative FDs aren't used s.t. they are available for error signalling - Self { next: Some(0), freed: vec![] } - } -} -impl FdAllocator { - fn alloc(&mut self) -> c_int { - if let Some(fd) = self.freed.pop() { - return fd; - } - if let Some(fd) = self.next { - self.next = fd.checked_add(1); - return fd; - } - panic!("No more Connector FDs to allocate!") - } - fn free(&mut self, fd: c_int) { - self.freed.push(fd); - } -} -lazy_static::lazy_static! { - static ref CC_MAP: RwLock = Default::default(); - static ref TRIVIAL_PD: Arc = { - Arc::new(ProtocolDescription::parse(b"").unwrap()) - }; -} -impl ConnectorComplex { - fn try_become_connected(&mut self) { - match self.phased { - ConnectorComplexPhased::Setup { local: Some(local), peer: Some(peer) } => { - // complete setup - let [putter, getter] = - self.connector.new_udp_mediator_component(local, peer).unwrap(); - self.connector.connect(None).unwrap(); - self.phased = ConnectorComplexPhased::Communication { putter, getter } - } - _ => {} // setup incomplete - } - } -} -///////////////////////////////// -#[no_mangle] -pub extern "C" fn rw_socket(_domain: c_int, _type: c_int, _protocol: c_int) -> c_int { - // get writer lock - let mut w = if let Ok(w) = CC_MAP.write() { w } else { return RW_LOCK_POISONED }; - - let fd = w.fd_allocator.alloc(); - let cc = ConnectorComplex { - connector: Connector::new( - Box::new(crate::DummyLogger), - TRIVIAL_PD.clone(), - Connector::random_id(), - ), - phased: ConnectorComplexPhased::Setup { local: None, peer: None }, - }; - w.fd_to_cc.insert(fd, Mutex::new(cc)); - fd -} -#[no_mangle] -pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { - // ignoring HOW - // get writer lock - let mut w = if let Ok(w) = CC_MAP.write() { w } else { return RW_LOCK_POISONED }; - if w.fd_to_cc.remove(&fd).is_some() { - w.fd_allocator.free(fd); - RW_OK - } else { - RW_CLOSE_FAIL - } -} -#[no_mangle] -pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: socklen_t) -> c_int { - // assuming _domain is AF_INET and _type is SOCK_DGRAM - let addr = match libc_to_std_sockaddr(addr, addr_len) { - Some(addr) => addr, - _ => return RW_BAD_SOCKADDR, - }; - // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return RW_LOCK_POISONED }; - let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return RW_BAD_FD }; - let mut cc = if let Ok(cc) = cc.lock() { cc } else { return RW_LOCK_POISONED }; - match &mut cc.phased { - ConnectorComplexPhased::Communication { .. } => RW_WRONG_STATE, - ConnectorComplexPhased::Setup { local, .. } => { - *local = Some(addr); - cc.try_become_connected(); - RW_OK - } - } -} -#[no_mangle] -pub unsafe extern "C" fn rw_connect( - fd: c_int, - addr: *const sockaddr, - addr_len: socklen_t, -) -> c_int { - let addr = match libc_to_std_sockaddr(addr, addr_len) { - Some(addr) => addr, - _ => return RW_BAD_SOCKADDR, - }; - // assuming _domain is AF_INET and _type is SOCK_DGRAM - // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return RW_LOCK_POISONED }; - let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return RW_BAD_FD }; - let mut cc = if let Ok(cc) = cc.lock() { cc } else { return RW_LOCK_POISONED }; - match &mut cc.phased { - ConnectorComplexPhased::Communication { .. } => RW_WRONG_STATE, - ConnectorComplexPhased::Setup { peer, .. } => { - *peer = Some(addr); - cc.try_become_connected(); - RW_OK - } - } -} -#[no_mangle] -pub unsafe extern "C" fn rw_send( - fd: c_int, - bytes_ptr: *const c_void, - bytes_len: usize, - _flags: c_int, -) -> isize { - // ignoring flags - // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return RW_LOCK_POISONED as isize }; - let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return RW_BAD_FD as isize }; - let mut cc = if let Ok(cc) = cc.lock() { cc } else { return RW_LOCK_POISONED as isize }; - let ConnectorComplex { connector, phased } = cc.deref_mut(); - match phased { - ConnectorComplexPhased::Setup { .. } => RW_WRONG_STATE as isize, - ConnectorComplexPhased::Communication { putter, .. } => { - let payload = payload_from_raw(bytes_ptr, bytes_len); - connector.put(*putter, payload).unwrap(); - connector.sync(None).unwrap(); - bytes_len as isize - } - } -} -#[no_mangle] -pub unsafe extern "C" fn rw_recv( - fd: c_int, - bytes_ptr: *mut c_void, - bytes_len: usize, - _flags: c_int, -) -> isize { - // ignoring flags - // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return RW_LOCK_POISONED as isize }; - let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return RW_BAD_FD as isize }; - let mut cc = if let Ok(cc) = cc.lock() { cc } else { return RW_LOCK_POISONED as isize }; - let ConnectorComplex { connector, phased } = cc.deref_mut(); - match phased { - ConnectorComplexPhased::Setup { .. } => RW_WRONG_STATE as isize, - ConnectorComplexPhased::Communication { getter, .. } => { - connector.get(*getter).unwrap(); - connector.sync(None).unwrap(); - let slice = connector.gotten(*getter).unwrap().as_slice(); - if !bytes_ptr.is_null() { - let cpy_msg_bytes = slice.len().min(bytes_len); - std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes); - } - slice.len() as isize - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 891db6c02e70a9780f297b70f089fa6438177a3d..357f234aa26047196d17a6c8e5f2a3727e53f6b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,9 @@ #[macro_use] mod macros; -mod common; +// mod common; mod protocol; -mod runtime; -pub mod runtime2; +pub mod runtime; mod collections; -pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId}; -pub use protocol::ProtocolDescription; -pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger}; - -// TODO: Remove when not benchmarking -pub use protocol::input_source::InputSource; -pub use protocol::ast::Heap; - -#[cfg(feature = "ffi")] -pub mod ffi; +pub use protocol::ProtocolDescription; \ No newline at end of file diff --git a/src/macros.rs b/src/macros.rs index 61f92de410ed10e4dbb8b993b023a83732777a6f..2d27e951b6bcd4f6cace5a6df3824f08d3aa3957 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -7,22 +7,4 @@ macro_rules! enabled_debug_print { (true, $name:literal, $format:literal, $($args:expr),*) => { println!("[{}] {}", $name, format!($format, $($args),*)) }; -} - -/* -Change the definition of these macros to control the logging level statically -*/ - -macro_rules! log { - (@ENDPT, $logger:expr, $($arg:tt)*) => {{ - // if let Some(w) = $logger.line_writer() { - // let _ = writeln!(w, $($arg)*); - // } - }}; - ($logger:expr, $($arg:tt)*) => {{ - #[cfg(not(feature = "no_logging"))] - if let Some(w) = $logger.line_writer() { - let _ = writeln!(w, $($arg)*); - } - }}; -} +} \ No newline at end of file diff --git a/src/protocol/arena.rs b/src/protocol/arena.rs index 6afdc88e49dc4a56b392a9dc204d7a664cb8713d..ad304d8fdc5201e607e27ce500ae1e384966d67f 100644 --- a/src/protocol/arena.rs +++ b/src/protocol/arena.rs @@ -1,4 +1,5 @@ -use crate::common::*; +use std::fmt::{Debug, Formatter}; + use core::hash::Hash; use core::marker::PhantomData; diff --git a/src/protocol/eval/mod.rs b/src/protocol/eval/mod.rs index 33f8e6bb9897331a06b763b82df5da99ce7c19c0..a1bc818d7e5d762b872506922c8f7e2bdcdf3273 100644 --- a/src/protocol/eval/mod.rs +++ b/src/protocol/eval/mod.rs @@ -26,7 +26,6 @@ pub(crate) mod executor; pub(crate) mod error; pub use error::EvalError; -pub use value::{Value, ValueGroup}; -pub(crate) use store::{Store}; +pub use value::{PortId, Value, ValueGroup}; pub use executor::{EvalContinuation, Prompt}; diff --git a/src/protocol/eval/value.rs b/src/protocol/eval/value.rs index a6fe4c77fe73edefead5449a99c6b78ca8bfc5f7..d08d327fecbb3faaaa79ae2c9b7629ebb8ba8a4e 100644 --- a/src/protocol/eval/value.rs +++ b/src/protocol/eval/value.rs @@ -1,7 +1,6 @@ use std::collections::VecDeque; use super::store::*; -use crate::PortId; use crate::protocol::ast::{ AssignmentOperator, BinaryOperator, @@ -20,6 +19,17 @@ pub enum ValueId { Heap(HeapPos, u32), // allocated region + values within that region } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PortId{ + pub(crate) id: u32 +} + +impl PortId { + pub fn new(id: u32) -> Self { + return Self{ id }; + } +} + /// Represents a value stored on the stack or on the heap. Some values contain /// a `HeapPos`, implying that they're stored in the store's `Heap`. Clearing /// a `Value` with a `HeapPos` from a stack must also clear the associated diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 1f2b33ede745d27fc1336470d05211b5532a8ccf..f9d1295140a2ace7cbcc910ac216758e5510aed5 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -10,7 +10,6 @@ pub(crate) mod ast_printer; use std::sync::Mutex; use crate::collections::{StringPool, StringRef}; -use crate::common::*; use crate::protocol::ast::*; use crate::protocol::eval::*; use crate::protocol::input_source::*; @@ -36,14 +35,6 @@ pub(crate) struct ComponentState { pub(crate) prompt: Prompt, } -#[allow(dead_code)] -pub(crate) enum EvalContext<'a> { - Nonsync(&'a mut NonsyncProtoContext<'a>), - Sync(&'a mut SyncProtoContext<'a>), - None, -} -////////////////////////////////////////////// - #[derive(Debug)] pub enum ComponentCreationError { ModuleDoesntExist, @@ -88,80 +79,7 @@ impl ProtocolDescription { }); } - #[deprecated] - pub(crate) fn component_polarities( - &self, - module_name: &[u8], - identifier: &[u8], - ) -> Result, AddComponentError> { - use AddComponentError::*; - - let module_root = self.lookup_module_root(module_name); - if module_root.is_none() { - return Err(AddComponentError::NoSuchModule); - } - let module_root = module_root.unwrap(); - - let root = &self.heap[module_root]; - let def = root.get_definition_ident(&self.heap, identifier); - if def.is_none() { - return Err(NoSuchComponent); - } - - let def = &self.heap[def.unwrap()]; - if !def.is_component() { - return Err(NoSuchComponent); - } - - for ¶m in def.parameters().iter() { - let param = &self.heap[param]; - let first_element = ¶m.parser_type.elements[0]; - - match first_element.variant { - ParserTypeVariant::Input | ParserTypeVariant::Output => continue, - _ => { - return Err(NonPortTypeParameters); - } - } - } - - let mut result = Vec::new(); - for ¶m in def.parameters().iter() { - let param = &self.heap[param]; - let first_element = ¶m.parser_type.elements[0]; - - if first_element.variant == ParserTypeVariant::Input { - result.push(Polarity::Getter) - } else if first_element.variant == ParserTypeVariant::Output { - result.push(Polarity::Putter) - } else { - unreachable!() - } - } - Ok(result) - } - - // expects port polarities to be correct - #[deprecated] - pub(crate) fn new_component(&self, module_name: &[u8], identifier: &[u8], ports: &[PortId]) -> ComponentState { - let mut args = Vec::new(); - for (&x, y) in ports.iter().zip(self.component_polarities(module_name, identifier).unwrap()) { - match y { - Polarity::Getter => args.push(Value::Input(x)), - Polarity::Putter => args.push(Value::Output(x)), - } - } - - let module_root = self.lookup_module_root(module_name).unwrap(); - let root = &self.heap[module_root]; - let def = root.get_definition_ident(&self.heap, identifier).unwrap(); - - ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) } - } - - // TODO: Ofcourse, rename this at some point, perhaps even remove it in its - // entirety. Find some way to interface with the parameter's types. - pub(crate) fn new_component_v2( + pub(crate) fn new_component( &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup ) -> Result { // Find the module in which the definition can be found @@ -295,319 +213,3 @@ pub trait RunContext { fn performed_fork(&mut self) -> Option; // None if not yet forked fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared } - -#[derive(Debug)] -pub enum RunResult { - // Can only occur outside sync blocks - ComponentTerminated, // component has exited its procedure - ComponentAtSyncStart, - NewComponent(DefinitionId, i32, ValueGroup), // should also be possible inside sync - NewChannel, // should also be possible inside sync - // Can only occur inside sync blocks - BranchInconsistent, // branch has inconsistent behaviour - BranchMissingPortState(PortId), // branch doesn't know about port firing - BranchGet(PortId), // branch hasn't received message on input port yet - BranchAtSyncEnd, - BranchFork, - BranchPut(PortId, ValueGroup), -} - -impl ComponentState { - pub(crate) fn run(&mut self, ctx: &mut impl RunContext, pd: &ProtocolDescription) -> RunResult { - use EvalContinuation as EC; - use RunResult as RR; - - loop { - let step_result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, ctx); - match step_result { - Err(reason) => { - println!("Evaluation error:\n{}", reason); - todo!("proper error handling/bubbling up"); - }, - Ok(continuation) => match continuation { - EC::Stepping => continue, - EC::BranchInconsistent => return RR::BranchInconsistent, - EC::ComponentTerminated => return RR::ComponentTerminated, - EC::SyncBlockStart => return RR::ComponentAtSyncStart, - EC::SyncBlockEnd => return RR::BranchAtSyncEnd, - EC::NewComponent(definition_id, monomorph_idx, args) => - return RR::NewComponent(definition_id, monomorph_idx, args), - EC::NewChannel => - return RR::NewChannel, - EC::NewFork => - return RR::BranchFork, - EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id), - EC::BlockGet(port_id) => return RR::BranchGet(port_id), - EC::Put(port_id, value_group) => { - return RR::BranchPut(port_id, value_group); - }, - } - } - } - } -} - -// TODO: @remove the old stuff -impl ComponentState { - pub(crate) fn nonsync_run<'a: 'b, 'b>( - &'a mut self, - context: &'b mut NonsyncProtoContext<'b>, - pd: &'a ProtocolDescription, - ) -> NonsyncBlocker { - let mut context = EvalContext::Nonsync(context); - loop { - let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context); - match result { - Err(err) => { - println!("Evaluation error:\n{}", err); - panic!("proper error handling when component fails"); - }, - Ok(cont) => match cont { - EvalContinuation::Stepping => continue, - EvalContinuation::BranchInconsistent => return NonsyncBlocker::Inconsistent, - EvalContinuation::ComponentTerminated => return NonsyncBlocker::ComponentExit, - EvalContinuation::SyncBlockStart => return NonsyncBlocker::SyncBlockStart, - // Not possible to end sync block if never entered one - EvalContinuation::SyncBlockEnd => unreachable!(), - EvalContinuation::NewComponent(definition_id, monomorph_idx, args) => { - // Look up definition - let mut moved_ports = HashSet::new(); - for arg in args.values.iter() { - match arg { - Value::Output(port) => { - moved_ports.insert(*port); - } - Value::Input(port) => { - moved_ports.insert(*port); - } - _ => {} - } - } - for region in args.regions.iter() { - for arg in region { - match arg { - Value::Output(port) => { moved_ports.insert(*port); }, - Value::Input(port) => { moved_ports.insert(*port); }, - _ => {}, - } - } - } - let init_state = ComponentState { prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, args) }; - context.new_component(moved_ports, init_state); - // Continue stepping - continue; - }, - EvalContinuation::NewChannel => { - // Because of the way we emulate the old context for now, we can safely - // assume that this will never happen. The old context thingamajig always - // creates a channel, it never bubbles a "need to create a channel" message - // to the runtime - unreachable!(); - }, - EvalContinuation::NewFork => unreachable!(), - // Outside synchronous blocks, no fires/get/put happens - EvalContinuation::BlockFires(_) => unreachable!(), - EvalContinuation::BlockGet(_) => unreachable!(), - EvalContinuation::Put(_, _) => unreachable!(), - }, - } - } - } - - pub(crate) fn sync_run<'a: 'b, 'b>( - &'a mut self, - context: &'b mut SyncProtoContext<'b>, - pd: &'a ProtocolDescription, - ) -> SyncBlocker { - let mut context = EvalContext::Sync(context); - loop { - let result = self.prompt.step(&pd.types, &pd.heap, &pd.modules, &mut context); - match result { - Err(err) => { - println!("Evaluation error:\n{}", err); - panic!("proper error handling when component fails"); - }, - Ok(cont) => match cont { - EvalContinuation::Stepping => continue, - EvalContinuation::BranchInconsistent => return SyncBlocker::Inconsistent, - // First need to exit synchronous block before definition may end - EvalContinuation::ComponentTerminated => unreachable!(), - // No nested synchronous blocks - EvalContinuation::SyncBlockStart => unreachable!(), - EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd, - // Not possible to create component in sync block - EvalContinuation::NewComponent(_, _, _) => unreachable!(), - EvalContinuation::NewChannel => unreachable!(), - EvalContinuation::NewFork => unreachable!(), - EvalContinuation::BlockFires(port) => { - return SyncBlocker::CouldntCheckFiring(port); - }, - EvalContinuation::BlockGet(port) => { - return SyncBlocker::CouldntReadMsg(port); - }, - EvalContinuation::Put(port, message) => { - let payload; - - // Extract bytes from `put` - match &message.values[0] { - Value::Null => { - return SyncBlocker::Inconsistent; - }, - Value::Message(heap_pos) => { - // Create a copy of the payload - let values = &message.regions[*heap_pos as usize]; - let mut bytes = Vec::with_capacity(values.len()); - for value in values { - bytes.push(value.as_uint8()); - } - payload = Payload(Arc::new(bytes)); - } - _ => unreachable!(), - } - return SyncBlocker::PutMsg(port, payload); - } - }, - } - } - } -} - -impl RunContext for EvalContext<'_> { - fn performed_put(&mut self, port: PortId) -> bool { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(_) => unreachable!(), - EvalContext::Sync(ctx) => { - ctx.did_put_or_get(port) - } - } - } - - fn performed_get(&mut self, port: PortId) -> Option { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(_) => unreachable!(), - EvalContext::Sync(ctx) => { - let payload = ctx.read_msg(port); - if payload.is_none() { - return None; - } - - let payload = payload.unwrap(); - let mut transformed = Vec::with_capacity(payload.len()); - for byte in payload.0.iter() { - transformed.push(Value::UInt8(*byte)); - } - - let value_group = ValueGroup{ - values: vec![Value::Message(0)], - regions: vec![transformed], - }; - - return Some(value_group); - } - } - } - - fn fires(&mut self, port: PortId) -> Option { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(_) => unreachable!(), - EvalContext::Sync(context) => { - match context.is_firing(port) { - Some(did_fire) => Some(Value::Bool(did_fire)), - None => None, - } - } - } - } - - fn created_channel(&mut self) -> Option<(Value, Value)> { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(context) => { - let [from, to] = context.new_port_pair(); - let from = Value::Output(from); - let to = Value::Input(to); - return Some((from, to)); - }, - EvalContext::Sync(_) => unreachable!(), - } - } - - fn performed_fork(&mut self) -> Option { - // Never actually used in the old runtime - return None; - } -} - -// TODO: @remove once old runtime has disappeared -impl EvalContext<'_> { - fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) -> () { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(context) => { - context.new_component(moved_ports, init_state) - } - EvalContext::Sync(_) => unreachable!(), - } - } - fn new_channel(&mut self) -> [Value; 2] { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(context) => { - let [from, to] = context.new_port_pair(); - let from = Value::Output(from); - let to = Value::Input(to); - return [from, to]; - } - EvalContext::Sync(_) => unreachable!(), - } - } - fn fires(&mut self, port: Value) -> Option { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(_) => unreachable!(), - EvalContext::Sync(context) => match port { - Value::Output(port) => context.is_firing(port).map(Value::Bool), - Value::Input(port) => context.is_firing(port).map(Value::Bool), - _ => unreachable!(), - }, - } - } - fn get(&mut self, port: Value, store: &mut Store) -> Option { - match self { - EvalContext::None => unreachable!(), - EvalContext::Nonsync(_) => unreachable!(), - EvalContext::Sync(context) => match port { - Value::Input(port) => { - let payload = context.read_msg(port); - if payload.is_none() { return None; } - - let heap_pos = store.alloc_heap(); - let heap_pos_usize = heap_pos as usize; - let payload = payload.unwrap(); - store.heap_regions[heap_pos_usize].values.reserve(payload.0.len()); - for value in payload.0.iter() { - store.heap_regions[heap_pos_usize].values.push(Value::UInt8(*value)); - } - - return Some(Value::Message(heap_pos)); - } - _ => unreachable!(), - }, - } - } - fn did_put(&mut self, port: Value) -> bool { - match self { - EvalContext::None => unreachable!("did_put in None context"), - EvalContext::Nonsync(_) => unreachable!("did_put in nonsync context"), - EvalContext::Sync(context) => match port { - Value::Output(port) => { - context.did_put_or_get(port) - }, - _ => unreachable!("did_put on non-output port value") - } - } - } -} diff --git a/src/runtime2/branch.rs b/src/runtime/branch.rs similarity index 100% rename from src/runtime2/branch.rs rename to src/runtime/branch.rs diff --git a/src/runtime2/connector.rs b/src/runtime/connector.rs similarity index 98% rename from src/runtime2/connector.rs rename to src/runtime/connector.rs index a81a34f5cf77982673af371d6ae6bc0cd04a8e82..0b95f378e22907f6cf3479bc9bd7a60176ac6d48 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime/connector.rs @@ -28,8 +28,8 @@ use std::sync::atomic::AtomicBool; -use crate::{PortId, ProtocolDescription}; -use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup}; +use crate::ProtocolDescription; +use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, PortId, ValueGroup}; use crate::protocol::RunContext; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement}; @@ -102,7 +102,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ fn fires(&mut self, port: PortId) -> Option { todo!("Remove fires() now"); - let port_id = PortIdLocal::new(port.0.u32_suffix); + let port_id = PortIdLocal::new(port.id); let annotation = self.consensus.get_annotation(self.branch_id, port_id); return annotation.expected_firing.map(|v| Value::Bool(v)); } @@ -310,7 +310,7 @@ impl ConnectorPDL { }, EvalContinuation::BlockFires(port_id) => { // Branch called `fires()` on a port that has not been used yet. - let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let port_id = PortIdLocal::new(port_id.id); // Create two forks, one that assumes the port will fire, and // one that assumes the port remains silent @@ -335,7 +335,7 @@ impl ConnectorPDL { EvalContinuation::BlockGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. - let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let port_id = PortIdLocal::new(port_id.id); branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; @@ -392,7 +392,7 @@ impl ConnectorPDL { } EvalContinuation::Put(port_id, content) => { // Branch is attempting to send data - let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let port_id = PortIdLocal::new(port_id.id); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); let message = DataMessage{ sync_header, data_header, content }; match comp_ctx.submit_message(Message::Data(message)) { diff --git a/src/runtime2/consensus.rs b/src/runtime/consensus.rs similarity index 99% rename from src/runtime2/consensus.rs rename to src/runtime/consensus.rs index d7b55d2fd9f7b83af9aa27c2cd7d2059da54efef..ddb3e086df412d7b43532c04f2cd6d41d2e4b22b 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime/consensus.rs @@ -1549,7 +1549,7 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port - let cur_port = PortIdLocal::new(port_id.0.u32_suffix); + let cur_port = PortIdLocal::new(port_id.id); for prev_port in ports.iter() { if *prev_port == cur_port { // Already added diff --git a/src/runtime2/inbox.rs b/src/runtime/inbox.rs similarity index 98% rename from src/runtime2/inbox.rs rename to src/runtime/inbox.rs index 083a4f6fbf0e76faa600e45c79325bd8b33e5d82..154bb40d1bce3f6d734dd3f22a3cc1396e8d9a4c 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime/inbox.rs @@ -2,8 +2,8 @@ use std::sync::Mutex; use std::collections::VecDeque; use crate::protocol::eval::ValueGroup; -use crate::runtime2::consensus::{ComponentPresence, SolutionCombiner}; -use crate::runtime2::port::ChannelId; +use crate::runtime::consensus::{ComponentPresence, SolutionCombiner}; +use crate::runtime::port::ChannelId; use super::ConnectorId; use super::consensus::{GlobalSolution, LocalSolution}; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 95bc3e85c6f4df6c9a221764f5eb8ad46745159e..c522a7397a83c029a0eabe85233498d0e844c7e0 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,903 +1,640 @@ -/// cbindgen:ignore -mod communication; -/// cbindgen:ignore -mod endpoints; -pub mod error; -/// cbindgen:ignore -mod logging; -/// cbindgen:ignore -mod setup; - -#[cfg(test)] -mod tests; - -use crate::common::*; -use error::*; -use mio::net::UdpSocket; - -/// The interface between the user's application and a communication session, -/// in which the application plays the part of a (native) component. This structure provides the application -/// with functionality available to all components: the ability to add new channels (port pairs), and to -/// instantiate new components whose definitions are defined in the connector's configured protocol -/// description. Native components have the additional ability to add `dangling' ports backed by local/remote -/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`. -/// This allows sets of applications to cooperate in constructing shared sessions that span the network. -#[derive(Debug)] -pub struct Connector { - unphased: ConnectorUnphased, - phased: ConnectorPhased, -} +// Structure of module -/// Characterizes a type which can write lines of logging text. -/// The implementations provided in the `logging` module are likely to be sufficient, -/// but for added flexibility, users are able to implement their own loggers for use -/// by connectors. -pub trait Logger: Debug + Send + Sync { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; -} +mod branch; +mod native; +mod port; +mod scheduler; +mod consensus; +mod inbox; -/// A logger that appends the logged strings to a growing byte buffer -#[derive(Debug)] -pub struct VecLogger(ConnectorId, Vec); +#[cfg(test)] mod tests; +mod connector; -/// A trivial logger that always returns None, such that no logging information is ever written. -#[derive(Debug)] -pub struct DummyLogger; +// Imports -/// A logger that writes the logged lines to a given file. -#[derive(Debug)] -pub struct FileLogger(ConnectorId, std::fs::File); - -// Interface between protocol state and the connector runtime BEFORE all components -// ave begun their branching speculation. See ComponentState::nonsync_run. -pub(crate) struct NonsyncProtoContext<'a> { - ips: &'a mut IdAndPortState, - logger: &'a mut dyn Logger, - unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase - proto_component_id: ComponentId, // KEY in id->component map -} +use std::collections::VecDeque; +use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::thread::{self, JoinHandle}; -// Interface between protocol state and the connector runtime AFTER all components -// have begun their branching speculation. See ComponentState::sync_run. -pub(crate) struct SyncProtoContext<'a> { - rctx: &'a RoundCtx, - branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch - predicate: &'a Predicate, // KEY in pred->branch map -} +use crate::collections::RawVec; +use crate::ProtocolDescription; -// The data coupled with a particular protocol component branch, but crucially omitting -// the `ComponentState` such that this may be passed by reference to the state with separate -// access control. -#[derive(Default, Debug, Clone)] -struct ProtoComponentBranchInner { - did_put_or_get: HashSet, - inbox: HashMap, -} +use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; +use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler}; +use native::{Connector, ConnectorApplication, ApplicationInterface}; +use inbox::Message; +use port::{ChannelId, Port, PortState}; -// A speculative variable that lives for the duration of the synchronous round. -// Each is assigned a value in domain `SpecVal`. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVar(PortId); - -// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, -// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVal(u16); - -// Data associated with a successful synchronous round, retained afterwards such that the -// native component can freely reflect on how it went, reading the messages received at their -// inputs, and reflecting on which of their connector's synchronous batches succeeded. +/// A kind of token that, once obtained, allows mutable access to a connector. +/// We're trying to use move semantics as much as possible: the owner of this +/// key is the only one that may execute the connector's code. #[derive(Debug)] -struct RoundEndedNative { - batch_index: usize, - gotten: HashMap, +pub(crate) struct ConnectorKey { + pub index: u32, // of connector + pub generation: u32, } -// Implementation of a set in terms of a vector (optimized for reading, not writing) -#[derive(Default)] -struct VecSet { - // invariant: ordered, deduplicated - vec: Vec, -} +impl ConnectorKey { + /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable + /// access, to a "regular ID" which can be used to obtain immutable access. + #[inline] + pub fn downcast(&self) -> ConnectorId { + return ConnectorId{ + index: self.index, + generation: self.generation, + }; + } -// Allows a connector to remember how to forward payloads towards the component that -// owns their destination port. `LocalComponent` corresponds with messages for components -// managed by the connector itself (hinting for it to look it up in a local structure), -// whereas the other variants direct the connector to forward the messages over the network. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum Route { - LocalComponent, - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, + /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it + /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system + #[inline] + pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { + return ConnectorKey{ + index: id.index, + generation: id.generation, + }; + } } -// The outcome of a synchronous round, representing the distributed consensus. -// In the success case, the attached predicate encodes a row in the session's trace table. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -enum Decision { - Failure, // some connector timed out! - Success(Predicate), +/// A kind of token that allows shared access to a connector. Multiple threads +/// may hold this +#[derive(Debug, Copy, Clone)] +pub struct ConnectorId{ + pub index: u32, + pub generation: u32, } -// The type of control messages exchanged between connectors over the network -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum Msg { - SetupMsg(SetupMsg), - CommMsg(CommMsg), +impl PartialEq for ConnectorId { + fn eq(&self, other: &Self) -> bool { + return self.index.eq(&other.index); + } } -// Control messages exchanged during the setup phase only -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum SetupMsg { - MyPortInfo(MyPortInfo), - LeaderWave { wave_leader: ConnectorId }, - LeaderAnnounce { tree_leader: ConnectorId }, - YouAreMyParent, -} +impl Eq for ConnectorId{} -// Control message particular to the communication phase. -// as such, it's annotated with a round_index -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct CommMsg { - round_index: usize, - contents: CommMsgContents, +impl PartialOrd for ConnectorId{ + fn partial_cmp(&self, other: &Self) -> Option { + return self.index.partial_cmp(&other.index) + } } -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommMsgContents { - SendPayload(SendPayloadMsg), - CommCtrl(CommCtrlMsg), +impl Ord for ConnectorId{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + return self.partial_cmp(other).unwrap(); + } } -// Connector <-> connector control messages for use in the communication phase -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommCtrlMsg { - Suggest { suggestion: Decision }, // child->parent - Announce { decision: Decision }, // parent->child -} +impl ConnectorId { + // TODO: Like the other `new_invalid`, maybe remove + #[inline] + pub fn new_invalid() -> ConnectorId { + return ConnectorId { + index: u32::MAX, + generation: 0, + }; + } -// Speculative payload message, communicating the value for the given -// port's message predecated on the given speculative variable assignments. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct SendPayloadMsg { - predicate: Predicate, - payload: Payload, + #[inline] + pub(crate) fn is_valid(&self) -> bool { + return self.index != u32::MAX; + } } -// Return result of `Predicate::assignment_union`, communicating the contents -// of the predicate which represents the (consistent) union of their mappings, -// if it exists (no variable mapped distinctly by the input predicates) -#[derive(Debug, PartialEq)] -enum AssignmentUnionResult { - FormerNotLatter, - LatterNotFormer, - Equivalent, - New(Predicate), - Nonexistant, +// TODO: Change this, I hate this. But I also don't want to put `public` and +// `router` of `ScheduledConnector` back into `Connector`. The reason I don't +// want `Box` everywhere is because of the v-table overhead. But +// to truly design this properly I need some benchmarks. +pub(crate) enum ConnectorVariant { + UserDefined(ConnectorPDL), + Native(Box), } -// One of two endpoints for a control channel with a connector on either end. -// The underlying transport is TCP, so we use an inbox buffer to allow -// discrete payload receipt. -struct NetEndpoint { - inbox: Vec, - stream: TcpStream, +impl Connector for ConnectorVariant { + fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + match self { + ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), + ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), + } + } } -// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct NetEndpointSetup { - getter_for_incoming: PortId, - sock_addr: SocketAddr, - endpoint_polarity: EndpointPolarity, -} +pub(crate) struct ScheduledConnector { + pub connector: ConnectorVariant, // access by connector + pub ctx: ComponentCtx, + pub public: ConnectorPublic, // accessible by all schedulers and connectors + pub router: ControlMessageHandler, + pub shutting_down: bool, +} + +// ----------------------------------------------------------------------------- +// Runtime +// ----------------------------------------------------------------------------- + +/// Externally facing runtime. +pub struct Runtime { + inner: Arc, +} + +impl Runtime { + pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { + // Setup global state + assert!(num_threads > 0, "need a thread to run connectors"); + let runtime_inner = Arc::new(RuntimeInner{ + protocol_description, + port_counter: AtomicU32::new(0), + connectors: RwLock::new(ConnectorStore::with_capacity(32)), + connector_queue: Mutex::new(VecDeque::with_capacity(32)), + schedulers: Mutex::new(Vec::new()), + scheduler_notifier: Condvar::new(), + active_connectors: AtomicU32::new(0), + active_interfaces: AtomicU32::new(1), // this `Runtime` instance + should_exit: AtomicBool::new(false), + }); + + // Launch threads + { + let mut schedulers = Vec::with_capacity(num_threads as usize); + for thread_index in 0..num_threads { + let cloned_runtime_inner = runtime_inner.clone(); + let thread = thread::Builder::new() + .name(format!("thread-{}", thread_index)) + .spawn(move || { + let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); + scheduler.run(); + }) + .unwrap(); + + schedulers.push(thread); + } -// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct UdpEndpointSetup { - getter_for_incoming: PortId, - local_addr: SocketAddr, - peer_addr: SocketAddr, -} + let mut lock = runtime_inner.schedulers.lock().unwrap(); + *lock = schedulers; + } -// NetEndpoint annotated with the ID of the port that receives payload -// messages received through the endpoint. This approach assumes that NetEndpoints -// DO NOT multiplex port->port channels, and so a mapping such as this is possible. -// As a result, the messages themselves don't need to carry the PortID with them. -#[derive(Debug)] -struct NetEndpointExt { - net_endpoint: NetEndpoint, - getter_for_incoming: PortId, -} + // Return runtime + return Runtime{ inner: runtime_inner }; + } -// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component" -// described in the literature. -// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager), -// It acts as a native component by managing a (speculative) set of payload messages (an outbox, -// protecting the peer on the other side of the network). -#[derive(Debug)] -struct UdpEndpointExt { - sock: UdpSocket, // already bound and connected - received_this_round: bool, - outgoing_payloads: HashMap, - getter_for_incoming: PortId, -} + /// Returns a new interface through which channels and connectors can be + /// created. + pub fn create_interface(&self) -> ApplicationInterface { + self.inner.increment_active_interfaces(); + let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); + let connector_key = self.inner.create_interface_component(connector); + interface.set_connector_id(connector_key.downcast()); -// Meta-data for the connector: its role in the consensus tree. -#[derive(Debug)] -struct Neighborhood { - parent: Option, - children: VecSet, + // Note that we're not scheduling. That is done by the interface in case + // it is actually needed. + return interface; + } } -// Manages the connector's ID, and manages allocations for connector/port IDs. -#[derive(Debug, Clone)] -struct IdManager { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, - component_suffix_stream: U32Stream, +impl Drop for Runtime { + fn drop(&mut self) { + self.inner.decrement_active_interfaces(); + let mut lock = self.inner.schedulers.lock().unwrap(); + for handle in lock.drain(..) { + handle.join().unwrap(); + } + } } -// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. -struct IoByteBuffer { - byte_vec: Vec, -} +// ----------------------------------------------------------------------------- +// RuntimeInner +// ----------------------------------------------------------------------------- + +pub(crate) struct RuntimeInner { + // Protocol + pub(crate) protocol_description: ProtocolDescription, + // Regular counter for port IDs + port_counter: AtomicU32, + // Storage of connectors and the work queue + connectors: RwLock, + connector_queue: Mutex>, + schedulers: Mutex>>, + // Conditions to determine whether the runtime can exit + scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. + // TODO: Figure out if we can simply merge the counters? + active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) + active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels + should_exit: AtomicBool, +} + +impl RuntimeInner { + // --- Managing the components queued for execution + + /// Wait until there is a connector to run. If there is one, then `Some` + /// will be returned. If there is no more work, then `None` will be + /// returned. + pub(crate) fn wait_for_work(&self) -> Option { + let mut lock = self.connector_queue.lock().unwrap(); + while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { + lock = self.scheduler_notifier.wait(lock).unwrap(); + } -// A generator of speculative variables. Created on-demand during the synchronous round -// by the IdManager. -#[derive(Debug)] -struct SpecVarStream { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, -} + return lock.pop_front(); + } + + pub(crate) fn push_work(&self, key: ConnectorKey) { + let mut lock = self.connector_queue.lock().unwrap(); + lock.push_back(key); + self.scheduler_notifier.notify_one(); + } + + // --- Creating/using ports + + /// Creates a new port pair. Note that these are stored globally like the + /// connectors are. Ports stored by components belong to those components. + pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { + use port::{PortIdLocal, PortKind}; + + let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); + let channel_id = ChannelId::new(getter_id); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + let getter_port = Port{ + self_id: getter_id, + peer_id: putter_id, + channel_id, + kind: PortKind::Getter, + state: PortState::Open, + peer_connector: creating_connector, + }; + let putter_port = Port{ + self_id: putter_id, + peer_id: getter_id, + channel_id, + kind: PortKind::Putter, + state: PortState::Open, + peer_connector: creating_connector, + }; + + return (getter_port, putter_port); + } + + /// Sends a message directly (without going through the port) to a + /// component. This is slightly less efficient then sending over a port, but + /// might be preferable for some algorithms. If the component was sleeping + /// then it is scheduled for execution. + pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool { + let target = { + let mut lock = self.connectors.read().unwrap(); + lock.get(target_id.index) + }; + + // Do a CAS on the number of users. Most common case the component is + // alive and we're the only one sending the message. Note that if we + // finish this block, we're sure that no-one has set the `num_users` + // value to 0. This is essential! When at 0, the component is added to + // the freelist and the generation counter will be incremented. + let mut cur_num_users = 1; + while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) { + if old_num_users == 0 { + // Cannot send message. Whatever the component state is + // (destroyed, at a different generation number, busy being + // destroyed, etc.) we cannot send the message and will not + // modify the component + return false; + } -// Manages the messy state of the various endpoints, pollers, buffers, etc. -#[derive(Debug)] -struct EndpointManager { - // invariants: - // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into - // 2. Events is empty - poll: Poll, - events: Events, - delayed_messages: Vec<(usize, Msg)>, - undelayed_messages: Vec<(usize, Msg)>, // ready to yield - net_endpoint_store: EndpointStore, - udp_endpoint_store: EndpointStore, - io_byte_buffer: IoByteBuffer, -} + cur_num_users = old_num_users; + } -// A storage of endpoints, which keeps track of which components have raised -// an event during poll(), signifying that they need to be checked for new incoming data -#[derive(Debug)] -struct EndpointStore { - endpoint_exts: Vec, - polled_undrained: VecSet, -} + // We incremented the counter. But we might still be at the wrong + // generation number. The generation number is a monotonically + // increasing value. Since it only increases when someone gets the + // `num_users` counter to 0, we can simply load the generation number. + let generation = target.generation.load(Ordering::Acquire); + if generation != target_id.generation { + // We're at the wrong generation, so we cannot send the message. + // However, since we incremented the `num_users` counter, the moment + // we decrement it we might be the one that are supposed to handle + // the destruction of the component. Note that all users of the + // component do an increment-followed-by-decrement, we can simply + // do a `fetch_sub`. + let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); + if old_num_users == 1 { + // We're the one that got the counter to 0, so we're the ones + // that are supposed to handle component exit + self.finish_component_destruction(target_id); + } -// The information associated with a port identifier, designed for local storage. -#[derive(Clone, Debug)] -struct PortInfo { - owner: ComponentId, - peer: Option, - polarity: Polarity, - route: Route, -} + return false; + } -// Similar to `PortInfo`, but designed for communication during the setup procedure. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct MyPortInfo { - polarity: Polarity, - port: PortId, - owner: ComponentId, -} + // The generation is correct, and since we incremented the `num_users` + // counter we're now sure that we can send the message and it will be + // handled by the receiver + target.connector.public.inbox.insert_message(message); + + // Finally, do the same as above: decrement number of users, if at gets + // to 0 we're the ones who should handle the exit condition. + let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); + if old_num_users == 1 { + // We're allowed to destroy the component. + self.finish_component_destruction(target_id); + } else { + // Message is sent. If the component is sleeping, then we're sure + // it is not scheduled and it has not initiated the destruction of + // the component (because of the way + // `initiate_component_destruction` does not set sleeping to true). + // So we can safely schedule it. + let should_wake_up = target.connector.public.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(target_id) }; + self.push_work(key); + } + } -// Newtype around port info map, allowing the implementation of some -// useful methods -#[derive(Default, Debug, Clone)] -struct PortInfoMap { - // invariant: self.invariant_preserved() - // `owned` is redundant information, allowing for fast lookup - // of a component's owned ports (which occurs during the sync round a lot) - map: HashMap, - owned: HashMap>, -} + return true + } -// A convenient substructure for containing port info and the ID manager. -// Houses the bulk of the connector's persistent state between rounds. -// It turns out several situations require access to both things. -#[derive(Debug, Clone)] -struct IdAndPortState { - port_info: PortInfoMap, - id_manager: IdManager, -} + /// Sends a message to a particular component, assumed to occur over a port. + /// If the component happened to be sleeping then it will be scheduled for + /// execution. Because of the port management system we may assumed that + /// we're always accessing the component at the right generation number. + pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) { + let target = { + let lock = self.connectors.read().unwrap(); + let entry = lock.get(target_id.index); + debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation); + &mut entry.connector.public + }; -// A component's setup-phase-specific data -#[derive(Debug)] -struct ConnectorCommunication { - round_index: usize, - endpoint_manager: EndpointManager, - neighborhood: Neighborhood, - native_batches: Vec, - round_result: Result, SyncError>, -} + target.inbox.insert_message(message); -// A component's data common to both setup and communication phases -#[derive(Debug)] -struct ConnectorUnphased { - proto_description: Arc, - proto_components: HashMap, - logger: Box, - ips: IdAndPortState, - native_component_id: ComponentId, -} + let should_wake_up = target.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); -// A connector's phase-specific data -#[derive(Debug)] -enum ConnectorPhased { - Setup(Box), - Communication(Box), -} + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(target_id) }; + self.push_work(key); + } + } -// A connector's setup-phase-specific data -#[derive(Debug)] -struct ConnectorSetup { - net_endpoint_setups: Vec, - udp_endpoint_setups: Vec, -} + // --- Creating/retrieving/destroying components -// A newtype wrapper for a map from speculative variable to speculative value -// A missing mapping corresponds with "unspecified". -#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -struct Predicate { - assigned: BTreeMap, -} + /// Creates an initially sleeping application connector. + fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { + // Initialize as sleeping, as it will be scheduled by the programmer. + let mut lock = self.connectors.write().unwrap(); + let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); -// Identifies a child of this connector in the _solution tree_. -// Each connector creates its own local solutions for the consensus procedure during `sync`, -// from the solutions of its children. Those children are either locally-managed components, -// (which are leaves in the solution tree), or other connectors reachable through the given -// network endpoint (which are internal nodes in the solution tree). -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum SubtreeId { - LocalComponent(ComponentId), - NetEndpoint { index: usize }, -} + self.increment_active_components(); + return key; + } -// An accumulation of the connector's knowledge of all (a) the local solutions its children -// in the solution tree have found, and (b) its own solutions derivable from those of its children. -// This structure starts off each round with an empty set, and accumulates solutions as they are found -// by local components, or received over the network in control messages. -// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to -// say that these sets GROW until the round is over, and all solutions are reset. -#[derive(Debug)] -struct SolutionStorage { - // invariant: old_local U new_local solutions are those that can be created from - // the UNION of one element from each set in `subtree_solution`. - // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. - old_local: HashSet, // already sent to this connector's parent OR decided - new_local: HashSet, // not yet sent to this connector's parent OR decided - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} + /// Creates a new PDL component. This function just creates the component. + /// If you create it initially awake, then you must add it to the work + /// queue. Other aspects of correctness (i.e. setting initial ports) are + /// relinquished to the caller! + pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey { + // Create as not sleeping, as we'll schedule it immediately + let key = { + let mut lock = self.connectors.write().unwrap(); + lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping) + }; -// Stores the transient data of a synchronous round. -// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of -// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, -// and can be undone if the round fails. -struct RoundCtx { - solution_storage: SolutionStorage, - spec_var_stream: SpecVarStream, - payload_inbox: Vec<(PortId, SendPayloadMsg)>, - deadline: Option, - ips: IdAndPortState, -} + self.increment_active_components(); + return key; + } -// A trait intended to limit the access of the ConnectorUnphased structure -// such that we don't accidentally modify any important component/port data -// while the results of the round are undecided. Why? Any actions during Connector::sync -// are _speculative_ until the round is decided, and we need a safe way of rolling -// back any changes. -trait CuUndecided { - fn logger(&mut self) -> &mut dyn Logger; - fn proto_description(&self) -> &ProtocolDescription; - fn native_component_id(&self) -> ComponentId; - fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); - fn logger_and_protocol_components( - &mut self, - ) -> (&mut dyn Logger, &mut HashMap); -} + /// Retrieve private access to the component through its key. + #[inline] + pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { + let entry = { + let lock = self.connectors.read().unwrap(); + lock.get(connector_key.index) + }; + + debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key); + return &mut entry.connector; + } + + // --- Managing component destruction + + /// Start component destruction, may only be done by the scheduler that is + /// executing the component. This might not actually destroy the component, + /// since other components might be sending it messages. + fn initiate_component_destruction(&self, connector_key: ConnectorKey) { + // Most of the time no-one will be sending messages, so try + // immediate destruction + let mut lock = self.connectors.write().unwrap(); + let entry = lock.get(connector_key.index); + debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation); + debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component + let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst); + if old_num_users == 1 { + // We just brought the number of users down to 0. Destroy the + // component + entry.connector.public.inbox.clear(); + entry.generation.fetch_add(1, Ordering::SeqCst); + lock.destroy(connector_key); + self.decrement_active_components(); + } + } -// Represents a set of synchronous port operations that the native component -// has described as an "option" for completing during the synchronous rounds. -// Operations contained here succeed together or not at all. -// A native with N=2+ batches are expressing an N-way nondeterministic choice -#[derive(Debug, Default)] -struct NativeBatch { - // invariant: putters' and getters' polarities respected - to_put: HashMap, - to_get: HashSet, -} + fn finish_component_destruction(&self, connector_id: ConnectorId) { + let mut lock = self.connectors.write().unwrap(); + let entry = lock.get(connector_id.index); + debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0); + let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst); + debug_assert_eq!(_old_generation, connector_id.generation); -// Parallels a mio::Token type, but more clearly communicates -// the way it identifies the evented structre it corresponds to. -// See runtime/setup for methods converting between TokenTarget and mio::Token -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -enum TokenTarget { - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, -} + // TODO: In the future we should not only clear out the inbox, but send + // messages back to the senders indicating the messages did not arrive. + entry.connector.public.inbox.clear(); -// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, -// such that it can know when to continue polling, and when to block. -enum CommRecvOk { - TimeoutWithoutNew, - NewPayloadMsgs, - NewControlMsg { net_index: usize, msg: CommCtrlMsg }, -} -//////////////// -fn err_would_block(err: &std::io::Error) -> bool { - err.kind() == std::io::ErrorKind::WouldBlock -} -impl VecSet { - fn new(mut vec: Vec) -> Self { - // establish the invariant - vec.sort(); - vec.dedup(); - Self { vec } + // Invariant of only one thread being able to handle the internals of + // component is preserved by the fact that only one thread can decrement + // `num_users` to 0. + lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) }); + self.decrement_active_components(); } - fn contains(&self, element: &T) -> bool { - self.vec.binary_search(element).is_ok() - } - // Insert the given element. Returns whether it was already present. - fn insert(&mut self, element: T) -> bool { - match self.vec.binary_search(&element) { - Ok(_) => false, - Err(index) => { - self.vec.insert(index, element); - true + + // --- Managing exit condition + + #[inline] + pub(crate) fn increment_active_interfaces(&self) { + let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); + debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero + } + + pub(crate) fn decrement_active_interfaces(&self) { + let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { // such that active interfaces is now 0 + let num_connectors = self.active_connectors.load(Ordering::Acquire); + if num_connectors == 0 { + self.signal_for_shutdown(); } } } - fn iter(&self) -> std::slice::Iter { - self.vec.iter() - } - fn pop(&mut self) -> Option { - self.vec.pop() - } -} -impl PortInfoMap { - fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { - self.owned.get(&owner).into_iter().flat_map(HashSet::iter) - } - fn spec_var_for(&self, port: PortId) -> SpecVar { - // Every port maps to a speculative variable - // Two distinct ports map to the same variable - // IFF they are two ends of the same logical channel. - let info = self.map.get(&port).unwrap(); - SpecVar(match info.polarity { - Getter => port, - Putter => info.peer.unwrap(), - }) + + #[inline] + fn increment_active_components(&self) { + let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); } - fn invariant_preserved(&self) -> bool { - // for every port P with some owner O, - // P is in O's owned set - for (port, info) in self.map.iter() { - match self.owned.get(&info.owner) { - Some(set) if set.contains(port) => {} - _ => { - println!("{:#?}\n WITH port {:?}", self, port); - return false; - } - } - } - // for every port P owned by every owner O, - // P's owner is O - for (&owner, set) in self.owned.iter() { - for port in set { - match self.map.get(port) { - Some(info) if info.owner == owner => {} - _ => { - println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port); - return false; - } - } + + fn decrement_active_components(&self) { + let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { // such that we have no more active connectors (for now!) + let num_interfaces = self.active_interfaces.load(Ordering::Acquire); + if num_interfaces == 0 { + self.signal_for_shutdown(); } } - true - } -} -impl SpecVarStream { - fn next(&mut self) -> SpecVar { - let phantom_port: PortId = - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } - .into(); - SpecVar(phantom_port) - } -} -impl IdManager { - fn new(connector_id: ConnectorId) -> Self { - Self { - connector_id, - port_suffix_stream: Default::default(), - component_suffix_stream: Default::default(), - } - } - fn new_spec_var_stream(&self) -> SpecVarStream { - // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N. - // This gap is entirely unnecessary (i.e. 0 is fine) - // It's purpose is only to make SpecVars easier to spot in logs. - // E.g. spot the spec var: { v0_0, v1_2, v1_103 } - const SKIP_N: u32 = 100; - let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N); - SpecVarStream { connector_id: self.connector_id, port_suffix_stream } - } - fn new_port_id(&mut self) -> PortId { - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() - } - fn new_component_id(&mut self) -> ComponentId { - Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() } - .into() - } -} -impl Drop for Connector { - fn drop(&mut self) { - log!(self.unphased.logger(), "Connector dropping. Goodbye!"); - } -} -// Given a slice of ports, return the first, if any, port is present repeatedly -fn duplicate_port(slice: &[PortId]) -> Option { - let mut vec = Vec::with_capacity(slice.len()); - for port in slice.iter() { - match vec.binary_search(port) { - Err(index) => vec.insert(index, *port), - Ok(_) => return Some(*port), - } - } - None -} -impl Connector { - /// Generate a random connector identifier from the system's source of randomness. - pub fn random_id() -> ConnectorId { - type Bytes8 = [u8; std::mem::size_of::()]; - unsafe { - let mut bytes = std::mem::MaybeUninit::::uninit(); - // getrandom is the canonical crate for a small, secure rng - getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); - // safe! representations of all valid Byte8 values are valid ConnectorId values - std::mem::transmute::<_, _>(bytes.assume_init()) - } } - /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete, - /// and it is ready to participate in synchronous rounds of communication. - pub fn is_connected(&self) -> bool { - // If designed for Rust usage, connectors would be exposed as an enum type from the start. - // consequently, this "phased" business would also include connector variants and this would - // get a lot closer to the connector impl. itself. - // Instead, the C-oriented implementation doesn't distinguish connector states as types, - // and distinguish them as enum variants instead - match self.phased { - ConnectorPhased::Setup(..) => false, - ConnectorPhased::Communication(..) => true, + #[inline] + fn signal_for_shutdown(&self) { + debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); + debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); + + let _lock = self.connector_queue.lock().unwrap(); + let should_signal = self.should_exit + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_signal { + self.scheduler_notifier.notify_all(); } } +} - /// Enables the connector's current logger to be swapped out for another - pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.logger, &mut new_logger); - new_logger - } +unsafe impl Send for RuntimeInner {} +unsafe impl Sync for RuntimeInner {} - /// Access the connector's current logger - pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.logger - } +// ----------------------------------------------------------------------------- +// ConnectorStore +// ----------------------------------------------------------------------------- - /// Create a new synchronous channel, returning its ends as a pair of ports, - /// with polarity output, input respectively. Available during either setup/communication phase. - /// # Panics - /// This function panics if the connector's (large) port id space is exhausted. - pub fn new_port_pair(&mut self) -> [PortId; 2] { - let cu = &mut self.unphased; - // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.ips.id_manager.new_port_id(); - // allocate two fresh port identifiers - let [o, i] = [new_cid(), new_cid()]; - // store info for each: - // - they are each others' peers - // - they are owned by a local component with id `cid` - // - polarity putter, getter respectively - cu.ips.port_info.map.insert( - o, - PortInfo { - route: Route::LocalComponent, - peer: Some(i), - owner: cu.native_component_id, - polarity: Putter, - }, - ); - cu.ips.port_info.map.insert( - i, - PortInfo { - route: Route::LocalComponent, - peer: Some(o), - owner: cu.native_component_id, - polarity: Getter, - }, - ); - cu.ips - .port_info - .owned - .entry(cu.native_component_id) - .or_default() - .extend([o, i].iter().copied()); - - log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); - [o, i] - } +struct StoreEntry { + connector: ScheduledConnector, + generation: std::sync::atomic::AtomicU32, + num_users: std::sync::atomic::AtomicU32, +} - /// Instantiates a new component for the connector runtime to manage, and passing - /// the given set of ports from the interface of the native component, to that of the - /// newly created component (passing their ownership). - /// # Errors - /// Error is returned if the moved ports are not owned by the native component, - /// if the given component name is not defined in the connector's protocol, - /// the given sequence of ports contains a duplicate port, - /// or if the component is unfit for instantiation with the given port sequence. - /// # Panics - /// This function panics if the connector's (large) component id space is exhausted. - pub fn add_component( - &mut self, - module_name: &[u8], - identifier: &[u8], - ports: &[PortId], - ) -> Result<(), AddComponentError> { - // Check for error cases first before modifying `cu` - use AddComponentError as Ace; - let cu = &self.unphased; - if let Some(port) = duplicate_port(ports) { - return Err(Ace::DuplicatePort(port)); - } - let expected_polarities = cu.proto_description.component_polarities(module_name, identifier)?; - if expected_polarities.len() != ports.len() { - return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); - } - for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?; - if info.owner != cu.native_component_id { - return Err(Ace::UnknownPort(port)); - } - if info.polarity != expected_polarity { - return Err(Ace::WrongPortPolarity { port, expected_polarity }); - } - } - // No errors! Time to modify `cu` - // create a new component and identifier - let Connector { phased, unphased: cu } = self; - let new_cid = cu.ips.id_manager.new_component_id(); - cu.proto_components.insert(new_cid, cu.proto_description.new_component(module_name, identifier, ports)); - // update the ownership of moved ports - for port in ports.iter() { - match cu.ips.port_info.map.get_mut(port) { - Some(port_info) => port_info.owner = new_cid, - None => unreachable!(), - } - } - if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) { - set.retain(|x| !ports.contains(x)); - } - let moved_port_set: HashSet = ports.iter().copied().collect(); - if let ConnectorPhased::Communication(comm) = phased { - // Preserve invariant: batches only reason about native's ports. - // Remove batch puts/gets for moved ports. - for batch in comm.native_batches.iter_mut() { - batch.to_put.retain(|port, _| !moved_port_set.contains(port)); - batch.to_get.retain(|port| !moved_port_set.contains(port)); - } - } - cu.ips.port_info.owned.insert(new_cid, moved_port_set); - Ok(()) - } +struct ConnectorStore { + // Freelist storage of connectors. Storage should be pointer-stable as + // someone might be mutating the vector while we're executing one of the + // connectors. + entries: RawVec<*mut StoreEntry>, + free: Vec, } -impl Predicate { - #[inline] - pub fn singleton(k: SpecVar, v: SpecVal) -> Self { - Self::default().inserted(k, v) - } - #[inline] - pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { - self.assigned.insert(k, v); - self - } - // Return true whether `self` is a subset of `maybe_superset` - pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { - for (var, val) in self.assigned.iter() { - match maybe_superset.assigned.get(var) { - Some(val2) if val2 == val => {} - _ => return false, // var unmapped, or mapped differently - } +impl ConnectorStore { + fn with_capacity(capacity: usize) -> Self { + Self { + entries: RawVec::with_capacity(capacity), + free: Vec::with_capacity(capacity), } - // `maybe_superset` mirrored all my assignments! - true } - /// Given the two predicates {self, other}, return that whose - /// assignments are the union of those of both. - fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { - use AssignmentUnionResult as Aur; - // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. - let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; - let [mut s, mut o] = [s_it.next(), o_it.next()]; - // populate lists of assignments in self but not other and vice versa. - // do this by incrementally unfolding the iterators, keeping an eye - // on the ordering between the head elements [s, o]. - // whenever s break, // both iterators are empty - [None, Some(x)] => { - // self's iterator is empty. - // all remaning elements are in other but not self - o_not_s.push(x); - o_not_s.extend(o_it); - break; - } - [Some(x), None] => { - // other's iterator is empty. - // all remaning elements are in self but not other - s_not_o.push(x); - s_not_o.extend(s_it); - break; - } - [Some((sid, sb)), Some((oid, ob))] => { - if sid < oid { - // o is missing this element - s_not_o.push((sid, sb)); - s = s_it.next(); - } else if sid > oid { - // s is missing this element - o_not_s.push((oid, ob)); - o = o_it.next(); - } else if sb != ob { - assert_eq!(sid, oid); - // both predicates assign the variable but differ on the value - // No predicate exists which satisfies both! - return Aur::Nonexistant; - } else { - // both predicates assign the variable to the same value - s = s_it.next(); - o = o_it.next(); - } - } - } - } - // Observed zero inconsistencies. A unified predicate exists... - match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Aur::Equivalent, // ... equivalent to both. - [false, true] => Aur::FormerNotLatter, // ... equivalent to self. - [true, false] => Aur::LatterNotFormer, // ... equivalent to other. - [false, false] => { - // ... which is the union of the predicates' assignments but - // is equivalent to neither self nor other. - let mut new = self.clone(); - for (&id, &b) in o_not_s { - new.assigned.insert(id, b); - } - Aur::New(new) - } + /// Directly retrieves an entry. There be dragons here. The `connector` + /// might have its destructor already executed. Accessing it might then lead + /// to memory corruption. + fn get(&self, index: u32) -> &'static mut StoreEntry { + unsafe { + let entry = self.entries.get_mut(index as usize); + return &mut **entry; } } - // Compute the union of the assignments of the two given predicates, if it exists. - // It doesn't exist if there is some value which the predicates assign to different values. - pub(crate) fn union_with(&self, other: &Self) -> Option { - let mut res = self.clone(); - for (&channel_id, &assignment_1) in other.assigned.iter() { - match res.assigned.insert(channel_id, assignment_1) { - Some(assignment_2) if assignment_1 != assignment_2 => return None, - _ => {} + /// Creates a new connector. Caller should ensure ports are set up correctly + /// and the connector is queued for execution if needed. + fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { + let mut connector = ScheduledConnector { + connector, + ctx: ComponentCtx::new_empty(), + public: ConnectorPublic::new(initially_sleeping), + router: ControlMessageHandler::new(), + shutting_down: false, + }; + + let index; + let key; + + if self.free.is_empty() { + // No free entries, allocate new entry + index = self.entries.len(); + key = ConnectorKey{ + index: index as u32, generation: 0 + }; + connector.ctx.id = key.downcast(); + + let connector = Box::into_raw(Box::new(StoreEntry{ + connector, + generation: AtomicU32::new(0), + num_users: AtomicU32::new(1), + })); + self.entries.push(connector); + } else { + // Free spot available + index = self.free.pop().unwrap(); + + unsafe { + let target = &mut **self.entries.get_mut(index); + std::ptr::write(&mut target.connector as *mut _, connector); + let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst); + debug_assert_eq!(_old_num_users, 0); + + let generation = target.generation.load(Ordering::Acquire); + key = ConnectorKey{ index: index as u32, generation }; + target.connector.ctx.id = key.downcast(); } } - Some(res) - } - pub(crate) fn query(&self, var: SpecVar) -> Option { - self.assigned.get(&var).copied() - } -} -impl RoundCtx { - // remove an arbitrary buffered message, along with the ID of the getter who receives it - fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { - self.payload_inbox.pop() + println!("DEBUG [ global store ] Created component at {}", key.index); + return key; } - // buffer a message along with the ID of the getter who receives it - fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.payload_inbox.push((getter, msg)); - } - - // buffer a message along with the ID of the putter who sent it - fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer { - log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); - self.getter_push(getter, msg); - } else { - log!(cu.logger(), "Putter {:?} has no known peer!", putter); - panic!("Putter {:?} has no known peer!", putter); + /// Destroys a connector. Caller should make sure it is not scheduled for + /// execution. Otherwise one experiences "bad stuff" (tm). + fn destroy(&mut self, key: ConnectorKey) { + unsafe { + let target = self.entries.get_mut(key.index as usize); + (**target).generation.fetch_add(1, Ordering::SeqCst); + std::ptr::drop_in_place(*target); + // Note: but not deallocating! } - } -} -impl Debug for VecSet { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_set().entries(self.vec.iter()).finish() + println!("DEBUG [ global store ] Destroyed component at {}", key.index); + self.free.push(key.index as usize); } } -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct Assignment<'a>((&'a SpecVar, &'a SpecVal)); - impl Debug for Assignment<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{:?}={:?}", (self.0).0, (self.0).1) + +impl Drop for ConnectorStore { + fn drop(&mut self) { + // Everything in the freelist already had its destructor called, so only + // has to be deallocated + for free_idx in self.free.iter().copied() { + unsafe { + let memory = self.entries.get_mut(free_idx); + let layout = std::alloc::Layout::for_value(&**memory); + std::alloc::dealloc(*memory as *mut u8, layout); + + // mark as null for the remainder + *memory = std::ptr::null_mut(); } } - f.debug_set().entries(self.assigned.iter().map(Assignment)).finish() - } -} -impl IdParts for SpecVar { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - self.0.id_parts() - } -} -impl Debug for SpecVar { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let (a, b) = self.id_parts(); - write!(f, "v{}_{}", a, b) - } -} -impl SpecVal { - const FIRING: Self = SpecVal(1); - const SILENT: Self = SpecVal(0); - fn is_firing(self) -> bool { - self == Self::FIRING - // all else treated as SILENT - } - fn iter_domain() -> impl Iterator { - (0..).map(SpecVal) - } -} -impl Debug for SpecVal { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - self.0.fmt(f) - } -} -impl Default for IoByteBuffer { - fn default() -> Self { - let mut byte_vec = Vec::with_capacity(Self::CAPACITY); - unsafe { - // safe! this vector is guaranteed to have sufficient capacity - byte_vec.set_len(Self::CAPACITY); - } - Self { byte_vec } - } -} -impl IoByteBuffer { - const CAPACITY: usize = u16::MAX as usize + 1000; - fn as_mut_slice(&mut self) -> &mut [u8] { - self.byte_vec.as_mut_slice() - } -} -impl Debug for IoByteBuffer { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "IoByteBuffer") + // With the deallocated stuff marked as null, clear the remainder that + // is not null + for idx in 0..self.entries.len() { + unsafe { + let memory = *self.entries.get_mut(idx); + if !memory.is_null() { + let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh + } + } + } } -} +} \ No newline at end of file diff --git a/src/runtime2/native.rs b/src/runtime/native.rs similarity index 99% rename from src/runtime2/native.rs rename to src/runtime/native.rs index de0d3d27cad4d730d8a1879cd229b9c63c88034d..b92c59536f1423adb045fa1f53099ed75095c7b7 100644 --- a/src/runtime2/native.rs +++ b/src/runtime/native.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex, Condvar}; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; -use crate::runtime2::consensus::RoundConclusion; +use crate::runtime::consensus::RoundConclusion; use super::{ConnectorId, RuntimeInner}; use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; @@ -445,7 +445,7 @@ impl ApplicationInterface { self.owned_ports.remove(position); } - let prompt = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; + let prompt = self.runtime.protocol_description.new_component(module.as_bytes(), routine.as_bytes(), arguments)?; let connector = ConnectorPDL::new(prompt); // Put on job queue diff --git a/src/runtime2/port.rs b/src/runtime/port.rs similarity index 100% rename from src/runtime2/port.rs rename to src/runtime/port.rs diff --git a/src/runtime2/scheduler.rs b/src/runtime/scheduler.rs similarity index 99% rename from src/runtime2/scheduler.rs rename to src/runtime/scheduler.rs index 6f1e70631389c68c8e28119649d77b76aff2bceb..59ebb2549f631608ed771282d51e6cf1ed83952f 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime/scheduler.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use crate::protocol::eval::EvalError; -use crate::runtime2::port::ChannelId; +use crate::runtime::port::ChannelId; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; diff --git a/src/runtime2/tests/api_component.rs b/src/runtime/tests/api_component.rs similarity index 100% rename from src/runtime2/tests/api_component.rs rename to src/runtime/tests/api_component.rs diff --git a/src/runtime2/tests/data_transmission.rs b/src/runtime/tests/data_transmission.rs similarity index 100% rename from src/runtime2/tests/data_transmission.rs rename to src/runtime/tests/data_transmission.rs diff --git a/src/runtime2/tests/mod.rs b/src/runtime/tests/mod.rs similarity index 97% rename from src/runtime2/tests/mod.rs rename to src/runtime/tests/mod.rs index 1f0a8423beda972cb7afcddabf978a3c57435192..3fdcdf9cdedf9f36b072a9185f65eb55e01181cd 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime/tests/mod.rs @@ -8,7 +8,7 @@ use super::*; use crate::{PortId, ProtocolDescription}; use crate::common::Id; use crate::protocol::eval::*; -use crate::runtime2::native::{ApplicationSyncAction}; +use crate::runtime::native::{ApplicationSyncAction}; // Generic testing constants, use when appropriate to simplify stress-testing // pub(crate) const NUM_THREADS: u32 = 8; // number of threads in runtime diff --git a/src/runtime2/tests/network_shapes.rs b/src/runtime/tests/network_shapes.rs similarity index 100% rename from src/runtime2/tests/network_shapes.rs rename to src/runtime/tests/network_shapes.rs diff --git a/src/runtime2/tests/speculation.rs b/src/runtime/tests/speculation.rs similarity index 100% rename from src/runtime2/tests/speculation.rs rename to src/runtime/tests/speculation.rs diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime/tests/sync_failure.rs similarity index 100% rename from src/runtime2/tests/sync_failure.rs rename to src/runtime/tests/sync_failure.rs diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs deleted file mode 100644 index de82384c6a9ed35bbe0359ade150be249034e3e0..0000000000000000000000000000000000000000 --- a/src/runtime2/mod.rs +++ /dev/null @@ -1,640 +0,0 @@ -// Structure of module - -mod branch; -mod native; -mod port; -mod scheduler; -mod consensus; -mod inbox; - -#[cfg(test)] mod tests; -mod connector; - -// Imports - -use std::collections::VecDeque; -use std::sync::{Arc, Condvar, Mutex, RwLock}; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::thread::{self, JoinHandle}; - -use crate::collections::RawVec; -use crate::ProtocolDescription; - -use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; -use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler}; -use native::{Connector, ConnectorApplication, ApplicationInterface}; -use inbox::Message; -use port::{ChannelId, Port, PortState}; - -/// A kind of token that, once obtained, allows mutable access to a connector. -/// We're trying to use move semantics as much as possible: the owner of this -/// key is the only one that may execute the connector's code. -#[derive(Debug)] -pub(crate) struct ConnectorKey { - pub index: u32, // of connector - pub generation: u32, -} - -impl ConnectorKey { - /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable - /// access, to a "regular ID" which can be used to obtain immutable access. - #[inline] - pub fn downcast(&self) -> ConnectorId { - return ConnectorId{ - index: self.index, - generation: self.generation, - }; - } - - /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it - /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system - #[inline] - pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { - return ConnectorKey{ - index: id.index, - generation: id.generation, - }; - } -} - -/// A kind of token that allows shared access to a connector. Multiple threads -/// may hold this -#[derive(Debug, Copy, Clone)] -pub struct ConnectorId{ - pub index: u32, - pub generation: u32, -} - -impl PartialEq for ConnectorId { - fn eq(&self, other: &Self) -> bool { - return self.index.eq(&other.index); - } -} - -impl Eq for ConnectorId{} - -impl PartialOrd for ConnectorId{ - fn partial_cmp(&self, other: &Self) -> Option { - return self.index.partial_cmp(&other.index) - } -} - -impl Ord for ConnectorId{ - fn cmp(&self, other: &Self) -> crate::common::Ordering { - return self.partial_cmp(other).unwrap(); - } -} - -impl ConnectorId { - // TODO: Like the other `new_invalid`, maybe remove - #[inline] - pub fn new_invalid() -> ConnectorId { - return ConnectorId { - index: u32::MAX, - generation: 0, - }; - } - - #[inline] - pub(crate) fn is_valid(&self) -> bool { - return self.index != u32::MAX; - } -} - -// TODO: Change this, I hate this. But I also don't want to put `public` and -// `router` of `ScheduledConnector` back into `Connector`. The reason I don't -// want `Box` everywhere is because of the v-table overhead. But -// to truly design this properly I need some benchmarks. -pub(crate) enum ConnectorVariant { - UserDefined(ConnectorPDL), - Native(Box), -} - -impl Connector for ConnectorVariant { - fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { - match self { - ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), - ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), - } - } -} - -pub(crate) struct ScheduledConnector { - pub connector: ConnectorVariant, // access by connector - pub ctx: ComponentCtx, - pub public: ConnectorPublic, // accessible by all schedulers and connectors - pub router: ControlMessageHandler, - pub shutting_down: bool, -} - -// ----------------------------------------------------------------------------- -// Runtime -// ----------------------------------------------------------------------------- - -/// Externally facing runtime. -pub struct Runtime { - inner: Arc, -} - -impl Runtime { - pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { - // Setup global state - assert!(num_threads > 0, "need a thread to run connectors"); - let runtime_inner = Arc::new(RuntimeInner{ - protocol_description, - port_counter: AtomicU32::new(0), - connectors: RwLock::new(ConnectorStore::with_capacity(32)), - connector_queue: Mutex::new(VecDeque::with_capacity(32)), - schedulers: Mutex::new(Vec::new()), - scheduler_notifier: Condvar::new(), - active_connectors: AtomicU32::new(0), - active_interfaces: AtomicU32::new(1), // this `Runtime` instance - should_exit: AtomicBool::new(false), - }); - - // Launch threads - { - let mut schedulers = Vec::with_capacity(num_threads as usize); - for thread_index in 0..num_threads { - let cloned_runtime_inner = runtime_inner.clone(); - let thread = thread::Builder::new() - .name(format!("thread-{}", thread_index)) - .spawn(move || { - let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); - scheduler.run(); - }) - .unwrap(); - - schedulers.push(thread); - } - - let mut lock = runtime_inner.schedulers.lock().unwrap(); - *lock = schedulers; - } - - // Return runtime - return Runtime{ inner: runtime_inner }; - } - - /// Returns a new interface through which channels and connectors can be - /// created. - pub fn create_interface(&self) -> ApplicationInterface { - self.inner.increment_active_interfaces(); - let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); - let connector_key = self.inner.create_interface_component(connector); - interface.set_connector_id(connector_key.downcast()); - - // Note that we're not scheduling. That is done by the interface in case - // it is actually needed. - return interface; - } -} - -impl Drop for Runtime { - fn drop(&mut self) { - self.inner.decrement_active_interfaces(); - let mut lock = self.inner.schedulers.lock().unwrap(); - for handle in lock.drain(..) { - handle.join().unwrap(); - } - } -} - -// ----------------------------------------------------------------------------- -// RuntimeInner -// ----------------------------------------------------------------------------- - -pub(crate) struct RuntimeInner { - // Protocol - pub(crate) protocol_description: ProtocolDescription, - // Regular counter for port IDs - port_counter: AtomicU32, - // Storage of connectors and the work queue - connectors: RwLock, - connector_queue: Mutex>, - schedulers: Mutex>>, - // Conditions to determine whether the runtime can exit - scheduler_notifier: Condvar, // coupled to mutex on `connector_queue`. - // TODO: Figure out if we can simply merge the counters? - active_connectors: AtomicU32, // active connectors (if sleeping, then still considered active) - active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels - should_exit: AtomicBool, -} - -impl RuntimeInner { - // --- Managing the components queued for execution - - /// Wait until there is a connector to run. If there is one, then `Some` - /// will be returned. If there is no more work, then `None` will be - /// returned. - pub(crate) fn wait_for_work(&self) -> Option { - let mut lock = self.connector_queue.lock().unwrap(); - while lock.is_empty() && !self.should_exit.load(Ordering::Acquire) { - lock = self.scheduler_notifier.wait(lock).unwrap(); - } - - return lock.pop_front(); - } - - pub(crate) fn push_work(&self, key: ConnectorKey) { - let mut lock = self.connector_queue.lock().unwrap(); - lock.push_back(key); - self.scheduler_notifier.notify_one(); - } - - // --- Creating/using ports - - /// Creates a new port pair. Note that these are stored globally like the - /// connectors are. Ports stored by components belong to those components. - pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { - use port::{PortIdLocal, PortKind}; - - let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); - let channel_id = ChannelId::new(getter_id); - let putter_id = PortIdLocal::new(getter_id + 1); - let getter_id = PortIdLocal::new(getter_id); - - let getter_port = Port{ - self_id: getter_id, - peer_id: putter_id, - channel_id, - kind: PortKind::Getter, - state: PortState::Open, - peer_connector: creating_connector, - }; - let putter_port = Port{ - self_id: putter_id, - peer_id: getter_id, - channel_id, - kind: PortKind::Putter, - state: PortState::Open, - peer_connector: creating_connector, - }; - - return (getter_port, putter_port); - } - - /// Sends a message directly (without going through the port) to a - /// component. This is slightly less efficient then sending over a port, but - /// might be preferable for some algorithms. If the component was sleeping - /// then it is scheduled for execution. - pub(crate) fn send_message_maybe_destroyed(&self, target_id: ConnectorId, message: Message) -> bool { - let target = { - let mut lock = self.connectors.read().unwrap(); - lock.get(target_id.index) - }; - - // Do a CAS on the number of users. Most common case the component is - // alive and we're the only one sending the message. Note that if we - // finish this block, we're sure that no-one has set the `num_users` - // value to 0. This is essential! When at 0, the component is added to - // the freelist and the generation counter will be incremented. - let mut cur_num_users = 1; - while let Err(old_num_users) = target.num_users.compare_exchange(cur_num_users, cur_num_users + 1, Ordering::SeqCst, Ordering::Acquire) { - if old_num_users == 0 { - // Cannot send message. Whatever the component state is - // (destroyed, at a different generation number, busy being - // destroyed, etc.) we cannot send the message and will not - // modify the component - return false; - } - - cur_num_users = old_num_users; - } - - // We incremented the counter. But we might still be at the wrong - // generation number. The generation number is a monotonically - // increasing value. Since it only increases when someone gets the - // `num_users` counter to 0, we can simply load the generation number. - let generation = target.generation.load(Ordering::Acquire); - if generation != target_id.generation { - // We're at the wrong generation, so we cannot send the message. - // However, since we incremented the `num_users` counter, the moment - // we decrement it we might be the one that are supposed to handle - // the destruction of the component. Note that all users of the - // component do an increment-followed-by-decrement, we can simply - // do a `fetch_sub`. - let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); - if old_num_users == 1 { - // We're the one that got the counter to 0, so we're the ones - // that are supposed to handle component exit - self.finish_component_destruction(target_id); - } - - return false; - } - - // The generation is correct, and since we incremented the `num_users` - // counter we're now sure that we can send the message and it will be - // handled by the receiver - target.connector.public.inbox.insert_message(message); - - // Finally, do the same as above: decrement number of users, if at gets - // to 0 we're the ones who should handle the exit condition. - let old_num_users = target.num_users.fetch_sub(1, Ordering::SeqCst); - if old_num_users == 1 { - // We're allowed to destroy the component. - self.finish_component_destruction(target_id); - } else { - // Message is sent. If the component is sleeping, then we're sure - // it is not scheduled and it has not initiated the destruction of - // the component (because of the way - // `initiate_component_destruction` does not set sleeping to true). - // So we can safely schedule it. - let should_wake_up = target.connector.public.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let key = unsafe{ ConnectorKey::from_id(target_id) }; - self.push_work(key); - } - } - - return true - } - - /// Sends a message to a particular component, assumed to occur over a port. - /// If the component happened to be sleeping then it will be scheduled for - /// execution. Because of the port management system we may assumed that - /// we're always accessing the component at the right generation number. - pub(crate) fn send_message_assumed_alive(&self, target_id: ConnectorId, message: Message) { - let target = { - let lock = self.connectors.read().unwrap(); - let entry = lock.get(target_id.index); - debug_assert_eq!(entry.generation.load(Ordering::Acquire), target_id.generation); - &mut entry.connector.public - }; - - target.inbox.insert_message(message); - - let should_wake_up = target.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let key = unsafe{ ConnectorKey::from_id(target_id) }; - self.push_work(key); - } - } - - // --- Creating/retrieving/destroying components - - /// Creates an initially sleeping application connector. - fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { - // Initialize as sleeping, as it will be scheduled by the programmer. - let mut lock = self.connectors.write().unwrap(); - let key = lock.create(ConnectorVariant::Native(Box::new(component)), true); - - self.increment_active_components(); - return key; - } - - /// Creates a new PDL component. This function just creates the component. - /// If you create it initially awake, then you must add it to the work - /// queue. Other aspects of correctness (i.e. setting initial ports) are - /// relinquished to the caller! - pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey { - // Create as not sleeping, as we'll schedule it immediately - let key = { - let mut lock = self.connectors.write().unwrap(); - lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping) - }; - - self.increment_active_components(); - return key; - } - - /// Retrieve private access to the component through its key. - #[inline] - pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { - let entry = { - let lock = self.connectors.read().unwrap(); - lock.get(connector_key.index) - }; - - debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation, "private access to {:?}", connector_key); - return &mut entry.connector; - } - - // --- Managing component destruction - - /// Start component destruction, may only be done by the scheduler that is - /// executing the component. This might not actually destroy the component, - /// since other components might be sending it messages. - fn initiate_component_destruction(&self, connector_key: ConnectorKey) { - // Most of the time no-one will be sending messages, so try - // immediate destruction - let mut lock = self.connectors.write().unwrap(); - let entry = lock.get(connector_key.index); - debug_assert_eq!(entry.generation.load(Ordering::Acquire), connector_key.generation); - debug_assert_eq!(entry.connector.public.sleeping.load(Ordering::Acquire), false); // not sleeping: caller is executing this component - let old_num_users = entry.num_users.fetch_sub(1, Ordering::SeqCst); - if old_num_users == 1 { - // We just brought the number of users down to 0. Destroy the - // component - entry.connector.public.inbox.clear(); - entry.generation.fetch_add(1, Ordering::SeqCst); - lock.destroy(connector_key); - self.decrement_active_components(); - } - } - - fn finish_component_destruction(&self, connector_id: ConnectorId) { - let mut lock = self.connectors.write().unwrap(); - let entry = lock.get(connector_id.index); - debug_assert_eq!(entry.num_users.load(Ordering::Acquire), 0); - let _old_generation = entry.generation.fetch_add(1, Ordering::SeqCst); - debug_assert_eq!(_old_generation, connector_id.generation); - - // TODO: In the future we should not only clear out the inbox, but send - // messages back to the senders indicating the messages did not arrive. - entry.connector.public.inbox.clear(); - - // Invariant of only one thread being able to handle the internals of - // component is preserved by the fact that only one thread can decrement - // `num_users` to 0. - lock.destroy(unsafe{ ConnectorKey::from_id(connector_id) }); - self.decrement_active_components(); - } - - // --- Managing exit condition - - #[inline] - pub(crate) fn increment_active_interfaces(&self) { - let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); - debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero - } - - pub(crate) fn decrement_active_interfaces(&self) { - let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); - debug_assert!(old_num > 0); - if old_num == 1 { // such that active interfaces is now 0 - let num_connectors = self.active_connectors.load(Ordering::Acquire); - if num_connectors == 0 { - self.signal_for_shutdown(); - } - } - } - - #[inline] - fn increment_active_components(&self) { - let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); - } - - fn decrement_active_components(&self) { - let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); - debug_assert!(old_num > 0); - if old_num == 1 { // such that we have no more active connectors (for now!) - let num_interfaces = self.active_interfaces.load(Ordering::Acquire); - if num_interfaces == 0 { - self.signal_for_shutdown(); - } - } - } - - #[inline] - fn signal_for_shutdown(&self) { - debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0); - debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0); - - let _lock = self.connector_queue.lock().unwrap(); - let should_signal = self.should_exit - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_signal { - self.scheduler_notifier.notify_all(); - } - } -} - -unsafe impl Send for RuntimeInner {} -unsafe impl Sync for RuntimeInner {} - -// ----------------------------------------------------------------------------- -// ConnectorStore -// ----------------------------------------------------------------------------- - -struct StoreEntry { - connector: ScheduledConnector, - generation: std::sync::atomic::AtomicU32, - num_users: std::sync::atomic::AtomicU32, -} - -struct ConnectorStore { - // Freelist storage of connectors. Storage should be pointer-stable as - // someone might be mutating the vector while we're executing one of the - // connectors. - entries: RawVec<*mut StoreEntry>, - free: Vec, -} - -impl ConnectorStore { - fn with_capacity(capacity: usize) -> Self { - Self { - entries: RawVec::with_capacity(capacity), - free: Vec::with_capacity(capacity), - } - } - - /// Directly retrieves an entry. There be dragons here. The `connector` - /// might have its destructor already executed. Accessing it might then lead - /// to memory corruption. - fn get(&self, index: u32) -> &'static mut StoreEntry { - unsafe { - let entry = self.entries.get_mut(index as usize); - return &mut **entry; - } - } - - /// Creates a new connector. Caller should ensure ports are set up correctly - /// and the connector is queued for execution if needed. - fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { - let mut connector = ScheduledConnector { - connector, - ctx: ComponentCtx::new_empty(), - public: ConnectorPublic::new(initially_sleeping), - router: ControlMessageHandler::new(), - shutting_down: false, - }; - - let index; - let key; - - if self.free.is_empty() { - // No free entries, allocate new entry - index = self.entries.len(); - key = ConnectorKey{ - index: index as u32, generation: 0 - }; - connector.ctx.id = key.downcast(); - - let connector = Box::into_raw(Box::new(StoreEntry{ - connector, - generation: AtomicU32::new(0), - num_users: AtomicU32::new(1), - })); - self.entries.push(connector); - } else { - // Free spot available - index = self.free.pop().unwrap(); - - unsafe { - let target = &mut **self.entries.get_mut(index); - std::ptr::write(&mut target.connector as *mut _, connector); - let _old_num_users = target.num_users.fetch_add(1, Ordering::SeqCst); - debug_assert_eq!(_old_num_users, 0); - - let generation = target.generation.load(Ordering::Acquire); - key = ConnectorKey{ index: index as u32, generation }; - target.connector.ctx.id = key.downcast(); - } - } - - println!("DEBUG [ global store ] Created component at {}", key.index); - return key; - } - - /// Destroys a connector. Caller should make sure it is not scheduled for - /// execution. Otherwise one experiences "bad stuff" (tm). - fn destroy(&mut self, key: ConnectorKey) { - unsafe { - let target = self.entries.get_mut(key.index as usize); - (**target).generation.fetch_add(1, Ordering::SeqCst); - std::ptr::drop_in_place(*target); - // Note: but not deallocating! - } - - println!("DEBUG [ global store ] Destroyed component at {}", key.index); - self.free.push(key.index as usize); - } -} - -impl Drop for ConnectorStore { - fn drop(&mut self) { - // Everything in the freelist already had its destructor called, so only - // has to be deallocated - for free_idx in self.free.iter().copied() { - unsafe { - let memory = self.entries.get_mut(free_idx); - let layout = std::alloc::Layout::for_value(&**memory); - std::alloc::dealloc(*memory as *mut u8, layout); - - // mark as null for the remainder - *memory = std::ptr::null_mut(); - } - } - - // With the deallocated stuff marked as null, clear the remainder that - // is not null - for idx in 0..self.entries.len() { - unsafe { - let memory = *self.entries.get_mut(idx); - if !memory.is_null() { - let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh - } - } - } - } -} \ No newline at end of file diff --git a/src/runtime/communication.rs b/src/runtime_old/communication.rs similarity index 100% rename from src/runtime/communication.rs rename to src/runtime_old/communication.rs diff --git a/src/runtime/endpoints.rs b/src/runtime_old/endpoints.rs similarity index 100% rename from src/runtime/endpoints.rs rename to src/runtime_old/endpoints.rs diff --git a/src/runtime/error.rs b/src/runtime_old/error.rs similarity index 100% rename from src/runtime/error.rs rename to src/runtime_old/error.rs diff --git a/src/runtime/logging.rs b/src/runtime_old/logging.rs similarity index 100% rename from src/runtime/logging.rs rename to src/runtime_old/logging.rs diff --git a/src/runtime_old/mod.rs b/src/runtime_old/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..95bc3e85c6f4df6c9a221764f5eb8ad46745159e --- /dev/null +++ b/src/runtime_old/mod.rs @@ -0,0 +1,903 @@ +/// cbindgen:ignore +mod communication; +/// cbindgen:ignore +mod endpoints; +pub mod error; +/// cbindgen:ignore +mod logging; +/// cbindgen:ignore +mod setup; + +#[cfg(test)] +mod tests; + +use crate::common::*; +use error::*; +use mio::net::UdpSocket; + +/// The interface between the user's application and a communication session, +/// in which the application plays the part of a (native) component. This structure provides the application +/// with functionality available to all components: the ability to add new channels (port pairs), and to +/// instantiate new components whose definitions are defined in the connector's configured protocol +/// description. Native components have the additional ability to add `dangling' ports backed by local/remote +/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`. +/// This allows sets of applications to cooperate in constructing shared sessions that span the network. +#[derive(Debug)] +pub struct Connector { + unphased: ConnectorUnphased, + phased: ConnectorPhased, +} + +/// Characterizes a type which can write lines of logging text. +/// The implementations provided in the `logging` module are likely to be sufficient, +/// but for added flexibility, users are able to implement their own loggers for use +/// by connectors. +pub trait Logger: Debug + Send + Sync { + fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; +} + +/// A logger that appends the logged strings to a growing byte buffer +#[derive(Debug)] +pub struct VecLogger(ConnectorId, Vec); + +/// A trivial logger that always returns None, such that no logging information is ever written. +#[derive(Debug)] +pub struct DummyLogger; + +/// A logger that writes the logged lines to a given file. +#[derive(Debug)] +pub struct FileLogger(ConnectorId, std::fs::File); + +// Interface between protocol state and the connector runtime BEFORE all components +// ave begun their branching speculation. See ComponentState::nonsync_run. +pub(crate) struct NonsyncProtoContext<'a> { + ips: &'a mut IdAndPortState, + logger: &'a mut dyn Logger, + unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase + proto_component_id: ComponentId, // KEY in id->component map +} + +// Interface between protocol state and the connector runtime AFTER all components +// have begun their branching speculation. See ComponentState::sync_run. +pub(crate) struct SyncProtoContext<'a> { + rctx: &'a RoundCtx, + branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch + predicate: &'a Predicate, // KEY in pred->branch map +} + +// The data coupled with a particular protocol component branch, but crucially omitting +// the `ComponentState` such that this may be passed by reference to the state with separate +// access control. +#[derive(Default, Debug, Clone)] +struct ProtoComponentBranchInner { + did_put_or_get: HashSet, + inbox: HashMap, +} + +// A speculative variable that lives for the duration of the synchronous round. +// Each is assigned a value in domain `SpecVal`. +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] +struct SpecVar(PortId); + +// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, +// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] +struct SpecVal(u16); + +// Data associated with a successful synchronous round, retained afterwards such that the +// native component can freely reflect on how it went, reading the messages received at their +// inputs, and reflecting on which of their connector's synchronous batches succeeded. +#[derive(Debug)] +struct RoundEndedNative { + batch_index: usize, + gotten: HashMap, +} + +// Implementation of a set in terms of a vector (optimized for reading, not writing) +#[derive(Default)] +struct VecSet { + // invariant: ordered, deduplicated + vec: Vec, +} + +// Allows a connector to remember how to forward payloads towards the component that +// owns their destination port. `LocalComponent` corresponds with messages for components +// managed by the connector itself (hinting for it to look it up in a local structure), +// whereas the other variants direct the connector to forward the messages over the network. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +enum Route { + LocalComponent, + NetEndpoint { index: usize }, + UdpEndpoint { index: usize }, +} + +// The outcome of a synchronous round, representing the distributed consensus. +// In the success case, the attached predicate encodes a row in the session's trace table. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +enum Decision { + Failure, // some connector timed out! + Success(Predicate), +} + +// The type of control messages exchanged between connectors over the network +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +enum Msg { + SetupMsg(SetupMsg), + CommMsg(CommMsg), +} + +// Control messages exchanged during the setup phase only +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +enum SetupMsg { + MyPortInfo(MyPortInfo), + LeaderWave { wave_leader: ConnectorId }, + LeaderAnnounce { tree_leader: ConnectorId }, + YouAreMyParent, +} + +// Control message particular to the communication phase. +// as such, it's annotated with a round_index +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct CommMsg { + round_index: usize, + contents: CommMsgContents, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +enum CommMsgContents { + SendPayload(SendPayloadMsg), + CommCtrl(CommCtrlMsg), +} + +// Connector <-> connector control messages for use in the communication phase +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +enum CommCtrlMsg { + Suggest { suggestion: Decision }, // child->parent + Announce { decision: Decision }, // parent->child +} + +// Speculative payload message, communicating the value for the given +// port's message predecated on the given speculative variable assignments. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct SendPayloadMsg { + predicate: Predicate, + payload: Payload, +} + +// Return result of `Predicate::assignment_union`, communicating the contents +// of the predicate which represents the (consistent) union of their mappings, +// if it exists (no variable mapped distinctly by the input predicates) +#[derive(Debug, PartialEq)] +enum AssignmentUnionResult { + FormerNotLatter, + LatterNotFormer, + Equivalent, + New(Predicate), + Nonexistant, +} + +// One of two endpoints for a control channel with a connector on either end. +// The underlying transport is TCP, so we use an inbox buffer to allow +// discrete payload receipt. +struct NetEndpoint { + inbox: Vec, + stream: TcpStream, +} + +// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP +#[derive(Debug, Clone)] +struct NetEndpointSetup { + getter_for_incoming: PortId, + sock_addr: SocketAddr, + endpoint_polarity: EndpointPolarity, +} + +// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP +#[derive(Debug, Clone)] +struct UdpEndpointSetup { + getter_for_incoming: PortId, + local_addr: SocketAddr, + peer_addr: SocketAddr, +} + +// NetEndpoint annotated with the ID of the port that receives payload +// messages received through the endpoint. This approach assumes that NetEndpoints +// DO NOT multiplex port->port channels, and so a mapping such as this is possible. +// As a result, the messages themselves don't need to carry the PortID with them. +#[derive(Debug)] +struct NetEndpointExt { + net_endpoint: NetEndpoint, + getter_for_incoming: PortId, +} + +// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component" +// described in the literature. +// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager), +// It acts as a native component by managing a (speculative) set of payload messages (an outbox, +// protecting the peer on the other side of the network). +#[derive(Debug)] +struct UdpEndpointExt { + sock: UdpSocket, // already bound and connected + received_this_round: bool, + outgoing_payloads: HashMap, + getter_for_incoming: PortId, +} + +// Meta-data for the connector: its role in the consensus tree. +#[derive(Debug)] +struct Neighborhood { + parent: Option, + children: VecSet, +} + +// Manages the connector's ID, and manages allocations for connector/port IDs. +#[derive(Debug, Clone)] +struct IdManager { + connector_id: ConnectorId, + port_suffix_stream: U32Stream, + component_suffix_stream: U32Stream, +} + +// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. +struct IoByteBuffer { + byte_vec: Vec, +} + +// A generator of speculative variables. Created on-demand during the synchronous round +// by the IdManager. +#[derive(Debug)] +struct SpecVarStream { + connector_id: ConnectorId, + port_suffix_stream: U32Stream, +} + +// Manages the messy state of the various endpoints, pollers, buffers, etc. +#[derive(Debug)] +struct EndpointManager { + // invariants: + // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into + // 2. Events is empty + poll: Poll, + events: Events, + delayed_messages: Vec<(usize, Msg)>, + undelayed_messages: Vec<(usize, Msg)>, // ready to yield + net_endpoint_store: EndpointStore, + udp_endpoint_store: EndpointStore, + io_byte_buffer: IoByteBuffer, +} + +// A storage of endpoints, which keeps track of which components have raised +// an event during poll(), signifying that they need to be checked for new incoming data +#[derive(Debug)] +struct EndpointStore { + endpoint_exts: Vec, + polled_undrained: VecSet, +} + +// The information associated with a port identifier, designed for local storage. +#[derive(Clone, Debug)] +struct PortInfo { + owner: ComponentId, + peer: Option, + polarity: Polarity, + route: Route, +} + +// Similar to `PortInfo`, but designed for communication during the setup procedure. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct MyPortInfo { + polarity: Polarity, + port: PortId, + owner: ComponentId, +} + +// Newtype around port info map, allowing the implementation of some +// useful methods +#[derive(Default, Debug, Clone)] +struct PortInfoMap { + // invariant: self.invariant_preserved() + // `owned` is redundant information, allowing for fast lookup + // of a component's owned ports (which occurs during the sync round a lot) + map: HashMap, + owned: HashMap>, +} + +// A convenient substructure for containing port info and the ID manager. +// Houses the bulk of the connector's persistent state between rounds. +// It turns out several situations require access to both things. +#[derive(Debug, Clone)] +struct IdAndPortState { + port_info: PortInfoMap, + id_manager: IdManager, +} + +// A component's setup-phase-specific data +#[derive(Debug)] +struct ConnectorCommunication { + round_index: usize, + endpoint_manager: EndpointManager, + neighborhood: Neighborhood, + native_batches: Vec, + round_result: Result, SyncError>, +} + +// A component's data common to both setup and communication phases +#[derive(Debug)] +struct ConnectorUnphased { + proto_description: Arc, + proto_components: HashMap, + logger: Box, + ips: IdAndPortState, + native_component_id: ComponentId, +} + +// A connector's phase-specific data +#[derive(Debug)] +enum ConnectorPhased { + Setup(Box), + Communication(Box), +} + +// A connector's setup-phase-specific data +#[derive(Debug)] +struct ConnectorSetup { + net_endpoint_setups: Vec, + udp_endpoint_setups: Vec, +} + +// A newtype wrapper for a map from speculative variable to speculative value +// A missing mapping corresponds with "unspecified". +#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +struct Predicate { + assigned: BTreeMap, +} + +// Identifies a child of this connector in the _solution tree_. +// Each connector creates its own local solutions for the consensus procedure during `sync`, +// from the solutions of its children. Those children are either locally-managed components, +// (which are leaves in the solution tree), or other connectors reachable through the given +// network endpoint (which are internal nodes in the solution tree). +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +enum SubtreeId { + LocalComponent(ComponentId), + NetEndpoint { index: usize }, +} + +// An accumulation of the connector's knowledge of all (a) the local solutions its children +// in the solution tree have found, and (b) its own solutions derivable from those of its children. +// This structure starts off each round with an empty set, and accumulates solutions as they are found +// by local components, or received over the network in control messages. +// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to +// say that these sets GROW until the round is over, and all solutions are reset. +#[derive(Debug)] +struct SolutionStorage { + // invariant: old_local U new_local solutions are those that can be created from + // the UNION of one element from each set in `subtree_solution`. + // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. + old_local: HashSet, // already sent to this connector's parent OR decided + new_local: HashSet, // not yet sent to this connector's parent OR decided + // this pair acts as SubtreeId -> HashSet which is friendlier to iteration + subtree_solutions: Vec>, + subtree_id_to_index: HashMap, +} + +// Stores the transient data of a synchronous round. +// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of +// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, +// and can be undone if the round fails. +struct RoundCtx { + solution_storage: SolutionStorage, + spec_var_stream: SpecVarStream, + payload_inbox: Vec<(PortId, SendPayloadMsg)>, + deadline: Option, + ips: IdAndPortState, +} + +// A trait intended to limit the access of the ConnectorUnphased structure +// such that we don't accidentally modify any important component/port data +// while the results of the round are undecided. Why? Any actions during Connector::sync +// are _speculative_ until the round is decided, and we need a safe way of rolling +// back any changes. +trait CuUndecided { + fn logger(&mut self) -> &mut dyn Logger; + fn proto_description(&self) -> &ProtocolDescription; + fn native_component_id(&self) -> ComponentId; + fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); + fn logger_and_protocol_components( + &mut self, + ) -> (&mut dyn Logger, &mut HashMap); +} + +// Represents a set of synchronous port operations that the native component +// has described as an "option" for completing during the synchronous rounds. +// Operations contained here succeed together or not at all. +// A native with N=2+ batches are expressing an N-way nondeterministic choice +#[derive(Debug, Default)] +struct NativeBatch { + // invariant: putters' and getters' polarities respected + to_put: HashMap, + to_get: HashSet, +} + +// Parallels a mio::Token type, but more clearly communicates +// the way it identifies the evented structre it corresponds to. +// See runtime/setup for methods converting between TokenTarget and mio::Token +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +enum TokenTarget { + NetEndpoint { index: usize }, + UdpEndpoint { index: usize }, +} + +// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, +// such that it can know when to continue polling, and when to block. +enum CommRecvOk { + TimeoutWithoutNew, + NewPayloadMsgs, + NewControlMsg { net_index: usize, msg: CommCtrlMsg }, +} +//////////////// +fn err_would_block(err: &std::io::Error) -> bool { + err.kind() == std::io::ErrorKind::WouldBlock +} +impl VecSet { + fn new(mut vec: Vec) -> Self { + // establish the invariant + vec.sort(); + vec.dedup(); + Self { vec } + } + fn contains(&self, element: &T) -> bool { + self.vec.binary_search(element).is_ok() + } + // Insert the given element. Returns whether it was already present. + fn insert(&mut self, element: T) -> bool { + match self.vec.binary_search(&element) { + Ok(_) => false, + Err(index) => { + self.vec.insert(index, element); + true + } + } + } + fn iter(&self) -> std::slice::Iter { + self.vec.iter() + } + fn pop(&mut self) -> Option { + self.vec.pop() + } +} +impl PortInfoMap { + fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { + self.owned.get(&owner).into_iter().flat_map(HashSet::iter) + } + fn spec_var_for(&self, port: PortId) -> SpecVar { + // Every port maps to a speculative variable + // Two distinct ports map to the same variable + // IFF they are two ends of the same logical channel. + let info = self.map.get(&port).unwrap(); + SpecVar(match info.polarity { + Getter => port, + Putter => info.peer.unwrap(), + }) + } + fn invariant_preserved(&self) -> bool { + // for every port P with some owner O, + // P is in O's owned set + for (port, info) in self.map.iter() { + match self.owned.get(&info.owner) { + Some(set) if set.contains(port) => {} + _ => { + println!("{:#?}\n WITH port {:?}", self, port); + return false; + } + } + } + // for every port P owned by every owner O, + // P's owner is O + for (&owner, set) in self.owned.iter() { + for port in set { + match self.map.get(port) { + Some(info) if info.owner == owner => {} + _ => { + println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port); + return false; + } + } + } + } + true + } +} +impl SpecVarStream { + fn next(&mut self) -> SpecVar { + let phantom_port: PortId = + Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } + .into(); + SpecVar(phantom_port) + } +} +impl IdManager { + fn new(connector_id: ConnectorId) -> Self { + Self { + connector_id, + port_suffix_stream: Default::default(), + component_suffix_stream: Default::default(), + } + } + fn new_spec_var_stream(&self) -> SpecVarStream { + // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N. + // This gap is entirely unnecessary (i.e. 0 is fine) + // It's purpose is only to make SpecVars easier to spot in logs. + // E.g. spot the spec var: { v0_0, v1_2, v1_103 } + const SKIP_N: u32 = 100; + let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N); + SpecVarStream { connector_id: self.connector_id, port_suffix_stream } + } + fn new_port_id(&mut self) -> PortId { + Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() + } + fn new_component_id(&mut self) -> ComponentId { + Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() } + .into() + } +} +impl Drop for Connector { + fn drop(&mut self) { + log!(self.unphased.logger(), "Connector dropping. Goodbye!"); + } +} +// Given a slice of ports, return the first, if any, port is present repeatedly +fn duplicate_port(slice: &[PortId]) -> Option { + let mut vec = Vec::with_capacity(slice.len()); + for port in slice.iter() { + match vec.binary_search(port) { + Err(index) => vec.insert(index, *port), + Ok(_) => return Some(*port), + } + } + None +} +impl Connector { + /// Generate a random connector identifier from the system's source of randomness. + pub fn random_id() -> ConnectorId { + type Bytes8 = [u8; std::mem::size_of::()]; + unsafe { + let mut bytes = std::mem::MaybeUninit::::uninit(); + // getrandom is the canonical crate for a small, secure rng + getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); + // safe! representations of all valid Byte8 values are valid ConnectorId values + std::mem::transmute::<_, _>(bytes.assume_init()) + } + } + + /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete, + /// and it is ready to participate in synchronous rounds of communication. + pub fn is_connected(&self) -> bool { + // If designed for Rust usage, connectors would be exposed as an enum type from the start. + // consequently, this "phased" business would also include connector variants and this would + // get a lot closer to the connector impl. itself. + // Instead, the C-oriented implementation doesn't distinguish connector states as types, + // and distinguish them as enum variants instead + match self.phased { + ConnectorPhased::Setup(..) => false, + ConnectorPhased::Communication(..) => true, + } + } + + /// Enables the connector's current logger to be swapped out for another + pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { + std::mem::swap(&mut self.unphased.logger, &mut new_logger); + new_logger + } + + /// Access the connector's current logger + pub fn get_logger(&mut self) -> &mut dyn Logger { + &mut *self.unphased.logger + } + + /// Create a new synchronous channel, returning its ends as a pair of ports, + /// with polarity output, input respectively. Available during either setup/communication phase. + /// # Panics + /// This function panics if the connector's (large) port id space is exhausted. + pub fn new_port_pair(&mut self) -> [PortId; 2] { + let cu = &mut self.unphased; + // adds two new associated ports, related to each other, and exposed to the native + let mut new_cid = || cu.ips.id_manager.new_port_id(); + // allocate two fresh port identifiers + let [o, i] = [new_cid(), new_cid()]; + // store info for each: + // - they are each others' peers + // - they are owned by a local component with id `cid` + // - polarity putter, getter respectively + cu.ips.port_info.map.insert( + o, + PortInfo { + route: Route::LocalComponent, + peer: Some(i), + owner: cu.native_component_id, + polarity: Putter, + }, + ); + cu.ips.port_info.map.insert( + i, + PortInfo { + route: Route::LocalComponent, + peer: Some(o), + owner: cu.native_component_id, + polarity: Getter, + }, + ); + cu.ips + .port_info + .owned + .entry(cu.native_component_id) + .or_default() + .extend([o, i].iter().copied()); + + log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + [o, i] + } + + /// Instantiates a new component for the connector runtime to manage, and passing + /// the given set of ports from the interface of the native component, to that of the + /// newly created component (passing their ownership). + /// # Errors + /// Error is returned if the moved ports are not owned by the native component, + /// if the given component name is not defined in the connector's protocol, + /// the given sequence of ports contains a duplicate port, + /// or if the component is unfit for instantiation with the given port sequence. + /// # Panics + /// This function panics if the connector's (large) component id space is exhausted. + pub fn add_component( + &mut self, + module_name: &[u8], + identifier: &[u8], + ports: &[PortId], + ) -> Result<(), AddComponentError> { + // Check for error cases first before modifying `cu` + use AddComponentError as Ace; + let cu = &self.unphased; + if let Some(port) = duplicate_port(ports) { + return Err(Ace::DuplicatePort(port)); + } + let expected_polarities = cu.proto_description.component_polarities(module_name, identifier)?; + if expected_polarities.len() != ports.len() { + return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); + } + for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { + let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?; + if info.owner != cu.native_component_id { + return Err(Ace::UnknownPort(port)); + } + if info.polarity != expected_polarity { + return Err(Ace::WrongPortPolarity { port, expected_polarity }); + } + } + // No errors! Time to modify `cu` + // create a new component and identifier + let Connector { phased, unphased: cu } = self; + let new_cid = cu.ips.id_manager.new_component_id(); + cu.proto_components.insert(new_cid, cu.proto_description.new_component(module_name, identifier, ports)); + // update the ownership of moved ports + for port in ports.iter() { + match cu.ips.port_info.map.get_mut(port) { + Some(port_info) => port_info.owner = new_cid, + None => unreachable!(), + } + } + if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) { + set.retain(|x| !ports.contains(x)); + } + let moved_port_set: HashSet = ports.iter().copied().collect(); + if let ConnectorPhased::Communication(comm) = phased { + // Preserve invariant: batches only reason about native's ports. + // Remove batch puts/gets for moved ports. + for batch in comm.native_batches.iter_mut() { + batch.to_put.retain(|port, _| !moved_port_set.contains(port)); + batch.to_get.retain(|port| !moved_port_set.contains(port)); + } + } + cu.ips.port_info.owned.insert(new_cid, moved_port_set); + Ok(()) + } +} +impl Predicate { + #[inline] + pub fn singleton(k: SpecVar, v: SpecVal) -> Self { + Self::default().inserted(k, v) + } + #[inline] + pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { + self.assigned.insert(k, v); + self + } + + // Return true whether `self` is a subset of `maybe_superset` + pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { + for (var, val) in self.assigned.iter() { + match maybe_superset.assigned.get(var) { + Some(val2) if val2 == val => {} + _ => return false, // var unmapped, or mapped differently + } + } + // `maybe_superset` mirrored all my assignments! + true + } + + /// Given the two predicates {self, other}, return that whose + /// assignments are the union of those of both. + fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { + use AssignmentUnionResult as Aur; + // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. + let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; + let [mut s, mut o] = [s_it.next(), o_it.next()]; + // populate lists of assignments in self but not other and vice versa. + // do this by incrementally unfolding the iterators, keeping an eye + // on the ordering between the head elements [s, o]. + // whenever s break, // both iterators are empty + [None, Some(x)] => { + // self's iterator is empty. + // all remaning elements are in other but not self + o_not_s.push(x); + o_not_s.extend(o_it); + break; + } + [Some(x), None] => { + // other's iterator is empty. + // all remaning elements are in self but not other + s_not_o.push(x); + s_not_o.extend(s_it); + break; + } + [Some((sid, sb)), Some((oid, ob))] => { + if sid < oid { + // o is missing this element + s_not_o.push((sid, sb)); + s = s_it.next(); + } else if sid > oid { + // s is missing this element + o_not_s.push((oid, ob)); + o = o_it.next(); + } else if sb != ob { + assert_eq!(sid, oid); + // both predicates assign the variable but differ on the value + // No predicate exists which satisfies both! + return Aur::Nonexistant; + } else { + // both predicates assign the variable to the same value + s = s_it.next(); + o = o_it.next(); + } + } + } + } + // Observed zero inconsistencies. A unified predicate exists... + match [s_not_o.is_empty(), o_not_s.is_empty()] { + [true, true] => Aur::Equivalent, // ... equivalent to both. + [false, true] => Aur::FormerNotLatter, // ... equivalent to self. + [true, false] => Aur::LatterNotFormer, // ... equivalent to other. + [false, false] => { + // ... which is the union of the predicates' assignments but + // is equivalent to neither self nor other. + let mut new = self.clone(); + for (&id, &b) in o_not_s { + new.assigned.insert(id, b); + } + Aur::New(new) + } + } + } + + // Compute the union of the assignments of the two given predicates, if it exists. + // It doesn't exist if there is some value which the predicates assign to different values. + pub(crate) fn union_with(&self, other: &Self) -> Option { + let mut res = self.clone(); + for (&channel_id, &assignment_1) in other.assigned.iter() { + match res.assigned.insert(channel_id, assignment_1) { + Some(assignment_2) if assignment_1 != assignment_2 => return None, + _ => {} + } + } + Some(res) + } + pub(crate) fn query(&self, var: SpecVar) -> Option { + self.assigned.get(&var).copied() + } +} + +impl RoundCtx { + // remove an arbitrary buffered message, along with the ID of the getter who receives it + fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { + self.payload_inbox.pop() + } + + // buffer a message along with the ID of the getter who receives it + fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { + self.payload_inbox.push((getter, msg)); + } + + // buffer a message along with the ID of the putter who sent it + fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { + if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer { + log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); + self.getter_push(getter, msg); + } else { + log!(cu.logger(), "Putter {:?} has no known peer!", putter); + panic!("Putter {:?} has no known peer!", putter); + } + } +} + +impl Debug for VecSet { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_set().entries(self.vec.iter()).finish() + } +} +impl Debug for Predicate { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + struct Assignment<'a>((&'a SpecVar, &'a SpecVal)); + impl Debug for Assignment<'_> { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{:?}={:?}", (self.0).0, (self.0).1) + } + } + f.debug_set().entries(self.assigned.iter().map(Assignment)).finish() + } +} +impl IdParts for SpecVar { + fn id_parts(self) -> (ConnectorId, U32Suffix) { + self.0.id_parts() + } +} +impl Debug for SpecVar { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let (a, b) = self.id_parts(); + write!(f, "v{}_{}", a, b) + } +} +impl SpecVal { + const FIRING: Self = SpecVal(1); + const SILENT: Self = SpecVal(0); + fn is_firing(self) -> bool { + self == Self::FIRING + // all else treated as SILENT + } + fn iter_domain() -> impl Iterator { + (0..).map(SpecVal) + } +} +impl Debug for SpecVal { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + self.0.fmt(f) + } +} +impl Default for IoByteBuffer { + fn default() -> Self { + let mut byte_vec = Vec::with_capacity(Self::CAPACITY); + unsafe { + // safe! this vector is guaranteed to have sufficient capacity + byte_vec.set_len(Self::CAPACITY); + } + Self { byte_vec } + } +} +impl IoByteBuffer { + const CAPACITY: usize = u16::MAX as usize + 1000; + fn as_mut_slice(&mut self) -> &mut [u8] { + self.byte_vec.as_mut_slice() + } +} + +impl Debug for IoByteBuffer { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "IoByteBuffer") + } +} diff --git a/src/runtime/setup.rs b/src/runtime_old/setup.rs similarity index 100% rename from src/runtime/setup.rs rename to src/runtime_old/setup.rs diff --git a/src/runtime/tests.rs b/src/runtime_old/tests.rs similarity index 100% rename from src/runtime/tests.rs rename to src/runtime_old/tests.rs