Changeset - 528578bffb3f
[Not reviewed]
9 2 13
Christopher Esterhuyse - 5 years ago 2020-10-08 12:43:24
christopher.esterhuyse@gmail.com
mild refactor of setup procedure's outermost function; more information is available to the session optimization step, and its easier to understand how invariants are preserved
15 files changed with 421 insertions and 72 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -40,11 +40,11 @@ lazy_static = "1.4.0"
 
crate-type = [
 
	"rlib", # for use as a Rust dependency. 
 
	"cdylib" # for FFI use, typically C.
 
]
 

	
 
[features]
 
default = ["ffi"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/bench_01/main.c
Show inline comments
 
file renamed from examples/bench_1/main.c to examples/bench_01/main.c
examples/bench_02/main.c
Show inline comments
 
file renamed from examples/bench_2/main.c to examples/bench_02/main.c
examples/bench_03/main.c
Show inline comments
 
file renamed from examples/bench_3/main.c to examples/bench_03/main.c
examples/bench_04/main.c
Show inline comments
 
file renamed from examples/bench_4/main.c to examples/bench_04/main.c
examples/bench_05/main.c
Show inline comments
 
file renamed from examples/bench_5/main.c to examples/bench_05/main.c
examples/bench_06/main.c
Show inline comments
 
file renamed from examples/bench_6/main.c to examples/bench_06/main.c
examples/bench_07/main.c
Show inline comments
 
file renamed from examples/bench_7/main.c to examples/bench_07/main.c
examples/bench_08/main.c
Show inline comments
 
file renamed from examples/bench_8/main.c to examples/bench_08/main.c
examples/bench_09/main.c
Show inline comments
 
file renamed from examples/bench_9/main.c to examples/bench_09/main.c
examples/bench_23/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i;
 

	
 
	// unsigned char pdl[] = "\
 
	// primitive xrouter(in a, out b, out c) {\
 
 //        while(true) synchronous {\
 
 //            if(fires(a)) {\
 
 //                if(fires(b)) put(b, get(a));\
 
 //                else         put(c, get(a));\
 
 //            }\
 
 //        }\
 
 //    }"
 
 //    ;
 
	unsigned char pdl[] = "\
 
	primitive lossy(in a, out b) {\
 
        while(true) synchronous {\
 
            if(fires(a)) {\
 
                msg m = get(a);\
 
                if(fires(b)) put(b, m);\
 
            }\
 
        }\
 
    }\
 
    primitive sync_drain(in a, in b) {\
 
        while(true) synchronous {\
 
            if(fires(a)) {\
 
                get(a);\
 
                get(b);\
 
            }\
 
        }\
 
    }\
 
    composite xrouter(in a, out b, out c) {\
 
        channel d -> e;\
 
        channel f -> g;\
 
        channel h -> i;\
 
        channel j -> k;\
 
        channel l -> m;\
 
        channel n -> o;\
 
        channel p -> q;\
 
        channel r -> s;\
 
        channel t -> u;\
 
        new replicator(a, d, f);\
 
        new replicator(g, t, h);\
 
        new lossy(e, l);\
 
        new lossy(i, j);\
 
        new replicator(m, b, p);\
 
        new replicator(k, n, c);\
 
        new merger(q, o, r);\
 
        new sync_drain(u, s);\
 
    }"
 
    ;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, 0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	PortId ports[6];
 
	for(i=0; i<3; i++) {
 
		connector_add_port_pair(c, &ports[2*i], &ports[2*i+1]);
 
	}
 
	// [native~~~~~~~~~~]
 
	//  0  1  2  3  4  5
 
	//  |  ^  |  ^  |  ^  
 
	//  `--`  `--`  `--`  
 
	char ident[] = "xrouter";
 
	connector_add_component(
 
		c,
 
		ident,
 
		sizeof(ident)-1,
 
		(PortId[]) { ports[1], ports[2], ports[4] },
 
		3);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	// [native~~~~~~~~~~]
 
	//  0        3     5
 
	//  V        ^     ^  
 
	//  1        2     4  
 
	// [xrouter~~~~~~~~~]
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 
	
 
	int msg_len = 1000;
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 

	
 
	{
 
		clock_t begin = clock();
 
		for (i=0; i<100000; i++) {
 
			connector_put_bytes(c, ports[0], msg, msg_len);
 
			connector_get(c, ports[3]);
 
			connector_sync(c, -1);
 
		}
 
		clock_t end = clock();
 
		double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
		printf("First: %f\n", time_spent);
 
	}
 
	{
 
		clock_t begin = clock();
 
		for (i=0; i<100000; i++) {
 
			connector_put_bytes(c, ports[0], msg, msg_len);
 
			connector_get(c, ports[5]);
 
			connector_sync(c, -1);
 
		}
 
		clock_t end = clock();
 
		double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
		printf("Second: %f\n", time_spent);
 
	}
 
	{
 
		clock_t begin = clock();
 
		for (i=0; i<100000; i++) {
 
			connector_put_bytes(c, ports[0], msg, msg_len);
 
			connector_get(c, ports[3 + (i%2)*2]);
 
			connector_sync(c, -1);
 
		}
 
		clock_t end = clock();
 
		double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
		printf("Alternating: %f\n", time_spent);
 
	}
 
	free(msg);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_24/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, j;
 

	
 
	unsigned char pdl[] = "\
 
	primitive fifo1_init(msg m, in a, out b) {\
 
        while(true) synchronous {\
 
            if(m != null && fires(b)) {\
 
                put(b, m);\
 
                m = null;\
 
            } else if (m == null && fires(a)) {\
 
                m = get(a);\
 
            }\
 
        }\
 
    }\
 
    composite fifo1_full(in a, out b) {\
 
        new fifo1_init(create(0), a, b);\
 
    }\
 
    composite fifo1(in a, out b) {\
 
        new fifo1_init(null, a, b);\
 
    }\
 
    composite sequencer3(out a, out b, out c) {\
 
        channel d -> e;\
 
        channel f -> g;\
 
        channel h -> i;\
 
        channel j -> k;\
 
        channel l -> m;\
 
        channel n -> o;\
 
        new fifo1_full(o, d);\
 
        new replicator(e, f, a);\
 
        new fifo1(g, h);\
 
        new replicator(i, j, b);\
 
        new fifo1(k, l);\
 
        new replicator(m, n, c);\
 
    }"
 
    ;
 
	// unsigned char pdl[] = "\
 
	// primitive sequencer3(out a, out b, out c) {\
 
 //        int i = 0;\
 
 //        while(true) synchronous {\
 
 //            out to = a;\
 
 //            if     (i==1) to = b;\
 
 //            else if(i==2) to = c;\
 
 //            if(fires(to)) {\
 
 //                put(to, create(0));\
 
 //                i = (i + 1)%3;\
 
 //            }\
 
 //        }\
 
 //    }"
 
    ;
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, 0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	PortId putters[3], getters[3];
 
	for(i=0; i<3; i++) {
 
		connector_add_port_pair(c, &putters[i], &getters[i]);
 
	}
 
	char ident[] = "sequencer3";
 
	connector_add_component(c, ident, sizeof(ident)-1, putters, 3);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 

	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000/3; i++) {
 
		for (j=0; j<3; j++) {
 
			connector_get(c, getters[j]);
 
			connector_sync(c, -1);
 
		}
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_25/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, j, syncs;
 
    syncs = atoi(argv[1]);
 
    printf("syncs %d\n", syncs);
 

	
 
    unsigned char pdl[] = "";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    char logpath[] = "./bench_11.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
    PortId native_putter, native_getter;
 
    connector_add_port_pair(c, &native_putter, &native_getter);
 
    for (i=0; i<syncs; i++) {
 
        // create a forward to tail of chain
 
        PortId putter, getter;
 
        connector_add_port_pair(c, &putter, &getter);
 
        // native ports: {native_putter, native_getter, putter, getter}
 
        // thread a forward component onto native_tail
 
        char ident[] = "sync";
 
        connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
        // native ports: {native_putter, getter}
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        native_getter = getter;
 
    }
 
    // add "recv_zero" on end of chain
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    {
 
        clock_t begin = clock();
 
        for (i=0; i<1000000; i++) {
 
            connector_put_bytes(c, native_putter, msg, msg_len);
 
            connector_get(c, native_getter);
 
            connector_sync(c, -1);  
 
        }
 
        clock_t end = clock();
 
        double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
        printf("Sending: %f\n", time_spent);
 
    }
 
    {
 
        clock_t begin = clock();
 
        for (i=0; i<1000000; i++) {
 
            connector_sync(c, -1);  
 
        }
 
        clock_t end = clock();
 
        double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
        printf("Not Sending: %f\n", time_spent);
 
    }
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_26/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) {
 
    FfiSocketAddr x;
 
    x.port = port;
 
    int i;
 
    for(i=0; i<4; i++) {
 
        x.ipv4[i] = ipv4[i];
 
    }
 
    return x;
 
}
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char leader;
 
    uint8_t ipv4[4] = { atoi(argv[1]), atoi(argv[2]), atoi(argv[3]), atoi(argv[4]) };
 
    leader = argv[5][0];
 
    rounds = atoi(argv[6]);
 
    printf("leader %c, rounds %d, peer at %d.%d.%d.%d\n",
 
        leader, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3]);
 

	
 
    unsigned char pdl[] = "";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, leader=='y'?1:0);
 
    PortId ports[2];
 
    if(leader=='y') {
 
        EndpointPolarity ep = EndpointPolarity_Active;
 
        connector_add_net_port(c, &ports[0], addr_new(ipv4, 7005), Polarity_Putter, ep);
 
        connector_add_net_port(c, &ports[1], addr_new(ipv4, 7006), Polarity_Getter, ep);
 
    } else {
 
        EndpointPolarity ep = EndpointPolarity_Passive;
 
        connector_add_net_port(c, &ports[0], addr_new(ipv4, 7005), Polarity_Getter, ep);
 
        connector_add_net_port(c, &ports[1], addr_new(ipv4, 7006), Polarity_Putter, ep);
 
        char ident[] = "sync";
 
        connector_add_component(c, ident, sizeof(ident)-1, ports, 2);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        if(leader=='y') {
 
            connector_put_bytes(c, ports[0], msg, msg_len);
 
            connector_get(c, ports[1]);
 
        }
 
        connector_sync(c, -1);  
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
#[derive(Default)]
 
struct ExtraPortInfo {
 
    info: HashMap<PortId, PortInfo>,
 
    peers: HashMap<PortId, PortId>,
 
}
 

	
 
impl TokenTarget {
 
    // subdivides the domain of usize into
 
    // [NET_ENDPOINT][UDP_ENDPOINT  ]
 
    // ^0            ^usize::MAX/2   ^usize::MAX
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
@@ -205,82 +199,113 @@ impl Connector {
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction. Learn our role in the consensus tree,
 
                // from learning who are our children/parents (neighbors) in the consensus tree.
 
                let neighborhood = init_neighborhood(
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // Put it all together with an initial round index of zero.
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None), // no previous round yet
 
                // Ideally, we'd simply clone `cu` in its entirety, and pass it to
 
                // `connect_inner`, such that a successful connection discards the original.
 
                // We need to work around the `logger` not being clonable;
 
                // solution? create a dummy clone, and use mem-swap to ensure the
 
                // single real logger is wherever it needs to be.
 
                let mut cu_clone = ConnectorUnphased {
 
                    logger: Box::new(DummyLogger),
 
                    proto_components: cu.proto_components.clone(),
 
                    native_component_id: cu.native_component_id.clone(),
 
                    ips: cu.ips.clone(),
 
                    proto_description: cu.proto_description.clone(),
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    // Perform the session optimization procedure, which may modify the
 
                    // internals of the connector, rerouting ports, moving around connectors etc.
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                // Connect procedure successful! Commit changes by...
 
                // ... commiting new port info for ConnectorUnphased
 
                for (port, info) in extra_port_info.info.drain() {
 
                    cu.ips.port_info.owned.entry(info.owner).or_default().insert(port);
 
                    cu.ips.port_info.map.insert(port, info);
 
                }
 
                for (port, peer) in extra_port_info.peers.drain() {
 
                    cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer);
 
                // cu has REAL logger...
 
                std::mem::swap(&mut cu.logger, &mut cu_clone.logger);
 
                // ... cu_clone has REAL logger.
 
                match Self::connect_inner(cu_clone, setup, timeout) {
 
                    Ok(connected_connector) => {
 
                        *self = connected_connector;
 
                        Ok(())
 
                    }
 
                    Err((err, mut logger)) => {
 
                        // Put the original logger back in place (in self.unphased, AKA `cu`).
 
                        // cu_clone has REAL logger...
 
                        std::mem::swap(&mut cu.logger, &mut logger);
 
                        // ... cu has REAL logger.
 
                        Err(err)
 
                    }
 
                }
 
                // ... replacing the connector's phase to "communication"
 
                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
    fn connect_inner(
 
        mut cu: ConnectorUnphased,
 
        setup: &ConnectorSetup,
 
        timeout: Option<Duration>,
 
    ) -> Result<Self, (ConnectError, Box<dyn Logger>)> {
 
        log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
        let deadline = timeout.map(|to| Instant::now() + to);
 
        let mut try_complete = || {
 
            // connect all endpoints in parallel; send and receive peer ids through ports
 
            let mut endpoint_manager = setup_endpoints_and_pair_ports(
 
                &mut *cu.logger,
 
                &setup.net_endpoint_setups,
 
                &setup.udp_endpoint_setups,
 
                &mut cu.ips.port_info,
 
                &deadline,
 
            )?;
 
            log!(
 
                cu.logger,
 
                "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                &cu.ips.port_info,
 
                &endpoint_manager,
 
            );
 
            // leader election and tree construction. Learn our role in the consensus tree,
 
            // from learning who are our children/parents (neighbors) in the consensus tree.
 
            let neighborhood = init_neighborhood(
 
                cu.ips.id_manager.connector_id,
 
                &mut *cu.logger,
 
                &mut endpoint_manager,
 
                &deadline,
 
            )?;
 
            log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
            // Put it all together with an initial round index of zero.
 
            let mut comm = ConnectorCommunication {
 
                round_index: 0,
 
                endpoint_manager,
 
                neighborhood,
 
                native_batches: vec![Default::default()],
 
                round_result: Ok(None), // no previous round yet
 
            };
 
            if cfg!(feature = "session_optimization") {
 
                // Perform the session optimization procedure, which may modify the
 
                // internals of the connector, rerouting ports, moving around connectors etc.
 
                session_optimize(&mut cu, &mut comm, &deadline)?;
 
            }
 
            log!(cu.logger, "connect() finished. setup phase complete");
 
            Ok(comm)
 
        };
 
        match try_complete() {
 
            Ok(comm) => {
 
                Ok(Self { unphased: cu, phased: ConnectorPhased::Communication(Box::new(comm)) })
 
            }
 
            Err(err) => Err((err, cu.logger)),
 
        }
 
    }
 
}
 

	
 
// Given a set of net_ and udp_ endpoints to setup,
 
// port information to flesh out (by discovering peers through channels)
 
// and a deadline in which to do it,
 
// try to return:
 
// - An EndpointManager, containing all the set up endpoints
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &PortInfoMap,
 
    port_info: &mut PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
 
) -> Result<EndpointManager, ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
@@ -302,18 +327,15 @@ fn setup_endpoints_and_pair_ports(
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 
@@ -537,26 +559,31 @@ fn setup_endpoints_and_pair_ports(
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.endpoint_setup.getter_for_incoming,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // finally learned the peer of this port!
 
                                    extra_port_info.peers.insert(
 
                                        net_todo.endpoint_setup.getter_for_incoming,
 
                                        peer_info.port,
 
                                    );
 
                                    port_info
 
                                        .map
 
                                        .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
 
                                        .unwrap()
 
                                        .peer = Some(peer_info.port);
 
                                    // learned the info of this peer port
 
                                    if !port_info.map.contains_key(&peer_info.port) {
 
                                        let info = PortInfo {
 
                                    port_info.map.entry(peer_info.port).or_insert({
 
                                        port_info
 
                                            .owned
 
                                            .entry(peer_info.owner)
 
                                            .or_default()
 
                                            .insert(peer_info.port);
 
                                        PortInfo {
 
                                            peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
                                            polarity: peer_info.polarity,
 
                                            owner: peer_info.owner,
 
                                            route: Route::NetEndpoint { index },
 
                                        };
 
                                        extra_port_info.info.insert(peer_info.port, info);
 
                                    }
 
                                        }
 
                                    });
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
@@ -622,13 +649,13 @@ fn setup_endpoints_and_pair_ports(
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        io_byte_buffer,
 
    };
 
    Ok((endpoint_manager, extra_port_info))
 
    Ok(endpoint_manager)
 
}
 

	
 
// Given a fully-formed endpoint manager,
 
// construct the consensus tree with:
 
// 1. decentralized leader election
 
// 2. centralized tree construction
 
@@ -1012,12 +1039,13 @@ fn apply_my_optimizations(
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // simply overwrite the contents
 
    // println!("BEFORE: {:#?}\n{:#?}", cu, comm);
 
    cu.ips.port_info = port_info;
 
    assert!(cu.ips.port_info.invariant_preserved());
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
@@ -1025,8 +1053,9 @@ fn apply_my_optimizations(
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    // println!("AFTER: {:#?}\n{:#?}", cu, comm);
 
    Ok(())
 
}
0 comments (0 inline, 0 general)