Changeset - 9ed6d091a817
[Not reviewed]
0 6 17
Christopher Esterhuyse - 5 years ago 2020-10-01 17:47:10
christopher.esterhuyse@gmail.com
more benchmarks and some more error cases
23 files changed with 359 insertions and 17 deletions:
0 comments (0 inline, 0 general)
examples/bench_11/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j, forwards, num_options, correct_index;
 
	forwards = atoi(argv[1]);
 
	num_options = atoi(argv[2]);
 
	correct_index = atoi(argv[3]);
 
	printf("forwards %d, num_options %d, correct_index %d\n",
 
		forwards, num_options, correct_index);
 
	printf("forwards %d, num_options %d\n",
 
		forwards, num_options);
 
	unsigned char pdl[] = 
 
	"primitive recv_zero(in a) {  "
 
	"    while(true) synchronous {"
 
	"        msg m = get(a);      "
 
	"        assert(m[0] == 0);   "
 
	"    }                        "
 
	"}                            "
 
	; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_11.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		// create a forward to tail of chain
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		// thread a forward component onto native_tail
 
		char ident[] = "forward";
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	// add "recv_zero" on end of chain
 
	char ident[] = "recv_zero";
 
	connector_add_component(c, ident, sizeof(ident)-1, &native_getter, 1);
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	char msg = 0;
 
	for (i=0; i<10000; i++) {
 
	for (i=0; i<1000; i++) {
 
		correct_index = i%num_options;
 
		for(j=0; j<num_options; j++) {
 
			msg = j==correct_index ? 0 : 1;
 
			connector_put_bytes(c, native_putter, &msg, 1);
 
			if(j+1 < num_options) {
 
				connector_next_batch(c);
 
			}
 
		}	
 
		connector_sync(c, -1);	
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_12/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j, batches;
 
	batches = atoi(argv[1]);
 
	printf("batches %d\n", batches);
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_12.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	char msg = 0;
 
	for (i=0; i<1000000; i++) {
 
		for(j=1; j<batches; j++) {
 
			connector_next_batch(c);
 
		}
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_13/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, msglen, inside, total;
 
	char * transport;
 
	transport = argv[1];
 
	msglen = atoi(argv[2]);
 
	inside = atoi(argv[3]);
 
	total = atoi(argv[4]);
 
	printf("transport `%s`, msglen %d, inside %d, total %d\n",
 
		transport, msglen, inside, total);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_13.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	char ident[] = "sync"; // defined in reowolf's stdlib 
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<inside; i++) {
 
		// create a forward linked in the ring
 
		PortId putter, getter;
 
		if(0 == strcmp(transport, "mem")) {
 
			connector_add_port_pair(c, &putter, &getter);
 
		} else if(0 == strcmp(transport, "localhost")) {
 
			FfiSocketAddr addr = {{127, 0, 0, 1}, i+7000};
 
			connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active);
 
			connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive);
 
		} else {
 
			printf("BAD TRANSPORT!\n");
 
			exit(1);
 
		}
 
		
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		// thread a forward component onto native_tail
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	for (i=inside; i<total; i++) {
 
		// create a forward linked to itself
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){getter, putter}, 2);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	char * msg = malloc(msglen);
 
	memset(msg, msglen, 42);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<100000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msglen);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	free(msg);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_14/amy.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, msglen;
 
	msglen = atoi(argv[1]);
 
	printf("msglen %d\n", msglen);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char * msg = malloc(msglen);
 
	memset(msg, msglen, 42);
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_14_amy.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 0);
 
	
 
	PortId putter, getter;
 
	connector_add_net_port(
 
		c,
 
		&putter, 
 
		(FfiSocketAddr) {{127, 0, 0, 1}, 7000},
 
		Polarity_Putter,
 
		EndpointPolarity_Active);
 
	connector_add_net_port(
 
		c,
 
		&getter, 
 
		(FfiSocketAddr) {{127, 0, 0, 1}, 7001},
 
		Polarity_Getter,
 
		EndpointPolarity_Passive);
 
	connector_connect(c, -1);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<10000; i++) {
 
		connector_put_bytes(c, putter, msg, msglen);
 
		connector_get(c, getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 

	
 
	free(msg);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_14/bob.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i;
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_14_bob.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 1);
 
	
 
	PortId putter, getter;
 
	connector_add_net_port(
 
		c,
 
		&putter, 
 
		(FfiSocketAddr) {{127, 0, 0, 1}, 7001},
 
		Polarity_Putter,
 
		EndpointPolarity_Active);
 
	connector_add_net_port(
 
		c,
 
		&getter, 
 
		(FfiSocketAddr) {{127, 0, 0, 1}, 7000},
 
		Polarity_Getter,
 
		EndpointPolarity_Passive);
 
	connector_add_component(c, "forward", 7, (PortId[]){getter, putter}, 2);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<10000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 

	
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_15/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, cid;
 
	cid = atoi(argv[1]);
 
	printf("cid %d\n", cid);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 

	
 
	bool seen_delim = false;
 
	for(i=2; i<argc; i++) {
 
		EndpointPolarity ep;
 
		Polarity p;
 
		if(argv[i][0] == '.') {
 
			seen_delim = true;
 
			continue;
 
		} else if(seen_delim) {
 
			printf("putter");
 
			p = Polarity_Getter;
 
			ep = EndpointPolarity_Passive;
 
		} else {
 
			printf("getter");
 
			p = Polarity_Putter;
 
			ep = EndpointPolarity_Active;
 
		}
 
		FfiSocketAddr addr = {{127, 0, 0, 1}, atoi(argv[i])};
 
		printf("@%d\n", addr.port);
 
		connector_add_net_port(c, NULL, addr, p, ep);
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<10000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_15_7000.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7993.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7994.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7995.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7996.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7997.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7998.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_7999.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_8000.png
Show inline comments
 
new file 100644
Show images
examples/bench_15_8001.png
Show inline comments
 
new file 100644
Show images
examples/bench_16/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, cid;
 
	cid = atoi(argv[1]);
 
	printf("cid %d\n", cid);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 

	
 
	bool seen_delim = false;
 
	for(i=2; i<argc; i++) {
 
		EndpointPolarity ep;
 
		Polarity p;
 
		if(argv[i][0] == '.') {
 
			seen_delim = true;
 
			continue;
 
		} else if(seen_delim) {
 
			printf("putter");
 
			p = Polarity_Getter;
 
			ep = EndpointPolarity_Passive;
 
		} else {
 
			printf("getter");
 
			p = Polarity_Putter;
 
			ep = EndpointPolarity_Active;
 
		}
 
		FfiSocketAddr addr = {{127, 0, 0, 1}, atoi(argv[i])};
 
		printf("@%d\n", addr.port);
 
		connector_add_net_port(c, NULL, addr, p, ep);
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<10000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_17/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
#define N 5
 
int main(int argc, char** argv) {
 
	int i, cid, min_pid, msgs;
 
	cid = atoi(argv[1]);
 
	min_pid = atoi(argv[2]);
 
	char role = argv[3][0]; // 'h' for head, 'i' for inner, 't' for tail, 's' for singleton
 
	msgs = atoi(argv[4]);
 
	printf("cid %d, min_pid %d, role='%c', msgs %d\n",
 
		cid, min_pid, role, msgs);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId putters[N], getters[N];
 
	FfiSocketAddr addr = {{127, 0, 0, 1}, 0};
 
	if(role=='i' || role=='t') {
 
		// I have N getter ports!
 
		for(i=0; i<N; i++) {
 
			addr.port = min_pid+i;
 
			connector_add_net_port(c, &getters[i], addr, Polarity_Getter, EndpointPolarity_Passive);
 
		}
 
	}
 
	if(role=='h' || role=='i') {
 
		// I have N putter ports!
 
		for(i=0; i<N; i++) {
 
			addr.port = min_pid+i+N;
 
			connector_add_net_port(c, &putters[i], addr, Polarity_Putter, EndpointPolarity_Active);
 
		}
 
	}
 
	printf("Added all ports!\n");
 
	if(role=='i') {
 
		// Inner has a forwarder component to forward messages
 
		for(i=0; i<N; i++) {	
 
			connector_add_component(c, "forward", 7, (PortId[]){putters[i], getters[i]}, 2);
 
		}
 
	}
 
	connector_connect(c, -1);
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for (i=0; i<10000; i++) {
 
		if(role=='h' || role=='s') {
 
			// singleton and head send N messages
 
			for(i=0; i<N; i++) { 
 
				connector_put_bytes(c, putters[i], msg, sizeof(msg)-1);
 
			}
 
		}
 
		if(role=='t' || role=='s') {
 
			// singleton and tail recv N messages
 
			for(i=0; i<N; i++) { 
 
				connector_get(c, getters[i]);
 
			}
 
		}
 
		// inner doesn't send nor receive
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/zoop.sh
Show inline comments
 
#!/bin/bash
 
for options in {1..5}
 
for included in {0..13}
 
do
 
	for forwards in {0..15}
 
	do
 
		./bench_11/main.exe $forwards $options 0
 
	done
 
	./bench_13/main.exe 65535 $included 13
 
done
 
\ No newline at end of file
reowolf.h
Show inline comments
 
@@ -84,96 +84,98 @@ int connector_add_net_port(Connector *connector,
 
                           Polarity port_polarity,
 
                           EndpointPolarity endpoint_polarity);
 

	
 
/**
 
 * Given an initialized connector in setup or connecting state,
 
 * - Creates a new directed port pair with logical channel putter->getter,
 
 * - adds the ports to the native component's interface,
 
 * - and returns them using the given out pointers.
 
 * Usable in {setup, communication} states.
 
 */
 
void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *out_getter);
 

	
 
/**
 
 * Given
 
 * - an initialized connector in setup or connecting state,
 
 * - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
 * - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
 * returns [P, G] via out pointers [putter, getter],
 
 * - where P is a Putter port that sends messages into the socket
 
 * - where G is a Getter port that recvs messages from the socket
 
 */
 
int connector_add_udp_mediator_component(Connector *connector,
 
                                         PortId *putter,
 
                                         PortId *getter,
 
                                         FfiSocketAddr local_addr,
 
                                         FfiSocketAddr peer_addr);
 

	
 
/**
 
 * Connects this connector to the distributed system of connectors reachable through endpoints,
 
 * Usable in setup state, and changes the state to communication.
 
 */
 
int connector_connect(Connector *connector, int64_t timeout_millis);
 

	
 
/**
 
 * Destroys the given a pointer to the connector on the heap, freeing its resources.
 
 * Usable in {setup, communication} states.
 
 */
 
void connector_destroy(Connector *connector);
 

	
 
int connector_get(Connector *connector, PortId port);
 

	
 
const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *out_len);
 

	
 
/**
 
 * Initializes `out` with a new connector using the given protocol description as its configuration.
 
 * The connector uses the given (internal) connector ID.
 
 */
 
Connector *connector_new(const Arc_ProtocolDescription *pd);
 
Connector *connector_new_with_id(const Arc_ProtocolDescription *pd,
 
                                 ConnectorId connector_id);
 

	
 
Connector *connector_new_logging(const Arc_ProtocolDescription *pd,
 
                                 const uint8_t *path_ptr,
 
                                 uintptr_t path_len);
 

	
 
Connector *connector_new_logging_with_id(const Arc_ProtocolDescription *pd,
 
                                         const uint8_t *path_ptr,
 
                                         uintptr_t path_len,
 
                                         ConnectorId connector_id);
 

	
 
intptr_t connector_next_batch(Connector *connector);
 

	
 
void connector_print_debug(Connector *connector);
 

	
 
/**
 
 * Convenience function combining the functionalities of
 
 * "payload_new" with "connector_put_payload".
 
 */
 
int connector_put_bytes(Connector *connector,
 
                        PortId port,
 
                        const uint8_t *bytes_ptr,
 
                        uintptr_t bytes_len);
 

	
 
intptr_t connector_sync(Connector *connector, int64_t timeout_millis);
 

	
 
/**
 
 * Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
 */
 
Arc_ProtocolDescription *protocol_description_clone(const Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Destroys the given initialized protocol description and frees its resources.
 
 */
 
void protocol_description_destroy(Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Parses the utf8-encoded string slice to initialize a new protocol description object.
 
 * - On success, initializes `out` and returns 0
 
 * - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
 */
 
Arc_ProtocolDescription *protocol_description_parse(const uint8_t *pdl, uintptr_t pdl_len);
 

	
 
/**
 
 * Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
 * - pointer is NULL iff there was no last error
 
 * - data at pointer is null-delimited
 
 * - len does NOT include the length of the null-delimiter
 
 * If len is NULL, it will not written to.
src/ffi/mod.rs
Show inline comments
 
@@ -123,124 +123,131 @@ pub unsafe extern "C" fn protocol_description_parse(
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> *mut Arc<ProtocolDescription> {
 
    Box::into_raw(Box::new(pd.clone()))
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
    connector_id: ConnectorId,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_raw_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    connector_id: ConnectorId,
 
) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), connector_id);
 
    Box::into_raw(Box::new(c))
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id());
 
    Box::into_raw(Box::new(c))
 
    connector_new_with_id(pd, Connector::random_id())
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    if !out_putter.is_null() {
 
        out_putter.write(o);
 
    }
 
    if !out_getter.is_null() {
 
        out_getter.write(i);
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_raw_parts(ident_ptr, ident_len),
 
        &*slice_from_raw_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => RW_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Given
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    UdpConnectFailed(SocketAddr),
 
    TcpInvalidConnect(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    NetEndpointSetupError(SocketAddr, NetEndpointError),
 
    SetupAlgMisbehavior,
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    DuplicatePort(PortId),
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum UnrecoverableSyncError {
 
    PollFailed,
 
    BrokenNetEndpoint { index: usize },
 
    BrokenUdpEndpoint { index: usize },
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ComponentId),
 
    RoundFailure,
 
    Unrecoverable(UnrecoverableSyncError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum NetEndpointError {
 
    MalformedMessage,
 
    BrokenNetEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
src/runtime/setup.rs
Show inline comments
 
@@ -282,97 +282,97 @@ fn setup_endpoints_and_pair_ports(
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readbale event
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                    .map_err(|_| Ce::TcpInvalidConnect(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, getter_for_incoming: endpoint_setup.getter_for_incoming })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the timeout for the poll call
 
        let remaining = match (deadline, net_connect_to_retry.is_empty()) {
 
            (None, true) => None,
 
            (None, false) => Some(RETRY_PERIOD),
 
            (Some(deadline), is_empty) => {
 
                let dur_to_timeout =
0 comments (0 inline, 0 general)