From 14c1a07d148aebfae66cc35cff29da38a6dc9019 2020-07-22 15:32:26 From: Christopher Esterhuyse Date: 2020-07-22 15:32:26 Subject: [PATCH] pseudo-socket ffi WIP. made better FFI socketaddr struct for use in platform independent C. new reowolf.h accordingly. --- diff --git a/examples/8_net_ports/amy.c b/examples/8_net_ports/amy.c index e84cb43bd52576dc19753ef31f427b76c0250e51..593842508670db75018d3006bdc7950136ffcf2e 100644 --- a/examples/8_net_ports/amy.c +++ b/examples/8_net_ports/amy.c @@ -10,13 +10,11 @@ int main(int argc, char** argv) { printf("Error str `%s`\n", reowolf_error_peek(NULL)); PortId putter, getter; - char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port( - c, &putter, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active); printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port( - c, &getter, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Passive); + connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive); printf("Error str `%s`\n", reowolf_error_peek(NULL)); connector_connect(c, 4000); diff --git a/examples/9_net_self_putget/amy.c b/examples/9_net_self_putget/amy.c index ad7bc4af2d7244a8a062660d04399319eb193973..b4e784c246f6a20e06eab4bfd577f5f17a26cddf 100644 --- a/examples/9_net_self_putget/amy.c +++ b/examples/9_net_self_putget/amy.c @@ -10,12 +10,10 @@ int main(int argc, char** argv) { printf("Error str `%s`\n", reowolf_error_peek(NULL)); PortId putter, getter; - char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port( - c, &putter, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active); printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port( - c, 
&getter, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Passive); + connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive); printf("Error str `%s`\n", reowolf_error_peek(NULL)); connector_connect(c, 4000); diff --git a/examples/a_swap/amy.c b/examples/a_swap/amy.c deleted file mode 100644 index a409c74f4075c67fe413b6baa78e7831ee181b5c..0000000000000000000000000000000000000000 --- a/examples/a_swap/amy.c +++ /dev/null @@ -1,52 +0,0 @@ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - if(argc != 3) { - printf("Expected arg[1] and arg[2] for use as addr str\n"); - exit(1); - } - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - char logpath[] = "./a_amy_log.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - PortId ports[6]; - connector_add_port_pair(c, &ports[0], &ports[1]); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Polarity_Getter, EndpointPolarity_Passive); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Polarity_Putter, EndpointPolarity_Active); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_port_pair(c, &ports[4], &ports[5]); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - // native {0,1,2,3,4,5} - - connector_add_component(c, "together", 8, &ports[1], 4); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - // native {0,5} together {1,2,3,4} - - connector_connect(c, 4000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_put_bytes(c, ports[0], "hi", 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_get(c, ports[5]); - 
printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_sync(c, 1000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - printf("Got msg `%.*s`\n", msg_len, msg_ptr); - - protocol_description_destroy(pd); - connector_destroy(c); - return 0; -} \ No newline at end of file diff --git a/examples/a_swap/bob.c b/examples/a_swap/bob.c deleted file mode 100644 index 35658d2b917f8ebf9b5471f5068c7d576b627266..0000000000000000000000000000000000000000 --- a/examples/a_swap/bob.c +++ /dev/null @@ -1,52 +0,0 @@ -#include -#include -#include "../../reowolf.h" -#include "../utility.c" - -int main(int argc, char** argv) { - if(argc != 3) { - printf("Expected arg[1] and arg[2] for use as addr str\n"); - exit(1); - } - char * pdl_ptr = buffer_pdl("eg_protocols.pdl"); - size_t pdl_len = strlen(pdl_ptr); - Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len); - char logpath[] = "./a_bob_log.txt"; - Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - PortId ports[6]; - connector_add_port_pair(c, &ports[0], &ports[1]); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Polarity_Getter, EndpointPolarity_Passive); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Polarity_Putter, EndpointPolarity_Active); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_add_port_pair(c, &ports[4], &ports[5]); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - // native {0,1,2,3,4,5} - - connector_add_component(c, "together", 8, &ports[1], 4); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - // native {0,5} together {1,2,3,4} - - connector_connect(c, 4000); - 
printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_put_bytes(c, ports[0], "hi", 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - connector_get(c, ports[5]); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - connector_sync(c, 1000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - printf("Got msg `%.*s`\n", msg_len, msg_ptr); - - protocol_description_destroy(pd); - connector_destroy(c); - return 0; -} \ No newline at end of file diff --git a/examples/make.py b/examples/make.py index 40da5721455880d48e559995ce51e7f9875c29cb..b2623feab999a5556bb1605d39d473e3533c7cca 100644 --- a/examples/make.py +++ b/examples/make.py @@ -4,7 +4,7 @@ for c_file in glob.glob(script_path + "/*/*.c", recursive=False): print("compiling", c_file) args = [ "gcc", # compiler - "-std=c11" # C11 mode + "-std=c11", # C11 mode "-L", # lib path flag "./", # where to look for libs "-lreowolf_rs", # add lib called "reowolf_rs" diff --git a/examples/pres_1/amy.c b/examples/pres_1/amy.c index c49a14f216856ddbcaa991388b02c88cf466cb45..63ea181f18b816de47d8e53b6c884f1afed8eee8 100644 --- a/examples/pres_1/amy.c +++ b/examples/pres_1/amy.c @@ -16,9 +16,8 @@ int main(int argc, char** argv) { // ... with 1 outgoing network connection PortId p0; - char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port(c, &p0, addr_str, sizeof(addr_str)-1, - Polarity_Putter, EndpointPolarity_Passive); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &p0, addr, Polarity_Putter, EndpointPolarity_Passive); rw_err_peek(c); // Connect with peers (5000ms timeout). 
diff --git a/examples/pres_1/bob.c b/examples/pres_1/bob.c index dbaee3b8c850fa1d6ca9f96561091ba36a09280d..e1b2cb279fadb91b6cdeaef69acd5bbc1686b10e 100644 --- a/examples/pres_1/bob.c +++ b/examples/pres_1/bob.c @@ -12,9 +12,8 @@ int main(int argc, char** argv) { // ... with 1 outgoing network connection PortId p0; - char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port(c, &p0, addr_str, sizeof(addr_str)-1, - Polarity_Getter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &p0, addr, Polarity_Getter, EndpointPolarity_Active); rw_err_peek(c); // Connect with peers (5000ms timeout). diff --git a/examples/pres_2/bob.c b/examples/pres_2/bob.c index 686c3fa016698fdb9622eae856799bf0890a84d3..32028b18c7e015b5c88aaf4515282e0971405f48 100644 --- a/examples/pres_2/bob.c +++ b/examples/pres_2/bob.c @@ -13,9 +13,8 @@ int main(int argc, char** argv) { // ... with 1 outgoing network connection PortId ports[3]; - char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port(c, &ports[0], addr_str, sizeof(addr_str)-1, - Polarity_Getter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); connector_add_port_pair(c, &ports[1], &ports[2]); connector_add_component(c, "pres_2", 6, ports, 2); rw_err_peek(c); diff --git a/examples/pres_3/amy.c b/examples/pres_3/amy.c index 2585e1c9a36f14f75eba56ba722c5e1a4a82a84a..a8e7ecb3e80226cc37ad235af8db18649a0ef042 100644 --- a/examples/pres_3/amy.c +++ b/examples/pres_3/amy.c @@ -12,13 +12,11 @@ int main(int argc, char** argv) { // ... 
with 2 outgoing network connections PortId ports[2]; - char * addr = "127.0.0.1:8000"; - connector_add_net_port(c, &ports[0], addr, strlen(addr), - Polarity_Putter, EndpointPolarity_Passive); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &ports[0], addr, Polarity_Putter, EndpointPolarity_Passive); rw_err_peek(c); - addr = "127.0.0.1:8001"; - connector_add_net_port(c, &ports[1], addr, strlen(addr), - Polarity_Putter, EndpointPolarity_Passive); + addr.port = 8001; + connector_add_net_port(c, &ports[1], addr, Polarity_Putter, EndpointPolarity_Passive); rw_err_peek(c); // Connect with peers (5000ms timeout). diff --git a/examples/pres_3/bob.c b/examples/pres_3/bob.c index 4203808777b30fa8269c27b5836815f28a4b9d3b..78306e23095285c7338530b1fd26379c9f21182c 100644 --- a/examples/pres_3/bob.c +++ b/examples/pres_3/bob.c @@ -12,13 +12,11 @@ int main(int argc, char** argv) { // ... with 2 outgoing network connections PortId ports[2]; - char * addr = "127.0.0.1:8000"; - connector_add_net_port(c, &ports[0], addr, strlen(addr), - Polarity_Getter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); rw_err_peek(c); - addr = "127.0.0.1:8001"; - connector_add_net_port(c, &ports[1], addr, strlen(addr), - Polarity_Getter, EndpointPolarity_Active); + addr.port = 8001; + connector_add_net_port(c, &ports[1], addr, Polarity_Getter, EndpointPolarity_Active); rw_err_peek(c); // Connect with peers (5000ms timeout). diff --git a/examples/pres_4/bob.c b/examples/pres_4/bob.c index 3bfad00cabdb4434bcdb5b86fb34465c3c22fe5d..b4a68f6aa105657d190919587f15850e8e536fcd 100644 --- a/examples/pres_4/bob.c +++ b/examples/pres_4/bob.c @@ -12,13 +12,11 @@ int main(int argc, char** argv) { // ... 
with 2 outgoing network connections PortId ports[2]; - char * addr = "127.0.0.1:8000"; - connector_add_net_port(c, &ports[0], addr, strlen(addr), - Polarity_Getter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); rw_err_peek(c); - addr = "127.0.0.1:8001"; - connector_add_net_port(c, &ports[1], addr, strlen(addr), - Polarity_Getter, EndpointPolarity_Active); + addr.port = 8001; + connector_add_net_port(c, &ports[1], addr, Polarity_Getter, EndpointPolarity_Active); rw_err_peek(c); // Connect with peers (5000ms timeout). diff --git a/examples/pres_5/amy.c b/examples/pres_5/amy.c index 36ed98ce49f3dc39280f1450798bab22362f52fc..9267f73c15ea446c7f8601c011bd86cfa88aa268 100644 --- a/examples/pres_5/amy.c +++ b/examples/pres_5/amy.c @@ -12,13 +12,11 @@ int main(int argc, char** argv) { // ... with 2 outgoing network connections PortId ports[2]; - char * addr = "127.0.0.1:8000"; - connector_add_net_port(c, &ports[0], addr, strlen(addr), - Polarity_Putter, EndpointPolarity_Passive); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &ports[0], addr, Polarity_Putter, EndpointPolarity_Passive); rw_err_peek(c); - addr = "127.0.0.1:8001"; - connector_add_net_port(c, &ports[1], addr, strlen(addr), - Polarity_Putter, EndpointPolarity_Passive); + addr.port = 8001; + connector_add_net_port(c, &ports[1], addr, Polarity_Putter, EndpointPolarity_Passive); rw_err_peek(c); // Connect with peers (5000ms timeout). diff --git a/examples/pres_5/bob.c b/examples/pres_5/bob.c index 41d053143df4bb8c97590e6e0ec7123c9024deea..bfb72d1a090112190f48615cf1c4c455a3014e1e 100644 --- a/examples/pres_5/bob.c +++ b/examples/pres_5/bob.c @@ -13,13 +13,11 @@ int main(int argc, char** argv) { // ... 
with 2 outgoing network connections PortId ports[4]; - char * addr = "127.0.0.1:8000"; - connector_add_net_port(c, &ports[0], addr, strlen(addr), - Polarity_Getter, EndpointPolarity_Active); + FfiSocketAddr addr = {{127,0,0,1}, 8000}; + connector_add_net_port(c, &ports[0], addr, Polarity_Getter, EndpointPolarity_Active); rw_err_peek(c); - addr = "127.0.0.1:8001"; - connector_add_net_port(c, &ports[1], addr, strlen(addr), - Polarity_Getter, EndpointPolarity_Active); + addr.port = 8001; + connector_add_net_port(c, &ports[1], addr, Polarity_Getter, EndpointPolarity_Active); connector_add_port_pair(c, &ports[2], &ports[3]); connector_add_component(c, "alt_round_merger", 16, ports, 3); rw_err_peek(c); diff --git a/reowolf.h b/reowolf.h index 5d353ec55e30b0449e23ad1f4d403172c629fe95..5646ed438f5a698d1bb43ae9e8f9588e2105795e 100644 --- a/reowolf.h +++ b/reowolf.h @@ -8,146 +8,21 @@ #include #include -typedef enum { - EndpointPolarity_Active, - EndpointPolarity_Passive, -} EndpointPolarity; - -typedef enum { - Polarity_Putter, - Polarity_Getter, -} Polarity; - -typedef struct Arc_ProtocolDescription Arc_ProtocolDescription; - -typedef struct Connector Connector; - -typedef int32_t ErrorCode; +#define BAD_FD -5 -typedef uint32_t ConnectorId; +#define CC_MAP_LOCK_POISONED -3 -typedef uint32_t PortSuffix; +#define CLOSE_FAIL -4 -typedef struct { - ConnectorId connector_id; - PortSuffix u32_suffix; -} PortId; +#define CONNECT_FAILED -6 -/** - * Given - * - an initialized connector in setup or connecting state, - * - a string slice for the component's identifier in the connector's configured protocol description, - * - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface, - * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C. - * Usable in {setup, communication} states. 
- */ -ErrorCode connector_add_component(Connector *connector, - const uint8_t *ident_ptr, - uintptr_t ident_len, - const PortId *ports_ptr, - uintptr_t ports_len); +#define ERR_OK 0 -/** - * Given - * - an initialized connector in setup or connecting state, - * - a utf-8 encoded socket address, - * - the logical polarity of P, - * - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered, - * returns P, a port newly added to the native interface. - */ -ErrorCode connector_add_net_port(Connector *connector, - PortId *port, - const uint8_t *addr_str_ptr, - uintptr_t addr_str_len, - Polarity port_polarity, - EndpointPolarity endpoint_polarity); +#define ERR_REOWOLF -1 -/** - * Given an initialized connector in setup or connecting state, - * - Creates a new directed port pair with logical channel putter->getter, - * - adds the ports to the native component's interface, - * - and returns them using the given out pointers. - * Usable in {setup, communication} states. - */ -void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *out_getter); +#define WOULD_BLOCK -7 -/** - * Connects this connector to the distributed system of connectors reachable through endpoints, - * Usable in setup state, and changes the state to communication. - */ -ErrorCode connector_connect(Connector *connector, int64_t timeout_millis); - -/** - * Destroys the given a pointer to the connector on the heap, freeing its resources. - * Usable in {setup, communication} states. - */ -void connector_destroy(Connector *connector); - -ErrorCode connector_get(Connector *connector, PortId port); - -const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *out_len); - -/** - * Initializes `out` with a new connector using the given protocol description as its configuration. - * The connector uses the given (internal) connector ID. 
- */ -Connector *connector_new(const Arc_ProtocolDescription *pd); - -Connector *connector_new_logging(const Arc_ProtocolDescription *pd, - const uint8_t *path_ptr, - uintptr_t path_len); - -intptr_t connector_next_batch(Connector *connector); - -void connector_print_debug(Connector *connector); - -/** - * Convenience function combining the functionalities of - * "payload_new" with "connector_put_payload". - */ -ErrorCode connector_put_bytes(Connector *connector, - PortId port, - const uint8_t *bytes_ptr, - uintptr_t bytes_len); - -intptr_t connector_sync(Connector *connector, int64_t timeout_millis); - -/** - * Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed. - */ -Arc_ProtocolDescription *protocol_description_clone(const Arc_ProtocolDescription *pd); - -/** - * Destroys the given initialized protocol description and frees its resources. - */ -void protocol_description_destroy(Arc_ProtocolDescription *pd); - -/** - * Parses the utf8-encoded string slice to initialize a new protocol description object. - * - On success, initializes `out` and returns 0 - * - On failure, stores an error string (see `reowolf_error_peek`) and returns -1 - */ -Arc_ProtocolDescription *protocol_description_parse(const uint8_t *pdl, uintptr_t pdl_len); - -/** - * Returns length (via out pointer) and pointer (via return value) of the last Reowolf error. - * - pointer is NULL iff there was no last error - * - data at pointer is null-delimited - * - len does NOT include the length of the null-delimiter - * If len is NULL, it will not written to. 
- */ -const uint8_t *reowolf_error_peek(uintptr_t *len); - -#endif /* REOWOLF_HEADER_DEFINED */ -/* CBindgen generated */ - -#ifndef REOWOLF_HEADER_DEFINED -#define REOWOLF_HEADER_DEFINED - -#include -#include -#include -#include +#define WRONG_STATE -2 typedef enum { EndpointPolarity_Active, @@ -163,336 +38,21 @@ typedef struct Arc_ProtocolDescription Arc_ProtocolDescription; typedef struct Connector Connector; -typedef int ErrorCode; - typedef uint32_t ConnectorId; -typedef uint32_t PortSuffix; +typedef uint32_t U32Suffix; typedef struct { ConnectorId connector_id; - PortSuffix u32_suffix; + U32Suffix u32_suffix; } Id; typedef Id PortId; -/** - * Given - * - an initialized connector in setup or connecting state, - * - a string slice for the component's identifier in the connector's configured protocol description, - * - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface, - * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C. - * Usable in {setup, communication} states. - */ -ErrorCode connector_add_component(Connector *connector, - const uint8_t *ident_ptr, - uintptr_t ident_len, - const PortId *ports_ptr, - uintptr_t ports_len); - -/** - * Given - * - an initialized connector in setup or connecting state, - * - a utf-8 encoded socket address, - * - the logical polarity of P, - * - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered, - * returns P, a port newly added to the native interface. 
- */ -ErrorCode connector_add_net_port(Connector *connector, - PortId *port, - const uint8_t *addr_str_ptr, - uintptr_t addr_str_len, - Polarity port_polarity, - EndpointPolarity endpoint_polarity); - -/** - * Given an initialized connector in setup or connecting state, - * - Creates a new directed port pair with logical channel putter->getter, - * - adds the ports to the native component's interface, - * - and returns them using the given out pointers. - * Usable in {setup, communication} states. - */ -void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *out_getter); - -/** - * Given - * - an initialized connector in setup or connecting state, - * - a utf-8 encoded BIND socket addresses (i.e., "local"), - * - a utf-8 encoded CONNECT socket addresses (i.e., "peer"), - * returns [P, G] via out pointers [putter, getter], - * - where P is a Putter port that sends messages into the socket - * - where G is a Getter port that recvs messages from the socket - */ -ErrorCode connector_add_udp_port(Connector *connector, - PortId *putter[2], - const uint8_t *local_addr_str_ptr, - uintptr_t local_addr_str_len, - const uint8_t *peer_addr_str_ptr, - uintptr_t peer_addr_str_len); - -/** - * Connects this connector to the distributed system of connectors reachable through endpoints, - * Usable in setup state, and changes the state to communication. - */ -ErrorCode connector_connect(Connector *connector, int64_t timeout_millis); - -/** - * Destroys the given a pointer to the connector on the heap, freeing its resources. - * Usable in {setup, communication} states. - */ -void connector_destroy(Connector *connector); - -ErrorCode connector_get(Connector *connector, PortId port); - -const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *out_len); - -/** - * Initializes `out` with a new connector using the given protocol description as its configuration. - * The connector uses the given (internal) connector ID. 
- */ -Connector *connector_new(const Arc_ProtocolDescription *pd); - -Connector *connector_new_logging(const Arc_ProtocolDescription *pd, - const uint8_t *path_ptr, - uintptr_t path_len); - -intptr_t connector_next_batch(Connector *connector); - -void connector_print_debug(Connector *connector); - -/** - * Convenience function combining the functionalities of - * "payload_new" with "connector_put_payload". - */ -ErrorCode connector_put_bytes(Connector *connector, - PortId port, - const uint8_t *bytes_ptr, - uintptr_t bytes_len); - -intptr_t connector_sync(Connector *connector, int64_t timeout_millis); - -/** - * Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed. - */ -Arc_ProtocolDescription *protocol_description_clone(const Arc_ProtocolDescription *pd); - -/** - * Destroys the given initialized protocol description and frees its resources. - */ -void protocol_description_destroy(Arc_ProtocolDescription *pd); - -/** - * Parses the utf8-encoded string slice to initialize a new protocol description object. - * - On success, initializes `out` and returns 0 - * - On failure, stores an error string (see `reowolf_error_peek`) and returns -1 - */ -Arc_ProtocolDescription *protocol_description_parse(const uint8_t *pdl, uintptr_t pdl_len); - -/** - * Returns length (via out pointer) and pointer (via return value) of the last Reowolf error. - * - pointer is NULL iff there was no last error - * - data at pointer is null-delimited - * - len does NOT include the length of the null-delimiter - * If len is NULL, it will not written to. 
- */ -const uint8_t *reowolf_error_peek(uintptr_t *len); - -#endif /* REOWOLF_HEADER_DEFINED */ -/* CBindgen generated */ - -#ifndef REOWOLF_HEADER_DEFINED -#define REOWOLF_HEADER_DEFINED - -#include -#include -#include -#include - -typedef enum { - EndpointPolarity_Active, - EndpointPolarity_Passive, -} EndpointPolarity; - -typedef enum { - Polarity_Putter, - Polarity_Getter, -} Polarity; - -typedef struct Arc_ProtocolDescription Arc_ProtocolDescription; - -typedef struct Connector Connector; - -typedef int32_t ErrorCode; - -typedef uint32_t ConnectorId; - -typedef uint32_t PortSuffix; - -typedef struct { - ConnectorId connector_id; - PortSuffix u32_suffix; -} Id; - -typedef Id PortId; - -/** - * Given - * - an initialized connector in setup or connecting state, - * - a string slice for the component's identifier in the connector's configured protocol description, - * - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface, - * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C. - * Usable in {setup, communication} states. - */ -ErrorCode connector_add_component(Connector *connector, - const uint8_t *ident_ptr, - uintptr_t ident_len, - const PortId *ports_ptr, - uintptr_t ports_len); - -/** - * Given - * - an initialized connector in setup or connecting state, - * - a utf-8 encoded socket address, - * - the logical polarity of P, - * - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered, - * returns P, a port newly added to the native interface. 
- */ -ErrorCode connector_add_net_port(Connector *connector, - PortId *port, - const uint8_t *addr_str_ptr, - uintptr_t addr_str_len, - Polarity port_polarity, - EndpointPolarity endpoint_polarity); - -/** - * Given an initialized connector in setup or connecting state, - * - Creates a new directed port pair with logical channel putter->getter, - * - adds the ports to the native component's interface, - * - and returns them using the given out pointers. - * Usable in {setup, communication} states. - */ -void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *out_getter); - -/** - * Given - * - an initialized connector in setup or connecting state, - * - a utf-8 encoded BIND socket addresses (i.e., "local"), - * - a utf-8 encoded CONNECT socket addresses (i.e., "peer"), - * returns [P, G] via out pointers [putter, getter], - * - where P is a Putter port that sends messages into the socket - * - where G is a Getter port that recvs messages from the socket - */ -ErrorCode connector_add_udp_port(Connector *connector, - PortId (*putter)[2], - const uint8_t *local_addr_str_ptr, - uintptr_t local_addr_str_len, - const uint8_t *peer_addr_str_ptr, - uintptr_t peer_addr_str_len); - -/** - * Connects this connector to the distributed system of connectors reachable through endpoints, - * Usable in setup state, and changes the state to communication. - */ -ErrorCode connector_connect(Connector *connector, int64_t timeout_millis); - -/** - * Destroys the given a pointer to the connector on the heap, freeing its resources. - * Usable in {setup, communication} states. - */ -void connector_destroy(Connector *connector); - -ErrorCode connector_get(Connector *connector, PortId port); - -const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *out_len); - -/** - * Initializes `out` with a new connector using the given protocol description as its configuration. - * The connector uses the given (internal) connector ID. 
- */ -Connector *connector_new(const Arc_ProtocolDescription *pd); - -Connector *connector_new_logging(const Arc_ProtocolDescription *pd, - const uint8_t *path_ptr, - uintptr_t path_len); - -intptr_t connector_next_batch(Connector *connector); - -void connector_print_debug(Connector *connector); - -/** - * Convenience function combining the functionalities of - * "payload_new" with "connector_put_payload". - */ -ErrorCode connector_put_bytes(Connector *connector, - PortId port, - const uint8_t *bytes_ptr, - uintptr_t bytes_len); - -intptr_t connector_sync(Connector *connector, int64_t timeout_millis); - -/** - * Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed. - */ -Arc_ProtocolDescription *protocol_description_clone(const Arc_ProtocolDescription *pd); - -/** - * Destroys the given initialized protocol description and frees its resources. - */ -void protocol_description_destroy(Arc_ProtocolDescription *pd); - -/** - * Parses the utf8-encoded string slice to initialize a new protocol description object. - * - On success, initializes `out` and returns 0 - * - On failure, stores an error string (see `reowolf_error_peek`) and returns -1 - */ -Arc_ProtocolDescription *protocol_description_parse(const uint8_t *pdl, uintptr_t pdl_len); - -/** - * Returns length (via out pointer) and pointer (via return value) of the last Reowolf error. - * - pointer is NULL iff there was no last error - * - data at pointer is null-delimited - * - len does NOT include the length of the null-delimiter - * If len is NULL, it will not written to. 
- */ -const uint8_t *reowolf_error_peek(uintptr_t *len); - -#endif /* REOWOLF_HEADER_DEFINED */ -/* CBindgen generated */ - -#ifndef REOWOLF_HEADER_DEFINED -#define REOWOLF_HEADER_DEFINED - -#include -#include -#include -#include - -typedef enum { - EndpointPolarity_Active, - EndpointPolarity_Passive, -} EndpointPolarity; - -typedef enum { - Polarity_Putter, - Polarity_Getter, -} Polarity; - -typedef struct Arc_ProtocolDescription Arc_ProtocolDescription; - -typedef struct Connector Connector; - -typedef int ErrorCode; - -typedef uint32_t ConnectorId; - -typedef uint32_t PortSuffix; - typedef struct { - ConnectorId connector_id; - PortSuffix u32_suffix; -} Id; - -typedef Id PortId; + uint8_t ipv4[4]; + uint16_t port; +} FfiSocketAddr; /** * Given @@ -502,11 +62,11 @@ typedef Id PortId; * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C. * Usable in {setup, communication} states. */ -ErrorCode connector_add_component(Connector *connector, - const uint8_t *ident_ptr, - uintptr_t ident_len, - const PortId *ports_ptr, - uintptr_t ports_len); +int connector_add_component(Connector *connector, + const uint8_t *ident_ptr, + uintptr_t ident_len, + const PortId *ports_ptr, + uintptr_t ports_len); /** * Given @@ -516,12 +76,11 @@ ErrorCode connector_add_component(Connector *connector, * - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered, * returns P, a port newly added to the native interface. 
*/ -ErrorCode connector_add_net_port(Connector *connector, - PortId *port, - const uint8_t *addr_str_ptr, - uintptr_t addr_str_len, - Polarity port_polarity, - EndpointPolarity endpoint_polarity); +int connector_add_net_port(Connector *connector, + PortId *port, + FfiSocketAddr addr, + Polarity port_polarity, + EndpointPolarity endpoint_polarity); /** * Given an initialized connector in setup or connecting state, @@ -541,19 +100,17 @@ void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *o * - where P is a Putter port that sends messages into the socket * - where G is a Getter port that recvs messages from the socket */ -ErrorCode connector_add_udp_port(Connector *connector, - PortId *putter, - PortId *getter, - const uint8_t *local_addr_str_ptr, - uintptr_t local_addr_str_len, - const uint8_t *peer_addr_str_ptr, - uintptr_t peer_addr_str_len); +int connector_add_udp_port_pair(Connector *connector, + PortId *putter, + PortId *getter, + FfiSocketAddr local_addr, + FfiSocketAddr peer_addr); /** * Connects this connector to the distributed system of connectors reachable through endpoints, * Usable in setup state, and changes the state to communication. */ -ErrorCode connector_connect(Connector *connector, int64_t timeout_millis); +int connector_connect(Connector *connector, int64_t timeout_millis); /** * Destroys the given a pointer to the connector on the heap, freeing its resources. @@ -561,7 +118,7 @@ ErrorCode connector_connect(Connector *connector, int64_t timeout_millis); */ void connector_destroy(Connector *connector); -ErrorCode connector_get(Connector *connector, PortId port); +int connector_get(Connector *connector, PortId port); const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *out_len); @@ -583,10 +140,10 @@ void connector_print_debug(Connector *connector); * Convenience function combining the functionalities of * "payload_new" with "connector_put_payload". 
*/ -ErrorCode connector_put_bytes(Connector *connector, - PortId port, - const uint8_t *bytes_ptr, - uintptr_t bytes_len); +int connector_put_bytes(Connector *connector, + PortId port, + const uint8_t *bytes_ptr, + uintptr_t bytes_len); intptr_t connector_sync(Connector *connector, int64_t timeout_millis); diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index e2537e70ec313febddeab3fcf730add026cfc155..4e73ff40eed1c71c33681377f3708b34eb2067c2 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,8 +3,21 @@ use core::{cell::RefCell, convert::TryFrom}; use std::os::raw::c_int; use std::slice::from_raw_parts as slice_from_raw_parts; -#[cfg(feature = "ffi_socket_api")] -pub mod socket_api; +// #[cfg(feature = "ffi_pseudo_socket_api")] +// pub mod pseudo_socket_api; + +// Temporary simplification: ignore ipv6. To revert, just refactor this structure and its usages +#[repr(C)] +pub struct FfiSocketAddr { + pub ipv4: [u8; 4], + pub port: u16, +} +impl Into<SocketAddr> for FfiSocketAddr { + fn into(self) -> SocketAddr { + (self.ipv4, self.port).into() + } +} + /////////////////////////////////////////////// #[derive(Default)] struct StoredError { @@ -81,10 +94,11 @@ unsafe fn tl_socketaddr_from_raw( pub const ERR_OK: c_int = 0; pub const ERR_REOWOLF: c_int = -1; pub const WRONG_STATE: c_int = -2; -pub const FD_LOCK_POISONED: c_int = -3; +pub const CC_MAP_LOCK_POISONED: c_int = -3; pub const CLOSE_FAIL: c_int = -4; pub const BAD_FD: c_int = -5; pub const CONNECT_FAILED: c_int = -6; +pub const WOULD_BLOCK: c_int = -7; ///////////////////// REOWOLF ////////////////////////// @@ -240,17 +254,12 @@ pub unsafe extern "C" fn connector_add_component( pub unsafe extern "C" fn connector_add_net_port( connector: &mut Connector, port: *mut PortId, - addr_str_ptr: *const u8, - addr_str_len: usize, + addr: FfiSocketAddr, port_polarity: Polarity, endpoint_polarity: EndpointPolarity, ) -> c_int { StoredError::tl_clear(); - let addr = match tl_socketaddr_from_raw(addr_str_ptr, addr_str_len) { - Ok(local)
=> local, - Err(errcode) => return errcode, - }; - match connector.new_net_port(port_polarity, addr, endpoint_polarity) { + match connector.new_net_port(port_polarity, addr.into(), endpoint_polarity) { Ok(p) => { if !port.is_null() { port.write(p); @@ -276,21 +285,11 @@ pub unsafe extern "C" fn connector_add_udp_port_pair( connector: &mut Connector, putter: *mut PortId, getter: *mut PortId, - local_addr_str_ptr: *const u8, - local_addr_str_len: usize, - peer_addr_str_ptr: *const u8, - peer_addr_str_len: usize, + local_addr: FfiSocketAddr, + peer_addr: FfiSocketAddr, ) -> c_int { StoredError::tl_clear(); - let local = match tl_socketaddr_from_raw(local_addr_str_ptr, local_addr_str_len) { - Ok(local) => local, - Err(errcode) => return errcode, - }; - let peer = match tl_socketaddr_from_raw(peer_addr_str_ptr, peer_addr_str_len) { - Ok(local) => local, - Err(errcode) => return errcode, - }; - match connector.new_udp_mediator_component(local, peer) { + match connector.new_udp_mediator_component(local_addr.into(), peer_addr.into()) { Ok([p, g]) => { if !putter.is_null() { putter.write(p); diff --git a/src/ffi/socket_api.rs b/src/ffi/pseudo_socket_api.rs similarity index 65% rename from src/ffi/socket_api.rs rename to src/ffi/pseudo_socket_api.rs index 6d56d8e4dfd74dba4de74501422e561f8923d22d..1f0c2420325a69e3589d0480e4727376bb17857b 100644 --- a/src/ffi/socket_api.rs +++ b/src/ffi/pseudo_socket_api.rs @@ -15,10 +15,11 @@ struct FdAllocator { } struct ConnectorBound { connector: Connector, + is_nonblocking: bool, putter: PortId, getter: PortId, } -struct MaybeConnector { +struct ConnectorComplex { // invariants: // 1. connector is a upd-socket singleton // 2. 
putter and getter are ports in the native interface with the appropriate polarities @@ -27,8 +28,8 @@ struct MaybeConnector { connector_bound: Option<ConnectorBound>, } #[derive(Default)] -struct FdcStorage { - fd_to_c: HashMap<c_int, RwLock<MaybeConnector>>, +struct CcMap { + fd_to_cc: HashMap<c_int, RwLock<ConnectorComplex>>, fd_allocator: FdAllocator, } fn trivial_peer_addr() -> SocketAddr { @@ -61,9 +62,9 @@ impl FdAllocator { } } lazy_static::lazy_static! { - static ref FDC_STORAGE: RwLock<FdcStorage> = Default::default(); + static ref CC_MAP: RwLock<CcMap> = Default::default(); } -impl MaybeConnector { +impl ConnectorComplex { fn connect(&mut self, peer_addr: SocketAddr) -> c_int { self.peer_addr = peer_addr; if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound { @@ -105,23 +106,24 @@ impl MaybeConnector { } } } + /////////////////////////////////////////////////////////////////// #[no_mangle] pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { // ignoring domain and type - let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED }; + let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; let fd = w.fd_allocator.alloc(); - let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None }; - w.fd_to_c.insert(fd, RwLock::new(mc)); + let cc = ConnectorComplex { peer_addr: trivial_peer_addr(), connector_bound: None }; + w.fd_to_cc.insert(fd, RwLock::new(cc)); fd } #[no_mangle] pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { // ignoring HOW - let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED }; - if w.fd_to_c.remove(&fd).is_some() { + let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; + if w.fd_to_cc.remove(&fd).is_some() { w.fd_allocator.free(fd); ERR_OK } else { @@ -136,22 +138,22 @@ pub unsafe extern "C" fn rw_bind( _addr_len: usize, ) -> c_int { // assuming _domain is AF_INET and _type is SOCK_DGRAM - let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return
FD_LOCK_POISONED }; - let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD }; - let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED }; - let mc: &mut MaybeConnector = &mut mc; - if mc.connector_bound.is_some() { + let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED }; + let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED }; + let cc: &mut ConnectorComplex = &mut cc; + if cc.connector_bound.is_some() { return WRONG_STATE; } - mc.connector_bound = { + cc.connector_bound = { let mut connector = Connector::new( Box::new(crate::DummyLogger), crate::TRIVIAL_PD.clone(), Connector::random_id(), ); let [putter, getter] = - connector.new_udp_mediator_component(local_addr.read(), mc.peer_addr).unwrap(); - Some(ConnectorBound { connector, putter, getter }) + connector.new_udp_mediator_component(local_addr.read(), cc.peer_addr).unwrap(); + Some(ConnectorBound { connector, putter, getter, is_nonblocking: false }) }; ERR_OK } @@ -163,11 +165,11 @@ pub unsafe extern "C" fn rw_connect( _address_len: usize, ) -> c_int { // assuming _domain is AF_INET and _type is SOCK_DGRAM - let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED }; - let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD }; - let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED }; - let mc: &mut MaybeConnector = &mut mc; - mc.connect(peer_addr.read()) + let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED }; + let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED }; + let cc: &mut ConnectorComplex = &mut cc; + cc.connect(peer_addr.read()) } #[no_mangle] @@ -178,11 +180,11 @@ pub unsafe extern "C" fn rw_send( _flags: c_int, ) -> isize { // 
ignoring flags - let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; - let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; - let mc: &mut MaybeConnector = &mut mc; - mc.send(bytes_ptr, bytes_len) + let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; + let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; + let cc: &mut ConnectorComplex = &mut cc; + cc.send(bytes_ptr, bytes_len) } #[no_mangle] @@ -193,11 +195,11 @@ pub unsafe extern "C" fn rw_recv( _flags: c_int, ) -> isize { // ignoring flags - let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; - let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; - let mc: &mut MaybeConnector = &mut mc; - mc.recv(bytes_ptr, bytes_len) + let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; + let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; + let cc: &mut ConnectorComplex = &mut cc; + cc.recv(bytes_ptr, bytes_len) } #[no_mangle] @@ -209,21 +211,21 @@ pub unsafe extern "C" fn rw_sendto( peer_addr: *const SocketAddr, _addr_len: usize, ) -> isize { - let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; - let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; - let mc: &mut MaybeConnector = &mut mc; + let r = if let Ok(r) 
= CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; + let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; + let cc: &mut ConnectorComplex = &mut cc; // copy currently connected peer addr - let connected = mc.peer_addr; + let connected = cc.peer_addr; // connect to given peer_addr - match mc.connect(peer_addr.read()) { + match cc.connect(peer_addr.read()) { e if e != ERR_OK => return e as isize, _ => {} } // send - let ret = mc.send(bytes_ptr, bytes_len); + let ret = cc.send(bytes_ptr, bytes_len); // restore connected peer addr - match mc.connect(connected) { + match cc.connect(connected) { e if e != ERR_OK => return e as isize, _ => {} } @@ -240,21 +242,21 @@ pub unsafe extern "C" fn rw_recvfrom( peer_addr: *const SocketAddr, _addr_len: usize, ) -> isize { - let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; - let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; - let mc: &mut MaybeConnector = &mut mc; + let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; + let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; + let cc: &mut ConnectorComplex = &mut cc; // copy currently connected peer addr - let connected = mc.peer_addr; + let connected = cc.peer_addr; // connect to given peer_addr - match mc.connect(peer_addr.read()) { + match cc.connect(peer_addr.read()) { e if e != ERR_OK => return e as isize, _ => {} } // send - let ret = mc.send(bytes_ptr, bytes_len); + let ret = cc.send(bytes_ptr, bytes_len); // restore connected peer addr - match mc.connect(connected) { + match cc.connect(connected) { e 
if e != ERR_OK => return e as isize, _ => {} } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index cbda5c50e76d2932dfa6c187ce5aafa48f2a84d1..83e5ede6e7378bc849393b85fe82ca92a76cf8cf 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -364,8 +364,7 @@ impl Connector { // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); - comm.endpoint_manager - .udp_endpoints_round_start(&mut *cu.inner.logger, &mut rctx.spec_var_stream); + comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.inner.logger); // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 50dab213add6688c1e367ef99fa2954af8f2c65d..f3149c2a91f75540634698df841a26d35172af13 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -135,7 +135,7 @@ impl EndpointManager { return Ok(tup); } // poll if time remains - self.poll_and_polulate(logger, deadline)?; + self.poll_and_populate(logger, deadline)?; } } @@ -237,26 +237,26 @@ impl EndpointManager { if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { // I received a payload! 
self.udp_endpoint_store.polled_undrained.insert(index); - let payload = Payload::from(&recv_buffer[..bytes_written]); - let [branch_spec_var, port_spec_var] = [ - ee.incoming_round_spec_var.unwrap(), // should not be NONE - port_info.spec_var_for(ee.getter_for_incoming), - ]; - let branch_spec_val = SpecVal::nth_domain_element(ee.incoming_payloads.len()); - ee.incoming_payloads.push(payload.clone()); - let predicate = Predicate::default() - .inserted(branch_spec_var, branch_spec_val) - .inserted(port_spec_var, SpecVal::FIRING); - round_ctx - .getter_add(ee.getter_for_incoming, SendPayloadMsg { payload, predicate }); - some_message_enqueued = true; + if !ee.received_this_round { + let payload = Payload::from(&recv_buffer[..bytes_written]); + let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming); + let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); + round_ctx.getter_add( + ee.getter_for_incoming, + SendPayloadMsg { payload, predicate }, + ); + some_message_enqueued = true; + ee.received_this_round = true; + } else { + // lose the message! 
+ } } } if some_message_enqueued { return Ok(CommRecvOk::NewPayloadMsgs); } // poll if time remains - match self.poll_and_polulate(logger, round_ctx.get_deadline()) { + match self.poll_and_populate(logger, round_ctx.get_deadline()) { Ok(()) => {} // continue looping Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew), Err(Pape::PollFailed) => return Err(Use::PollFailed), @@ -283,7 +283,7 @@ impl EndpointManager { } Ok(None) } - fn poll_and_polulate( + fn poll_and_populate( &mut self, logger: &mut dyn Logger, deadline: &Option, @@ -336,20 +336,14 @@ impl EndpointManager { // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } - pub(super) fn udp_endpoints_round_start( - &mut self, - logger: &mut dyn Logger, - spec_var_stream: &mut SpecVarStream, - ) { + pub(super) fn udp_endpoints_round_start(&mut self, logger: &mut dyn Logger) { log!( logger, "Starting round for {} udp endpoints", self.udp_endpoint_store.endpoint_exts.len() ); - for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { - let spec_var = spec_var_stream.next(); - log!(logger, "Udp endpoint given {} spec var {:?} for this round", index, spec_var); - ee.incoming_round_spec_var = Some(spec_var); + for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() { + ee.received_this_round = false; } } pub(super) fn udp_endpoints_round_end( @@ -367,7 +361,6 @@ impl EndpointManager { 'endpoint_loop: for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { - ee.incoming_round_spec_var = None; // shouldn't be accessed before its overwritten next round; still adding for clarity. 
for (payload_predicate, payload) in ee.outgoing_payloads.drain() { if payload_predicate.assigns_subset(solution_predicate) { ee.sock.send(payload.as_slice()).map_err(|e| { @@ -390,26 +383,6 @@ impl EndpointManager { Ok(()) } } -// impl UdpEndpointExt { -// fn try_recv( -// &mut self, -// port_info: &PortInfo, -// udp_in_buffer: &mut UdpInBuffer, -// ) -> Option { -// let recv_buffer = udp_in_buffer.as_mut_slice(); -// let len = self.sock.recv(recv_buffer).ok()?; -// let payload = Payload::from(&recv_buffer[..len]); -// let branch_spec_var = self -// .incoming_round_spec_var -// .expect("Udp spec var should be Some(..) if recv() is called"); -// let branch_spec_val = SpecVal::nth_domain_element(self.incoming_payloads.len()); -// self.incoming_payloads.push(payload.clone()); -// let predicate = Predicate::default() -// .inserted(branch_spec_var, branch_spec_val) -// .inserted(port_info.spec_var_for(self.getter_for_incoming), SpecVal::FIRING); -// Some(SendPayloadMsg { payload, predicate }) -// } -// } impl Debug for NetEndpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index a092d43973bab8c7e86b93ef52bfcb0ac67c3954..70d2abeb9c473ae97ea2adc2291709d10f620a06 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -209,10 +209,9 @@ struct EndpointStore { #[derive(Debug)] struct UdpEndpointExt { sock: UdpSocket, // already bound and connected + received_this_round: bool, outgoing_payloads: HashMap, - incoming_round_spec_var: Option, getter_for_incoming: PortId, - incoming_payloads: Vec, } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] struct PortInfo { @@ -456,6 +455,10 @@ impl Connector { } } impl Predicate { + #[inline] + pub fn singleton(k: SpecVar, v: SpecVal) -> Self { + Self::default().inserted(k, v) + } #[inline] pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { self.assigned.insert(k, v); diff 
--git a/src/runtime/setup.rs b/src/runtime/setup.rs index b3967f13704811370ed5a7dc8dd174a962cd7228..476bdb1ca51f259bf91cbcf905ff6edcc70109d4 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -511,9 +511,8 @@ fn new_endpoint_manager( UdpEndpointExt { sock, outgoing_payloads: Default::default(), - incoming_round_spec_var: None, + received_this_round: false, getter_for_incoming, - incoming_payloads: Default::default(), } }) .collect();