Changeset - d5c0b4fdc19b
[Not reviewed]
0 6 5
Christopher Esterhuyse - 5 years ago 2020-09-29 17:14:41
christopher.esterhuyse@gmail.com
more robust test dir creation
11 files changed with 234 insertions and 6 deletions:
0 comments (0 inline, 0 general)
examples/bench_4/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, proto_components;
 
	proto_components = atoi(argv[1]);
 
	printf("proto_components: %d\n", proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_4.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<proto_components; i++) {
 
		connector_add_component(c, "trivial_loop", 12, NULL, 0);
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_5/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, port_pairs, proto_components;
 
	port_pairs = atoi(argv[1]);
 
	proto_components = atoi(argv[2]);
 
	printf("port_pairs %d, proto_components: %d\n", port_pairs, proto_components);
 

	
 
	const unsigned char pdl[] = 
 
	"primitive trivial_loop() {   "
 
	"    while(true) synchronous{}"
 
	"}                            "
 
	;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_5.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<port_pairs; i++) {
 
		connector_add_port_pair(c, NULL, NULL);
 
	}
 
	for (i=0; i<proto_components; i++) {
 
		char ident[] = "trivial_loop";
 
		connector_add_component(c, ident, sizeof(ident)-1, NULL, 0);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_6/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, self_syncs;
 
	self_syncs = atoi(argv[1]);
 
	printf("self_syncs %d\n", self_syncs);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	char logpath[] = "./bench_6.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	for (i=0; i<self_syncs; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		char ident[] = "sync"; // defined in reowolf's stdlib 
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){getter, putter}, 2);
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_7/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, forwards;
 
	forwards = atoi(argv[1]);
 
	printf("forwards %d\n", forwards);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_7.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		char ident[] = "forward"; // defined in reowolf's stdlib 
 
		// thread a forward component onto native_tail
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		char msg[] = "Hello, world!";
 
		connector_put_bytes(c, native_putter, msg, sizeof(msg)-1);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_8/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, forwards;
 
	forwards = atoi(argv[1]);
 
	printf("forwards %d\n", forwards);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_8.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		char ident[] = "forward"; // defined in reowolf's stdlib 
 
		// thread a forward component onto native_tail
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	size_t msg_len = 0xffff;
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msg_len);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/zoop.sh
Show inline comments
 
new file 100644
 
#!/bin/bash
 
for syncs in {0..16}
 
do
 
	./bench_8/main.exe $syncs
 
done
 
\ No newline at end of file
src/ffi/mod.rs
Show inline comments
 
@@ -115,194 +115,198 @@ pub unsafe extern "C" fn protocol_description_parse(
 
) -> *mut Arc<ProtocolDescription> {
 
    StoredError::tl_clear();
 
    match ProtocolDescription::parse(&*slice_from_raw_parts(pdl, pdl_len)) {
 
        Ok(new) => Box::into_raw(Box::new(Arc::new(new))),
 
        Err(err) => {
 
            StoredError::tl_bytes_store(err.as_bytes());
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> *mut Arc<ProtocolDescription> {
 
    Box::into_raw(Box::new(pd.clone()))
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
    connector_id: ConnectorId,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_raw_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id());
 
    Box::into_raw(Box::new(c))
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
    if !out_putter.is_null() {
 
        out_putter.write(o);
 
    }
 
    if !out_getter.is_null() {
 
        out_getter.write(i);
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_raw_parts(ident_ptr, ident_len),
 
        &*slice_from_raw_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => RW_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    port: *mut PortId,
 
    addr: FfiSocketAddr,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.new_net_port(port_polarity, addr.into(), endpoint_polarity) {
 
        Ok(p) => {
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            RW_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
/// returns [P, G] via out pointers [putter, getter],
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_mediator_component(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
    local_addr: FfiSocketAddr,
 
    peer_addr: FfiSocketAddr,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.new_udp_mediator_component(local_addr.into(), peer_addr.into()) {
 
        Ok([p, g]) => {
 
            if !putter.is_null() {
 
                putter.write(p);
 
            }
 
            if !getter.is_null() {
 
                getter.write(g);
 
            }
 
            RW_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            RW_TL_ERR
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
src/runtime/communication.rs
Show inline comments
 
@@ -1287,202 +1287,212 @@ impl SolutionStorage {
 
    }
 
    // drain old_local to new_local, visiting all new additions to old_local
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            // rely on invariant: empty intersection between old and new local sets
 
            assert!(old_local.insert(local.clone()));
 
            local
 
        })
 
    }
 
    // insert a solution for the given subtree ID,
 
    // AND update new_local to include any solutions that become 
 
    // possible as a result of this new addition  
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        let Self { subtree_solutions, new_local, old_local, subtree_id_to_index } = self;
 
        let index = subtree_id_to_index[&subtree_id];
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            // This is a newly-added solution! update new_local
 
            // consider ALL consistent combinations of one element from each solution set
 
            // to our right or left in the solution-set vector
 
            // but with THIS PARTICULAR predicate from our own index.
 
            let left = 0..index;
 
            let right = (index + 1)..subtree_solutions.len();
 
            // iterator over SETS of solutions, one for every component except `subtree_id` (me)
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            // Recursively enumerate all solutions matching the description above,
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
        }
 
    }
 

	
 
    // Recursively build local solutions for this connector,
 
    // see `submit_and_digest_subtree_solution`
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        cu: &mut impl CuUndecided,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep recursively creating combined solutions
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        cu,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. This is a solution for this connector...
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new component and changing the given port's ownership to that
 
    // of the new component.
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // Sanity check! The moved ports are owned by this component to begin with
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.ips.port_info.map.get(port).unwrap().owner
 
            );
 
        }
 
        // Create the new component, and schedule it to be run
 
        let new_cid = self.ips.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            new_cid,
 
            &state,
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
            self.ips.port_info.map.get_mut(port).unwrap().owner = new_cid;
 
        }
 
        if let Some(set) = self.ips.port_info.owned.get_mut(&self.proto_component_id) {
 
            set.retain(|x| !moved_ports.contains(x));
 
        }
 
        self.ips.port_info.owned.insert(new_cid, moved_ports.clone());
 
    }
 

	
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new port-pair connected by an memory channel
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.ips.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                polarity: Putter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                polarity: Getter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.ips
 
            .port_info
 
            .owned
 
            .entry(self.proto_component_id)
 
            .or_default()
 
            .extend([o, i].iter().copied());
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether it's associated
 
    // preidcate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.ips.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 

	
 
    // The component calls the runtime back, trying to inspect a port's message
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        let maybe_msg = self.branch_inner.inbox.get(&port);
 
        if maybe_msg.is_some() {
 
            // Make a note that this component has received
 
            // this port's message 1+ times this round
 
            self.branch_inner.did_put_or_get.insert(port);
 
        }
 
        maybe_msg
 
    }
 

	
 
    // NOT CURRENTLY USED
 
    // Once this component has injected a new nondeterministic branch with
 
    // SyncBlocker::NondetChoice, this is how the component retrieves it.
 
    // (Two step process necessary to get around mutable access rules,
 
    //  as injection of the nondeterministic choice modifies the
 
    //  branch predicate, forks the branch, etc.)
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainerInner { swap, output } }
 
    }
 

	
 
    // This hides the ugliness of facilitating a memory-safe cyclic drain.
 
    // A "drain" would refer to a procedure that empties the input and populates the output.
 
    // It's "cyclic" because the processing function can also populate the input.
 
    // Making this memory safe requires an additional temporary storage, such that
 
    // the input can safely be drained and populated concurrently.
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainerInner { swap, output } } = self;
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                // func is the user-provided callback function, which consumes an element
 
                // as its drained from the input
 
                func(k, v, CyclicDrainerInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> {
 
    // Add this key-value pair to be yielded by the drainer later
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 

	
 
    // Add this key-value pair as an output of the drainer
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -224,540 +224,582 @@ struct UdpEndpointSetup {
 

	
 
// NetEndpoint annotated with the ID of the port that receives payload
 
// messages received through the endpoint. This approach assumes that NetEndpoints
 
// DO NOT multiplex port->port channels, and so a mapping such as this is possible.
 
// As a result, the messages themselves don't need to carry the PortID with them.
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component"
 
// described in the literature.
 
// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager),
 
// It acts as a native component by managing a (speculative) set of payload messages (an outbox,
 
//  protecting the peer on the other side of the network).
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Meta-data for the connector: its role in the consensus tree.
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 

	
 
// Manages the connector's ID, and manages allocations for connector/port IDs.
 
#[derive(Debug, Clone)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    component_suffix_stream: U32Stream,
 
}
 

	
 
// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams.
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 

	
 
// A generator of speculative variables. Created on-demand during the synchronous round
 
// by the IdManager.
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 

	
 
// Manages the messy state of the various endpoints, pollers, buffers, etc.
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>, // ready to yield
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 

	
 
// A storage of endpoints, which keeps track of which components have raised
 
// an event during poll(), signifying that they need to be checked for new incoming data
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 

	
 
// The information associated with a port identifier, designed for local storage.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    owner: ComponentId,
 
    peer: Option<PortId>,
 
    polarity: Polarity,
 
    route: Route,
 
}
 

	
 
// Similar to `PortInfo`, but designed for communication during the setup procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 

	
 
// Newtype around port info map, allowing the implementation of some
 
// useful methods
 
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct PortInfoMap {
 
    // invariant: self.invariant_preserved()
 
    // `owned` is redundant information, allowing for fast lookup
 
    // of a component's owned ports (which occurs during the sync round a lot)
 
    map: HashMap<PortId, PortInfo>,
 
    owned: HashMap<ComponentId, HashSet<PortId>>,
 
}
 

	
 
// A convenient substructure for containing port info and the ID manager.
 
// Houses the bulk of the connector's persistent state between rounds.
 
// It turns out several situations require access to both things.
 
#[derive(Debug, Clone)]
 
struct IdAndPortState {
 
    port_info: PortInfoMap,
 
    id_manager: IdManager,
 
}
 

	
 
// A component's setup-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundEndedNative>, SyncError>,
 
}
 

	
 
// A component's data common to both setup and communication phases
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
    logger: Box<dyn Logger>,
 
    ips: IdAndPortState,
 
    native_component_id: ComponentId,
 
}
 

	
 
// A connector's phase-specific data
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 

	
 
// A connector's setup-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 

	
 
// A newtype wrapper for a map from speculative variable to speculative value
 
// A missing mapping corresponds with "unspecified".
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 

	
 
// Identifies a child of this connector in the _solution tree_.
 
// Each connector creates its own local solutions for the consensus procedure during `sync`,
 
// from the solutions of its children. Those children are either locally-managed components,
 
// (which are leaves in the solution tree), or other connectors reachable through the given
 
// network endpoint (which are internal nodes in the solution tree).
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 

	
 
// An accumulation of the connector's knowledge of all (a) the local solutions its children
 
// in the solution tree have found, and (b) its own solutions derivable from those of its children.
 
// This structure starts off each round with an empty set, and accumulates solutions as they are found
 
// by local components, or received over the network in control messages.
 
// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to
 
// say that these sets GROW until the round is over, and all solutions are reset.
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    // invariant: old_local U new_local solutions are those that can be created from
 
    // the UNION of one element from each set in `subtree_solution`.
 
    // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated.
 
    old_local: HashSet<Predicate>, // already sent to this connector's parent OR decided
 
    new_local: HashSet<Predicate>, // not yet sent to this connector's parent OR decided
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
// Stores the transient data of a synchronous round.
 
// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of
 
// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx,
 
// and can be undone if the round fails.
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    ips: IdAndPortState,
 
}
 

	
 
// A trait intended to limit the access of the ConnectorUnphased structure
 
// such that we don't accidentally modify any important component/port data
 
// while the results of the round are undecided. Why? Any actions during Connector::sync
 
// are _speculative_ until the round is decided, and we need a safe way of rolling
 
// back any changes.
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 
}
 

	
 
// Represents a set of synchronous port operations that the native component
 
// has described as an "option" for completing during the synchronous rounds.
 
// Operations contained here succeed together or not at all.
 
// A native with N=2+ batches are expressing an N-way nondeterministic choice
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
// Parallels a mio::Token type, but more clearly communicates
 
// the way it identifies the evented structre it corresponds to.
 
// See runtime/setup for methods converting between TokenTarget and mio::Token
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
// such that it can know when to continue polling, and when to block.
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn err_would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        // establish the invariant
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    // Insert the given element. Returns whether it was already present.
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl PortInfoMap {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.map.iter().filter(move |(_, port_info)| port_info.owner == owner).map(|(port, _)| port)
 
        self.owned.get(&owner).into_iter().flat_map(HashSet::iter)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        // Every port maps to a speculative variable
 
        // Two distinct ports map to the same variable
 
        // IFF they are two ends of the same logical channel.
 
        let info = self.map.get(&port).unwrap();
 
        SpecVar(match info.polarity {
 
            Getter => port,
 
            Putter => info.peer.unwrap(),
 
        })
 
    }
 
    fn invariant_preserved(&self) -> bool {
 
        // for every port P with some owner O,
 
        // P is in O's owned set
 
        for (port, info) in self.map.iter() {
 
            match self.owned.get(&info.owner) {
 
                Some(set) if set.contains(port) => {}
 
                _ => {
 
                    println!("{:#?}\n WITH port {:?}", self, port);
 
                    return false;
 
                }
 
            }
 
        }
 
        // for every port P owned by every owner O,
 
        // P's owner is O
 
        for (&owner, set) in self.owned.iter() {
 
            for port in set {
 
                match self.map.get(port) {
 
                    Some(info) if info.owner == owner => {}
 
                    _ => {
 
                        println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port);
 
                        return false;
 
                    }
 
                }
 
            }
 
        }
 
        true
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N.
 
        // This gap is entirely unnecessary (i.e. 0 is fine)
 
        // It's purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_component_id(&mut self) -> ComponentId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() }
 
            .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(self.unphased.logger(), "Connector dropping. Goodbye!");
 
    }
 
}
 
// Given a slice of ports, return the first, if any, port is present repeatedly
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
        match vec.binary_search(port) {
 
            Err(index) => vec.insert(index, *port),
 
            Ok(_) => return Some(*port),
 
        }
 
    }
 
    None
 
}
 
impl Connector {
 
    /// Generate a random connector identifier from the system's source of randomness.
 
    pub fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete,
 
    /// and it is ready to participate in synchronous rounds of communication.
 
    pub fn is_connected(&self) -> bool {
 
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
 
        // consequently, this "phased" business would also include connector variants and this would
 
        // get a lot closer to the connector impl. itself.
 
        // Instead, the C-oriented implementation doesn't distinguish connector states as types,
 
        // and distinguish them as enum variants instead
 
        match self.phased {
 
            ConnectorPhased::Setup(..) => false,
 
            ConnectorPhased::Communication(..) => true,
 
        }
 
    }
 

	
 
    /// Enables the connector's current logger to be swapped out for another
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 

	
 
    /// Access the connector's current logger
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.logger
 
    }
 

	
 
    /// Create a new synchronous channel, returning its ends as a pair of ports,
 
    /// with polarity output, input respectively. Available during either setup/communication phase.
 
    /// # Panics
 
    /// This function panics if the connector's (large) port id space is exhausted.
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let mut new_cid = || cu.ips.id_manager.new_port_id();
 
        // allocate two fresh port identifiers
 
        let [o, i] = [new_cid(), new_cid()];
 
        // store info for each:
 
        // - they are each others' peers
 
        // - they are owned by a local component with id `cid`
 
        // - polarity putter, getter respectively
 
        cu.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                owner: cu.native_component_id,
 
                polarity: Putter,
 
            },
 
        );
 
        cu.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                owner: cu.native_component_id,
 
                polarity: Getter,
 
            },
 
        );
 
        cu.ips
 
            .port_info
 
            .owned
 
            .entry(cu.native_component_id)
 
            .or_default()
 
            .extend([o, i].iter().copied());
 

	
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 

	
 
    /// Instantiates a new component for the connector runtime to manage, and passing
 
    /// the given set of ports from the interface of the native component, to that of the
 
    /// newly created component (passing their ownership).
 
    /// # Errors
 
    /// Error is returned if the moved ports are not owned by the native component,
 
    /// if the given component name is not defined in the connector's protocol,
 
    /// the given sequence of ports contains a duplicate port,
 
    /// or if the component is unfit for instantiation with the given port sequence.
 
    /// # Panics
 
    /// This function panics if the connector's (large) component id space is exhausted.
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // Check for error cases first before modifying `cu`
 
        use AddComponentError as Ace;
 
        let cu = &self.unphased;
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
            if info.polarity != expected_polarity {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // No errors! Time to modify `cu`
 
        // create a new component and identifier
 
        let cu = &mut self.unphased;
 
        let new_cid = cu.ips.id_manager.new_component_id();
 
        cu.proto_components.insert(new_cid, cu.proto_description.new_component(identifier, ports));
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.ips.port_info.map.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
        }
 
        if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) {
 
            set.retain(|x| !ports.contains(x));
 
        }
 
        cu.ips.port_info.owned.insert(new_cid, ports.iter().copied().collect());
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn singleton(k: SpecVar, v: SpecVal) -> Self {
 
        Self::default().inserted(k, v)
 
    }
 
    #[inline]
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 

	
 
    // Return true whether `self` is a subset of `maybe_superset`
 
    pub fn assigns_subset(&self, maybe_superset: &Self) -> bool {
 
        for (var, val) in self.assigned.iter() {
 
            match maybe_superset.assigned.get(var) {
 
                Some(val2) if val2 == val => {}
 
                _ => return false, // var unmapped, or mapped differently
 
            }
 
        }
 
        // `maybe_superset` mirrored all my assignments!
 
        true
 
    }
 

	
 
    /// Given the two predicates {self, other}, return that whose
 
    /// assignments are the union of those of both.
 
    fn assignment_union(&self, other: &Self) -> AssignmentUnionResult {
 
        use AssignmentUnionResult as Aur;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // populate lists of assignments in self but not other and vice versa.
 
        // do this by incrementally unfolding the iterators, keeping an eye
 
        // on the ordering between the head elements [s, o].
 
        // whenever s<o, other is certainly missing element 's', etc.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break, // both iterators are empty
 
                [None, Some(x)] => {
 
                    // self's iterator is empty.
 
                    // all remaning elements are in other but not self
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    // other's iterator is empty.
 
                    // all remaning elements are in self but not other
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        // No predicate exists which satisfies both!
 
                        return Aur::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Aur::Equivalent,       // ... equivalent to both.
 
            [false, true] => Aur::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Aur::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Aur::New(new)
 
            }
 
        }
 
    }
 

	
 
    // Compute the union of the assignments of the two given predicates, if it exists.
 
    // It doesn't exist if there is some value which the predicates assign to different values.
src/runtime/setup.rs
Show inline comments
 
@@ -38,301 +38,313 @@ impl Connector {
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
    /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error).
 
    /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed
 
    /// to preserve their configured protocols.
 
    /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an
 
    /// error during `connect`, observed as a peer misbehaving.
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
        let native_component_id = id_manager.new_component_id();
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                native_component_id,
 
                ips: IdAndPortState { id_manager, port_info: Default::default() },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 

	
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                // allocates 4 new port identifiers, two for each logical channel,
 
                // one channel per direction (into and out of the component)
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
                // allocate the native->udp_mediator channel's ports
 
                cu.ips.port_info.map.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Getter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                // allocate the udp_mediator->native channel's ports
 
                cu.ips.port_info.map.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                // allocate the two ports owned by the UdpMediator component
 
                // Remember to setup this UdpEndpoint setup during `connect` later.
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 

	
 
                // update owned sets
 
                cu.ips
 
                    .port_info
 
                    .owned
 
                    .entry(cu.native_component_id)
 
                    .or_default()
 
                    .extend([nin, nout].iter().copied());
 
                cu.ips.port_info.owned.insert(udp_cid, maplit::hashset! {uin, uout});
 
                // Return the native's output, input port pair
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                // allocate a single dangling port with a `None` peer (for now)
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.map.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        peer: None,
 
                        owner: cu.native_component_id,
 
                        polarity,
 
                    },
 
                );
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                // Remember to setup this NetEndpoint setup during `connect` later.
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: new_pid,
 
                });
 
                // update owned set
 
                cu.ips.port_info.owned.entry(cu.native_component_id).or_default().insert(new_pid);
 
                Ok(new_pid)
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
    /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports,
 
    /// but is ready to begin the first communication round.
 
    /// Initially, the connector has a singleton set of _batches_, the only element of which is empty.
 
    /// This single element starts off selected. The selected batch is modified with `put` and `get`,
 
    /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the
 
    /// purpose of these batches.
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction. Learn our role in the consensus tree,
 
                // from learning who are our children/parents (neighbors) in the consensus tree.
 
                let neighborhood = init_neighborhood(
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // Put it all together with an initial round index of zero.
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None), // no previous round yet
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    // Perform the session optimization procedure, which may modify the
 
                    // internals of the connector, rerouting ports, moving around connectors etc.
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                // Connect procedure successful! Commit changes by...
 
                // ... commiting new port info for ConnectorUnphased
 
                for (port, info) in extra_port_info.info.drain() {
 
                    cu.ips.port_info.owned.entry(info.owner).or_default().insert(port);
 
                    cu.ips.port_info.map.insert(port, info);
 
                }
 
                for (port, peer) in extra_port_info.peers.drain() {
 
                    cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer);
 
                }
 
                // ... replacing the connector's phase to "communication"
 
                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
// Given a set of net_ and udp_ endpoints to setup,
 
// port information to flesh out (by discovering peers through channels)
 
// and a deadline in which to do it,
 
// try to return:
 
// - An EndpointManager, containing all the set up endpoints
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readbale event
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
@@ -909,109 +921,110 @@ fn session_optimize(
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(Ce::SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(Sm::SessionGather { .. })
 
                | msg @ S(Sm::YouAreMyParent)
 
                | msg @ S(Sm::MyPortInfo(..))
 
                | msg @ S(Sm::LeaderAnnounce { .. })
 
                | msg @ S(Sm::LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    // extract my own ConnectorId's entry
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    // broadcast the optimized session info to my children
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    // apply local optimizations
 
    apply_my_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 

	
 
// Defines the optimization function, consuming an optimized map,
 
// and returning an optimized map.
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    // currently, it's the identity function
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 

	
 
// Modify the given connector's internals to reflect
 
// the given session info
 
fn apply_my_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // simply overwrite the contents
 
    cu.ips.port_info = port_info;
 
    assert!(cu.ips.port_info.invariant_preserved());
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
 
}
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{
 
    error::*,
 
    EndpointPolarity::{Active, Passive},
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
const MS100: Option<Duration> = Some(Duration::from_millis(100));
 
const MS300: Option<Duration> = Some(Duration::from_millis(300));
 
const SEC1: Option<Duration> = Some(Duration::from_secs(1));
 
const SEC5: Option<Duration> = Some(Duration::from_secs(5));
 
const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone())
 
}
 
fn file_logged_configured_connector(
 
    connector_id: ConnectorId,
 
    dir_path: &Path,
 
    pd: Arc<ProtocolDescription>,
 
) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir");
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file = File::create(path).expect("Failed to create log output file!");
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, pd, connector_id)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 
fn new_u8_buffer(cap: usize) -> Vec<u8> {
 
    let mut v = Vec::with_capacity(cap);
 
    // Safe! len will cover owned bytes in valid state
 
    unsafe { v.set_len(cap) }
 
    v
 
}
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
0 comments (0 inline, 0 general)