Changeset - d5c0b4fdc19b
[Not reviewed]
0 6 5
Christopher Esterhuyse - 5 years ago 2020-09-29 17:14:41
christopher.esterhuyse@gmail.com
more robust test dir creation
11 files changed with 232 insertions and 4 deletions:
0 comments (0 inline, 0 general)
examples/bench_4/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
 
		connector_add_component(c, "trivial_loop", 12, NULL, 0);
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_5/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, port_pairs, proto_components;
 
	port_pairs = atoi(argv[1]);
 
	proto_components = atoi(argv[2]);
 
	printf("port_pairs %d, proto_components: %d\n", port_pairs, proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_5.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<port_pairs; i++) {
 
		connector_add_port_pair(c, NULL, NULL);
 
	}
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_6/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, self_syncs;
 
	self_syncs = atoi(argv[1]);
 
	printf("self_syncs %d\n", self_syncs);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_6.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<self_syncs; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		char ident[] = "sync"; // defined in reowolf's stdlib 
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){getter, putter}, 2);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_7/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, forwards;
 
	forwards = atoi(argv[1]);
 
	printf("forwards %d\n", forwards);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_7.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		char ident[] = "forward"; // defined in reowolf's stdlib 
 
		// thread a forward component onto native_tail
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		char msg[] = "Hello, world!";
 
		connector_put_bytes(c, native_putter, msg, sizeof(msg)-1);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_8/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, forwards;
 
	forwards = atoi(argv[1]);
 
	printf("forwards %d\n", forwards);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_8.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		char ident[] = "forward"; // defined in reowolf's stdlib 
 
		// thread a forward component onto native_tail
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	size_t msg_len = 0xffff;
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msg_len);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/zoop.sh
Show inline comments
 
new file 100644
 
#!/bin/bash
 
for syncs in {0..16}
 
do
 
	./bench_8/main.exe $syncs
 
done
 
\ No newline at end of file
src/ffi/mod.rs
Show inline comments
 
@@ -19,387 +19,391 @@ impl Into<SocketAddr> for FfiSocketAddr {
 
    }
 
}
 

	
 
///////////////////////////////////////////////
 
#[derive(Default)]
 
struct StoredError {
 
    // invariant: len is zero IFF its occupied
 
    // contents are 1+ bytes because we also store the NULL TERMINATOR
 
    buf: Vec<u8>,
 
}
 
impl StoredError {
 
    const NULL_TERMINATOR: u8 = 0;
 
    fn clear(&mut self) {
 
        // no null terminator either!
 
        self.buf.clear();
 
    }
 
    fn debug_store<E: Debug>(&mut self, error: &E) {
 
        let _ = write!(&mut self.buf, "{:?}", error);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_debug_store<E: Debug>(error: &E) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.debug_store(error);
 
        })
 
    }
 
    fn bytes_store(&mut self, bytes: &[u8]) {
 
        let _ = self.buf.write_all(bytes);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_bytes_store(bytes: &[u8]) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.bytes_store(bytes);
 
        })
 
    }
 
    fn tl_clear() {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
        })
 
    }
 
    fn tl_bytes_peek() -> (*const u8, usize) {
 
        STORED_ERROR.with(|stored_error| {
 
            let stored_error = stored_error.borrow();
 
            match stored_error.buf.len() {
 
                0 => (core::ptr::null(), 0), // no error!
 
                n => {
 
                    // stores an error of length n-1 AND a NULL TERMINATOR
 
                    (stored_error.buf.as_ptr(), n - 1)
 
                }
 
            }
 
        })
 
    }
 
}
 
thread_local! {
 
    static STORED_ERROR: RefCell<StoredError> = RefCell::new(StoredError::default());
 
}
 

	
 
pub const RW_OK: c_int = 0;
 
pub const RW_TL_ERR: c_int = -1;
 
pub const RW_WRONG_STATE: c_int = -2;
 
pub const RW_LOCK_POISONED: c_int = -3;
 
pub const RW_CLOSE_FAIL: c_int = -4;
 
pub const RW_BAD_FD: c_int = -5;
 
pub const RW_CONNECT_FAILED: c_int = -6;
 
pub const RW_WOULD_BLOCK: c_int = -7;
 
pub const RW_BAD_SOCKADDR: c_int = -8;
 

	
 
///////////////////// REOWOLF //////////////////////////
 

	
 
/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
/// - pointer is NULL iff there was no last error
 
/// - data at pointer is null-delimited
 
/// - len does NOT include the length of the null-delimiter
 
/// If len is NULL, it will not written to.
 
#[no_mangle]
 
pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 {
 
    let (err_ptr, err_len) = StoredError::tl_bytes_peek();
 
    if !len.is_null() {
 
        len.write(err_len);
 
    }
 
    err_ptr
 
}
 

	
 
///////////////////// PROTOCOL DESCRIPTION //////////////////////////
 

	
 
/// Parses the utf8-encoded string slice to initialize a new protocol description object.
 
/// - On success, initializes `out` and returns 0
 
/// - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_parse(
 
    pdl: *const u8,
 
    pdl_len: usize,
 
) -> *mut Arc<ProtocolDescription> {
 
    StoredError::tl_clear();
 
    match ProtocolDescription::parse(&*slice_from_raw_parts(pdl, pdl_len)) {
 
        Ok(new) => Box::into_raw(Box::new(Arc::new(new))),
 
        Err(err) => {
 
            StoredError::tl_bytes_store(err.as_bytes());
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> *mut Arc<ProtocolDescription> {
 
    Box::into_raw(Box::new(pd.clone()))
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
    connector_id: ConnectorId,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_raw_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id());
 
    Box::into_raw(Box::new(c))
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    if !out_putter.is_null() {
 
        out_putter.write(o);
 
    }
 
    if !out_getter.is_null() {
 
        out_getter.write(i);
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_raw_parts(ident_ptr, ident_len),
 
        &*slice_from_raw_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => RW_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    port: *mut PortId,
 
    addr: FfiSocketAddr,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.new_net_port(port_polarity, addr.into(), endpoint_polarity) {
 
        Ok(p) => {
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            RW_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
/// returns [P, G] via out pointers [putter, getter],
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_mediator_component(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
    local_addr: FfiSocketAddr,
 
    peer_addr: FfiSocketAddr,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.new_udp_mediator_component(local_addr.into(), peer_addr.into()) {
 
        Ok([p, g]) => {
 
            if !putter.is_null() {
 
                putter.write(p);
 
            }
 
            if !getter.is_null() {
 
                getter.write(g);
 
            }
 
            RW_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => RW_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: *mut Payload,
 
// ) -> c_int {
 
//     match connector.put(port, payload.read()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload_cloning(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: &Payload,
 
// ) -> c_int {
 
//     match connector.put(port, payload.clone()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
/// Convenience function combining the functionalities of
 
/// "payload_new" with "connector_put_payload".
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
    match connector.put(port, Payload::from(bytes)) {
 
        Ok(()) => RW_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.get(port) {
 
        Ok(()) => RW_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize {
 
    StoredError::tl_clear();
 
    match connector.next_batch() {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR as isize
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_millis: i64) -> isize {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.sync(timeout) {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
src/runtime/communication.rs
Show inline comments
 
@@ -1191,298 +1191,308 @@ impl BranchingProtoComponent {
 
        log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, &mut self.branches);
 
        log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys());
 
        Ok(())
 
    }
 

	
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
        mut branch: ProtoComponentBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means keeping the existing one in-place, and giving it the UNION of the inboxes
 
                let old = eo.get_mut();
 
                for (k, v) in branch.inner.inbox.drain() {
 
                    old.inner.inbox.insert(k, v);
 
                }
 
                old.ended |= branch.ended;
 
            }
 
        }
 
    }
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState {
 
        let BranchingProtoComponent { branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return state;
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl ProtoComponentBranch {
 
    // Feed this branch received message.
 
    // It's safe to receive the same message repeatedly,
 
    // but if we receive a message with different contents,
 
    // it's a sign something has gone wrong! keys of type (port, round, predicate)
 
    // should always map to at most one message value!
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let e = self.inner.inbox.entry(getter);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // new message
 
                ev.insert(payload);
 
            }
 
            Entry::Occupied(eo) => {
 
                // redundant recv. can happen as a result of a
 
                // component A having two branches X and Y related by
 
                assert_eq!(eo.get(), &payload);
 
            }
 
        }
 
    }
 
}
 
impl SolutionStorage {
 
    // Create a new solution storage, to manage the local solutions for
 
    // this connector and all of it's children (subtrees) in the solution tree.
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        // For easy iteration, we store this SubtreeId => {Predicate}
 
        // structure instead as a pair of structures: a vector of predicate sets,
 
        // and a subtree_id-to-index lookup map
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        // new_local U old_local represents the solutions of this connector itself:
 
        // namely, those that can be created from the union of one element from each child's solution set.
 
        // The difference between new and old is that new stores those NOT YET sent over the network
 
        // to this connector's parent in the solution tree.
 
        // invariant: old_local and new_local have an empty intersection
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    // drain old_local to new_local, visiting all new additions to old_local
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            // rely on invariant: empty intersection between old and new local sets
 
            assert!(old_local.insert(local.clone()));
 
            local
 
        })
 
    }
 
    // insert a solution for the given subtree ID,
 
    // AND update new_local to include any solutions that become 
 
    // possible as a result of this new addition  
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        let Self { subtree_solutions, new_local, old_local, subtree_id_to_index } = self;
 
        let index = subtree_id_to_index[&subtree_id];
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            // This is a newly-added solution! update new_local
 
            // consider ALL consistent combinations of one element from each solution set
 
            // to our right or left in the solution-set vector
 
            // but with THIS PARTICULAR predicate from our own index.
 
            let left = 0..index;
 
            let right = (index + 1)..subtree_solutions.len();
 
            // iterator over SETS of solutions, one for every component except `subtree_id` (me)
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            // Recursively enumerate all solutions matching the description above,
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
        }
 
    }
 

	
 
    // Recursively build local solutions for this connector,
 
    // see `submit_and_digest_subtree_solution`
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        cu: &mut impl CuUndecided,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep recursively creating combined solutions
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        cu,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. This is a solution for this connector...
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new component and changing the given port's ownership to that
 
    // of the new component.
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // Sanity check! The moved ports are owned by this component to begin with
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.ips.port_info.map.get(port).unwrap().owner
 
            );
 
        }
 
        // Create the new component, and schedule it to be run
 
        let new_cid = self.ips.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            new_cid,
 
            &state,
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
            self.ips.port_info.map.get_mut(port).unwrap().owner = new_cid;
 
        }
 
        if let Some(set) = self.ips.port_info.owned.get_mut(&self.proto_component_id) {
 
            set.retain(|x| !moved_ports.contains(x));
 
        }
 
        self.ips.port_info.owned.insert(new_cid, moved_ports.clone());
 
    }
 

	
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new port-pair connected by an memory channel
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.ips.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                polarity: Putter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                polarity: Getter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.ips
 
            .port_info
 
            .owned
 
            .entry(self.proto_component_id)
 
            .or_default()
 
            .extend([o, i].iter().copied());
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether it's associated
 
    // preidcate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.ips.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 

	
 
    // The component calls the runtime back, trying to inspect a port's message
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        let maybe_msg = self.branch_inner.inbox.get(&port);
 
        if maybe_msg.is_some() {
 
            // Make a note that this component has received
 
            // this port's message 1+ times this round
 
            self.branch_inner.did_put_or_get.insert(port);
 
        }
 
        maybe_msg
 
    }
 

	
 
    // NOT CURRENTLY USED
 
    // Once this component has injected a new nondeterministic branch with
 
    // SyncBlocker::NondetChoice, this is how the component retrieves it.
 
    // (Two step process necessary to get around mutable access rules,
 
    //  as injection of the nondeterministic choice modifies the
 
    //  branch predicate, forks the branch, etc.)
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainerInner { swap, output } }
 
    }
 

	
 
    // This hides the ugliness of facilitating a memory-safe cyclic drain.
 
    // A "drain" would refer to a procedure that empties the input and populates the output.
 
    // It's "cyclic" because the processing function can also populate the input.
 
    // Making this memory safe requires an additional temporary storage, such that
 
    // the input can safely be drained and populated concurrently.
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainerInner { swap, output } } = self;
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                // func is the user-provided callback function, which consumes an element
 
                // as its drained from the input
 
                func(k, v, CyclicDrainerInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> {
 
    // Add this key-value pair to be yielded by the drainer later
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 

	
 
    // Add this key-value pair as an output of the drainer
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -128,732 +128,774 @@ enum Decision {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 

	
 
// Control messages exchanged during the setup phase only
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
// A data structure encoding the state of a connector, passed around
 
// during the session optimization procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfoMap,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
}
 

	
 
// Newtype wrapper for an Arc<ProtocolDescription>,
 
// such that it can be (de)serialized for transmission over the network.
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 

	
 
// Control message particular to the communication phase.
 
// as such, it's annotated with a round_index
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
}
 

	
 
// Connector <-> connector control messages for use in the communication phase
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // child->parent
 
    Announce { decision: Decision },  // parent->child
 
}
 

	
 
// Speculative payload message, communicating the value for the given
 
// port's message predecated on the given speculative variable assignments.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 

	
 
// Return result of `Predicate::assignment_union`, communicating the contents
 
// of the predicate which represents the (consistent) union of their mappings,
 
// if it exists (no variable mapped distinctly by the input predicates)
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
// One of two endpoints for a control channel with a connector on either end.
 
// The underlying transport is TCP, so we use an inbox buffer to allow
 
// discrete payload receipt.
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 

	
 
// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    getter_for_incoming: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    getter_for_incoming: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 

	
 
// NetEndpoint annotated with the ID of the port that receives payload
 
// messages received through the endpoint. This approach assumes that NetEndpoints
 
// DO NOT multiplex port->port channels, and so a mapping such as this is possible.
 
// As a result, the messages themselves don't need to carry the PortID with them.
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component"
 
// described in the literature.
 
// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager),
 
// It acts as a native component by managing a (speculative) set of payload messages (an outbox,
 
//  protecting the peer on the other side of the network).
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Meta-data for the connector: its role in the consensus tree.
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 

	
 
// Manages the connector's ID, and manages allocations for connector/port IDs.
 
#[derive(Debug, Clone)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    component_suffix_stream: U32Stream,
 
}
 

	
 
// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams.
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 

	
 
// A generator of speculative variables. Created on-demand during the synchronous round
 
// by the IdManager.
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 

	
 
// Manages the messy state of the various endpoints, pollers, buffers, etc.
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>, // ready to yield
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 

	
 
// A storage of endpoints, which keeps track of which components have raised
 
// an event during poll(), signifying that they need to be checked for new incoming data
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 

	
 
// The information associated with a port identifier, designed for local storage.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    owner: ComponentId,
 
    peer: Option<PortId>,
 
    polarity: Polarity,
 
    route: Route,
 
}
 

	
 
// Similar to `PortInfo`, but designed for communication during the setup procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 

	
 
// Newtype around port info map, allowing the implementation of some
 
// useful methods
 
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct PortInfoMap {
 
    // invariant: self.invariant_preserved()
 
    // `owned` is redundant information, allowing for fast lookup
 
    // of a component's owned ports (which occurs during the sync round a lot)
 
    map: HashMap<PortId, PortInfo>,
 
    owned: HashMap<ComponentId, HashSet<PortId>>,
 
}
 

	
 
// A convenient substructure for containing port info and the ID manager.
 
// Houses the bulk of the connector's persistent state between rounds.
 
// It turns out several situations require access to both things.
 
#[derive(Debug, Clone)]
 
struct IdAndPortState {
 
    port_info: PortInfoMap,
 
    id_manager: IdManager,
 
}
 

	
 
// A component's setup-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundEndedNative>, SyncError>,
 
}
 

	
 
// A component's data common to both setup and communication phases
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
    logger: Box<dyn Logger>,
 
    ips: IdAndPortState,
 
    native_component_id: ComponentId,
 
}
 

	
 
// A connector's phase-specific data
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 

	
 
// A connector's setup-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 

	
 
// A newtype wrapper for a map from speculative variable to speculative value
 
// A missing mapping corresponds with "unspecified".
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 

	
 
// Identifies a child of this connector in the _solution tree_.
 
// Each connector creates its own local solutions for the consensus procedure during `sync`,
 
// from the solutions of its children. Those children are either locally-managed components,
 
// (which are leaves in the solution tree), or other connectors reachable through the given
 
// network endpoint (which are internal nodes in the solution tree).
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 

	
 
// An accumulation of the connector's knowledge of all (a) the local solutions its children
 
// in the solution tree have found, and (b) its own solutions derivable from those of its children.
 
// This structure starts off each round with an empty set, and accumulates solutions as they are found
 
// by local components, or received over the network in control messages.
 
// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to
 
// say that these sets GROW until the round is over, and all solutions are reset.
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    // invariant: old_local U new_local solutions are those that can be created from
 
    // the UNION of one element from each set in `subtree_solution`.
 
    // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated.
 
    old_local: HashSet<Predicate>, // already sent to this connector's parent OR decided
 
    new_local: HashSet<Predicate>, // not yet sent to this connector's parent OR decided
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
// Stores the transient data of a synchronous round.
 
// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of
 
// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx,
 
// and can be undone if the round fails.
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    ips: IdAndPortState,
 
}
 

	
 
// A trait intended to limit the access of the ConnectorUnphased structure
 
// such that we don't accidentally modify any important component/port data
 
// while the results of the round are undecided. Why? Any actions during Connector::sync
 
// are _speculative_ until the round is decided, and we need a safe way of rolling
 
// back any changes.
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 
}
 

	
 
// Represents a set of synchronous port operations that the native component
 
// has described as an "option" for completing during the synchronous rounds.
 
// Operations contained here succeed together or not at all.
 
// A native with N=2+ batches are expressing an N-way nondeterministic choice
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
// Parallels a mio::Token type, but more clearly communicates
 
// the way it identifies the evented structre it corresponds to.
 
// See runtime/setup for methods converting between TokenTarget and mio::Token
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
// such that it can know when to continue polling, and when to block.
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn err_would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        // establish the invariant
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    // Insert the given element. Returns whether it was already present.
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl PortInfoMap {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.map.iter().filter(move |(_, port_info)| port_info.owner == owner).map(|(port, _)| port)
 
        self.owned.get(&owner).into_iter().flat_map(HashSet::iter)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        // Every port maps to a speculative variable
 
        // Two distinct ports map to the same variable
 
        // IFF they are two ends of the same logical channel.
 
        let info = self.map.get(&port).unwrap();
 
        SpecVar(match info.polarity {
 
            Getter => port,
 
            Putter => info.peer.unwrap(),
 
        })
 
    }
 
    fn invariant_preserved(&self) -> bool {
 
        // for every port P with some owner O,
 
        // P is in O's owned set
 
        for (port, info) in self.map.iter() {
 
            match self.owned.get(&info.owner) {
 
                Some(set) if set.contains(port) => {}
 
                _ => {
 
                    println!("{:#?}\n WITH port {:?}", self, port);
 
                    return false;
 
                }
 
            }
 
        }
 
        // for every port P owned by every owner O,
 
        // P's owner is O
 
        for (&owner, set) in self.owned.iter() {
 
            for port in set {
 
                match self.map.get(port) {
 
                    Some(info) if info.owner == owner => {}
 
                    _ => {
 
                        println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port);
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 
        true
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N.
 
        // This gap is entirely unnecessary (i.e. 0 is fine)
 
        // It's purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_component_id(&mut self) -> ComponentId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() }
 
            .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(self.unphased.logger(), "Connector dropping. Goodbye!");
 
    }
 
}
 
// Given a slice of ports, return the first, if any, port is present repeatedly
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
        match vec.binary_search(port) {
 
            Err(index) => vec.insert(index, *port),
 
            Ok(_) => return Some(*port),
 
        }
 
    }
 
    None
 
}
 
impl Connector {
 
    /// Generate a random connector identifier from the system's source of randomness.
 
    pub fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete,
 
    /// and it is ready to participate in synchronous rounds of communication.
 
    pub fn is_connected(&self) -> bool {
 
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
 
        // consequently, this "phased" business would also include connector variants and this would
 
        // get a lot closer to the connector impl. itself.
 
        // Instead, the C-oriented implementation doesn't distinguish connector states as types,
 
        // and distinguish them as enum variants instead
 
        match self.phased {
 
            ConnectorPhased::Setup(..) => false,
 
            ConnectorPhased::Communication(..) => true,
 
        }
 
    }
 

	
 
    /// Enables the connector's current logger to be swapped out for another
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 

	
 
    /// Access the connector's current logger
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.logger
 
    }
 

	
 
    /// Create a new synchronous channel, returning its ends as a pair of ports,
 
    /// with polarity output, input respectively. Available during either setup/communication phase.
 
    /// # Panics
 
    /// This function panics if the connector's (large) port id space is exhausted.
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let mut new_cid = || cu.ips.id_manager.new_port_id();
 
        // allocate two fresh port identifiers
 
        let [o, i] = [new_cid(), new_cid()];
 
        // store info for each:
 
        // - they are each others' peers
 
        // - they are owned by a local component with id `cid`
 
        // - polarity putter, getter respectively
 
        cu.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                owner: cu.native_component_id,
 
                polarity: Putter,
 
            },
 
        );
 
        cu.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                owner: cu.native_component_id,
 
                polarity: Getter,
 
            },
 
        );
 
        cu.ips
 
            .port_info
 
            .owned
 
            .entry(cu.native_component_id)
 
            .or_default()
 
            .extend([o, i].iter().copied());
 

	
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 

	
 
    /// Instantiates a new component for the connector runtime to manage, and passing
 
    /// the given set of ports from the interface of the native component, to that of the
 
    /// newly created component (passing their ownership).
 
    /// # Errors
 
    /// Error is returned if the moved ports are not owned by the native component,
 
    /// if the given component name is not defined in the connector's protocol,
 
    /// the given sequence of ports contains a duplicate port,
 
    /// or if the component is unfit for instantiation with the given port sequence.
 
    /// # Panics
 
    /// This function panics if the connector's (large) component id space is exhausted.
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // Check for error cases first before modifying `cu`
 
        use AddComponentError as Ace;
 
        let cu = &self.unphased;
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
            if info.polarity != expected_polarity {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // No errors! Time to modify `cu`
 
        // create a new component and identifier
 
        let cu = &mut self.unphased;
 
        let new_cid = cu.ips.id_manager.new_component_id();
 
        cu.proto_components.insert(new_cid, cu.proto_description.new_component(identifier, ports));
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.ips.port_info.map.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
        }
 
        if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) {
 
            set.retain(|x| !ports.contains(x));
 
        }
 
        cu.ips.port_info.owned.insert(new_cid, ports.iter().copied().collect());
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn singleton(k: SpecVar, v: SpecVal) -> Self {
 
        Self::default().inserted(k, v)
 
    }
 
    #[inline]
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 

	
 
    // Return true whether `self` is a subset of `maybe_superset`
 
    pub fn assigns_subset(&self, maybe_superset: &Self) -> bool {
 
        for (var, val) in self.assigned.iter() {
 
            match maybe_superset.assigned.get(var) {
 
                Some(val2) if val2 == val => {}
 
                _ => return false, // var unmapped, or mapped differently
 
            }
 
        }
 
        // `maybe_superset` mirrored all my assignments!
 
        true
 
    }
 

	
 
    /// Given the two predicates {self, other}, return that whose
 
    /// assignments are the union of those of both.
 
    fn assignment_union(&self, other: &Self) -> AssignmentUnionResult {
 
        use AssignmentUnionResult as Aur;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // populate lists of assignments in self but not other and vice versa.
 
        // do this by incrementally unfolding the iterators, keeping an eye
 
        // on the ordering between the head elements [s, o].
 
        // whenever s<o, other is certainly missing element 's', etc.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break, // both iterators are empty
 
                [None, Some(x)] => {
 
                    // self's iterator is empty.
 
                    // all remaning elements are in other but not self
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    // other's iterator is empty.
 
                    // all remaning elements are in self but not other
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        // No predicate exists which satisfies both!
 
                        return Aur::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Aur::Equivalent,       // ... equivalent to both.
 
            [false, true] => Aur::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Aur::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Aur::New(new)
 
            }
 
        }
 
    }
 

	
 
    // Compute the union of the assignments of the two given predicates, if it exists.
 
    // It doesn't exist if there is some value which the predicates assign to different values.
 
    pub(crate) fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub(crate) fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 

	
 
impl RoundCtx {
 
    // remove an arbitrary buffered message, along with the ID of the getter who receives it
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 

	
 
    // buffer a message along with the ID of the getter who receives it
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 

	
 
    // buffer a message along with the ID of the putter who sent it
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 

	
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct Assignment<'a>((&'a SpecVar, &'a SpecVal));
 
        impl Debug for Assignment<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                write!(f, "{:?}={:?}", (self.0).0, (self.0).1)
 
            }
 
        }
 
        f.debug_set().entries(self.assigned.iter().map(Assignment)).finish()
 
    }
 
}
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &ProtocolDescription = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl IdParts for SpecVar {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl Debug for SpecVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let (a, b) = self.id_parts();
 
        write!(f, "v{}_{}", a, b)
 
    }
 
}
 
impl SpecVal {
 
    const FIRING: Self = SpecVal(1);
 
    const SILENT: Self = SpecVal(0);
 
    fn is_firing(self) -> bool {
 
        self == Self::FIRING
 
        // all else treated as SILENT
 
    }
 
    fn iter_domain() -> impl Iterator<Item = Self> {
 
        (0..).map(SpecVal)
 
    }
 
}
 
impl Debug for SpecVal {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
#[derive(Default)]
 
struct ExtraPortInfo {
 
    info: HashMap<PortId, PortInfo>,
 
    peers: HashMap<PortId, PortId>,
 
}
 

	
 
impl TokenTarget {
 
    // subdivides the domain of usize into
 
    // [NET_ENDPOINT][UDP_ENDPOINT  ]
 
    // ^0            ^usize::MAX/2   ^usize::MAX
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl Connector {
 
    /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
 
    /// The resulting connector will start in the setup phase, and cannot be used for communication until the
 
    /// `connect` procedure completes.
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
    /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error).
 
    /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed
 
    /// to preserve their configured protocols.
 
    /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an
 
    /// error during `connect`, observed as a peer misbehaving.
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
        let native_component_id = id_manager.new_component_id();
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                native_component_id,
 
                ips: IdAndPortState { id_manager, port_info: Default::default() },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 

	
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                // allocates 4 new port identifiers, two for each logical channel,
 
                // one channel per direction (into and out of the component)
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
                // allocate the native->udp_mediator channel's ports
 
                cu.ips.port_info.map.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Getter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                // allocate the udp_mediator->native channel's ports
 
                cu.ips.port_info.map.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                // allocate the two ports owned by the UdpMediator component
 
                // Remember to setup this UdpEndpoint setup during `connect` later.
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 

	
 
                // update owned sets
 
                cu.ips
 
                    .port_info
 
                    .owned
 
                    .entry(cu.native_component_id)
 
                    .or_default()
 
                    .extend([nin, nout].iter().copied());
 
                cu.ips.port_info.owned.insert(udp_cid, maplit::hashset! {uin, uout});
 
                // Return the native's output, input port pair
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                // allocate a single dangling port with a `None` peer (for now)
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.map.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        peer: None,
 
                        owner: cu.native_component_id,
 
                        polarity,
 
                    },
 
                );
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                // Remember to setup this NetEndpoint setup during `connect` later.
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: new_pid,
 
                });
 
                // update owned set
 
                cu.ips.port_info.owned.entry(cu.native_component_id).or_default().insert(new_pid);
 
                Ok(new_pid)
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
    /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports,
 
    /// but is ready to begin the first communication round.
 
    /// Initially, the connector has a singleton set of _batches_, the only element of which is empty.
 
    /// This single element starts off selected. The selected batch is modified with `put` and `get`,
 
    /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the
 
    /// purpose of these batches.
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction. Learn our role in the consensus tree,
 
                // from learning who are our children/parents (neighbors) in the consensus tree.
 
                let neighborhood = init_neighborhood(
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // Put it all together with an initial round index of zero.
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None), // no previous round yet
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    // Perform the session optimization procedure, which may modify the
 
                    // internals of the connector, rerouting ports, moving around connectors etc.
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                // Connect procedure successful! Commit changes by...
 
                // ... commiting new port info for ConnectorUnphased
 
                for (port, info) in extra_port_info.info.drain() {
 
                    cu.ips.port_info.owned.entry(info.owner).or_default().insert(port);
 
                    cu.ips.port_info.map.insert(port, info);
 
                }
 
                for (port, peer) in extra_port_info.peers.drain() {
 
                    cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer);
 
                }
 
                // ... replacing the connector's phase to "communication"
 
                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
// Given a set of net_ and udp_ endpoints to setup,
 
// port information to flesh out (by discovering peers through channels)
 
// and a deadline in which to do it,
 
// try to return:
 
// - An EndpointManager, containing all the set up endpoints
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readbale event
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, getter_for_incoming: endpoint_setup.getter_for_incoming })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the timeout for the poll call
 
        let remaining = match (deadline, net_connect_to_retry.is_empty()) {
 
            (None, true) => None,
 
            (None, false) => Some(RETRY_PERIOD),
 
            (Some(deadline), is_empty) => {
 
                let dur_to_timeout =
 
                    deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?;
 
                Some(if is_empty { dur_to_timeout } else { dur_to_timeout.min(RETRY_PERIOD) })
 
            }
 
        };
 
        // block until either
 
        // (a) `events` has been populated with 1+ elements
 
        // (b) timeout elapses, or
 
        // (c) RETRY_PERIOD elapses
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        if last_retry_at.elapsed() > RETRY_PERIOD {
 
            // Retry all net connections and reset `last_retry_at`
 
            last_retry_at = Instant::now();
 
            for net_index in net_connect_to_retry.drain() {
 
                // Restart connect procedure for this net endpoint
 
                let net_todo = &mut net_todos[net_index];
 
                log!(
 
                    logger,
 
                    "Restarting connection with endpoint {:?} {:?}",
 
                    net_index,
 
                    net_todo.endpoint_setup.sock_addr
 
                );
 
                match &mut net_todo.todo_endpoint {
 
                    NetTodoEndpoint::PeerInfoRecving(endpoint) => {
 
                        let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                            .expect("mio::TcpStream connect should not fail!");
 
                        std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                        let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                        poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        }
 
        for event in events.iter() {
 
            let token = event.token();
 
            // figure out which endpoint the event belonged to
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::UdpEndpoint { index } => {
 
                    // UdpEndpoints are easy to complete.
 
                    // Their setup event just has to succeed without error
 
                    if !setup_incomplete.contains(&token_target) {
 
                        // spurious wakeup. this endpoint has already been set up!
 
                        continue;
 
                    }
 
                    let udp_todo: &UdpTodo = &udp_todos[index];
 
                    if event.is_error() {
 
                        return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap()));
 
                    }
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    // NetEndpoints are complex to complete,
 
                    // they must accept/connect to their peer,
 
                    // and then exchange port info successfully
 
                    let net_todo = &mut net_todos[index];
 
                    if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // Passive endpoint that will first try accept the peer's connection
 
                        match listener.accept() {
 
                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
 
                            Err(_) => {
 
                                log!(logger, "accept() failure on index {}", index);
 
                                return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                            }
 
                            Ok((mut stream, peer_addr)) => {
 
                                // successfully accepted the active peer
 
                                // reusing the token, but now for the stream and not the listener
 
                                poll.registry().deregister(listener).unwrap();
 
@@ -813,205 +825,206 @@ fn init_neighborhood(
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                // `recv_index` is my child
 
                children.push(recv_index);
 
            }
 
            msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(Sm::SessionScatter { .. })
 
            | msg @ S(Sm::SessionGather { .. })
 
            | msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election", msg);
 
                em.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    // Neighborhood complete!
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
    log!(logger, "Neighborhood constructed {:?}", &neighborhood);
 
    Ok(neighborhood)
 
}
 

	
 
// Connectors collect a map of type ConnectorId=>SessionInfo,
 
// representing a global view of the session's state at the leader.
 
// The leader rewrites its contents however they like (currently: nothing happens)
 
// and the map is again broadcasted, for each peer to make their local changes to
 
// reflect the results of the rewrite.
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: &Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
    let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
 
    let mut awaiting: HashSet<usize> = comm.neighborhood.children.iter().copied().collect();
 
    comm.endpoint_manager.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(
 
            cu.logger,
 
            "Session gather loop. awaiting info from children {:?}...",
 
            awaiting.iter()
 
        );
 
        let (recv_index, msg) =
 
            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        cu.logger,
 
                        "Wasn't expecting session info from {:?}. Got {:?}",
 
                        recv_index,
 
                        &child_unoptimized_map
 
                    );
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                unoptimized_map.extend(child_unoptimized_map.into_iter());
 
            }
 
            msg @ S(Sm::YouAreMyParent)
 
            | msg @ S(Sm::MyPortInfo(..))
 
            | msg @ S(Sm::LeaderAnnounce { .. })
 
            | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(Sm::SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(Ce::SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    // add my own session info to the map
 
    let my_session_info = SessionInfo {
 
        port_info: cu.ips.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .net_endpoint_store
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(Ce::SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(Sm::SessionGather { .. })
 
                | msg @ S(Sm::YouAreMyParent)
 
                | msg @ S(Sm::MyPortInfo(..))
 
                | msg @ S(Sm::LeaderAnnounce { .. })
 
                | msg @ S(Sm::LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    // extract my own ConnectorId's entry
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    // broadcast the optimized session info to my children
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    // apply local optimizations
 
    apply_my_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 

	
 
// Defines the optimization function, consuming an optimized map,
 
// and returning an optimized map.
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    // currently, it's the identity function
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 

	
 
// Modify the given connector's internals to reflect
 
// the given session info
 
fn apply_my_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // simply overwrite the contents
 
    cu.ips.port_info = port_info;
 
    assert!(cu.ips.port_info.invariant_preserved());
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
 
}
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{
 
    error::*,
 
    EndpointPolarity::{Active, Passive},
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
const MS100: Option<Duration> = Some(Duration::from_millis(100));
 
const MS300: Option<Duration> = Some(Duration::from_millis(300));
 
const SEC1: Option<Duration> = Some(Duration::from_secs(1));
 
const SEC5: Option<Duration> = Some(Duration::from_secs(5));
 
const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone())
 
}
 
fn file_logged_configured_connector(
 
    connector_id: ConnectorId,
 
    dir_path: &Path,
 
    pd: Arc<ProtocolDescription>,
 
) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir");
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file = File::create(path).expect("Failed to create log output file!");
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, pd, connector_id)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 
fn new_u8_buffer(cap: usize) -> Vec<u8> {
 
    let mut v = Vec::with_capacity(cap);
 
    // Safe! len will cover owned bytes in valid state
 
    unsafe { v.set_len(cap) }
 
    v
 
}
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let test_log_path = Path::new("./logs/wrong_polarity_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let test_log_path = Path::new("./logs/dup_put_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let test_log_path = Path::new("./logs/trivial_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let test_log_path = Path::new("./logs/unconnected_gotten_err");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_no_round() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_no_round");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_ungotten() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_ungotten");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.sync(SEC1).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let test_log_path = Path::new("./logs/native_polarity_checks");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, TEST_MSG.clone()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, TEST_MSG.clone()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let test_log_path = Path::new("./logs/native_multiple_gets");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(SEC1).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
0 comments (0 inline, 0 general)