Changeset - 5ed4a5524548
[Not reviewed]
0 3 2
Christopher Esterhuyse - 5 years ago 2020-10-02 20:54:27
christopher.esterhuyse@gmail.com
Goodbye, Nagle
5 files changed with 138 insertions and 45 deletions:
0 comments (0 inline, 0 general)
examples/bench_17/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
#define N 5
 
int main(int argc, char** argv) {
 
	int i, cid, min_pid, msgs;
 
	int i, j, cid, min_putter, min_getter, ports_tot, ports_used;
 
	char do_puts, do_gets;
 
	cid = atoi(argv[1]);
 
	min_pid = atoi(argv[2]);
 
	char role = argv[3][0]; // 'h' for head, 'i' for inner, 't' for tail, 's' for singleton
 
	msgs = atoi(argv[4]);
 
	printf("cid %d, min_pid %d, role='%c', msgs %d\n",
 
		cid, min_pid, role, msgs);
 
	min_putter = atoi(argv[2]);
 
	min_getter = atoi(argv[3]);
 
	ports_tot = atoi(argv[4]);
 
	ports_used = atoi(argv[5]);
 
	do_puts = argv[6][0]; // 't' or 'f'
 
	do_gets = argv[7][0];
 
	printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c\n",
 
		cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId putters[N], getters[N];
 
	FfiSocketAddr addr = {{127, 0, 0, 1}, 0};
 
	if(role=='i' || role=='t') {
 
		// I have N getter ports!
 
		for(i=0; i<N; i++) {
 
			addr.port = min_pid+i;
 
			connector_add_net_port(c, &getters[i], addr, Polarity_Getter, EndpointPolarity_Passive);
 
		}
 
	}
 
	if(role=='h' || role=='i') {
 
		// I have N putter ports!
 
		for(i=0; i<N; i++) {
 
			addr.port = min_pid+i+N;
 
			connector_add_net_port(c, &putters[i], addr, Polarity_Putter, EndpointPolarity_Active);
 
		}
 
	}
 
	printf("Added all ports!\n");
 
	if(role=='i') {
 
		// Inner has a forwarder component to forward messages
 
		for(i=0; i<N; i++) {	
 
			connector_add_component(c, "forward", 7, (PortId[]){putters[i], getters[i]}, 2);
 
		}
 
	PortId putters[ports_tot], getters[ports_tot];
 
	for(i=0; i<ports_tot; i++) {
 
		connector_add_net_port(c, &putters[i],
 
			(FfiSocketAddr){{127, 0, 0, 1}, min_putter+i},
 
			Polarity_Putter, EndpointPolarity_Active);
 
		connector_add_net_port(c, &getters[i],
 
			(FfiSocketAddr){{127, 0, 0, 1}, min_getter+i},
 
			Polarity_Getter, EndpointPolarity_Passive);
 
	}
 
	connector_connect(c, -1);
 
	printf("connect ok!\n");
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for (i=0; i<10000; i++) {
 
		if(role=='h' || role=='s') {
 
			// singleton and head send N messages
 
			for(i=0; i<N; i++) { 
 
				connector_put_bytes(c, putters[i], msg, sizeof(msg)-1);
 
			}
 
		}
 
		if(role=='t' || role=='s') {
 
			// singleton and tail recv N messages
 
			for(i=0; i<N; i++) { 
 
				connector_get(c, getters[i]);
 
			}
 
	for (i=0; i<1000; i++) {
 
		for(j=0; j<ports_used; j++) {
 
			if(do_gets=='y') connector_get(c, getters[j]);
 
			if(do_puts=='y') connector_put_bytes(c, putters[j], msg, sizeof(msg)-1);
 
		}
 
		// inner doesn't send nor receive
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_18/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	// all outward connections are ACTIVE to localhost
 
	// use tcp_rendezvous
 
	int i, j, cid, min_putter, min_getter, ports_tot, ports_used;
 
	char do_puts, do_gets;
 
	cid = atoi(argv[1]);
 
	min_putter = atoi(argv[2]);
 
	min_getter = atoi(argv[3]);
 
	ports_tot = atoi(argv[4]);
 
	ports_used = atoi(argv[5]);
 
	do_puts = argv[6][0]; // 't' or 'f'
 
	do_gets = argv[7][0];
 
	printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c\n",
 
		cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId putters[ports_tot], getters[ports_tot];
 
	for(i=0; i<ports_tot; i++) {
 
		connector_add_net_port(c, &putters[i],
 
			(FfiSocketAddr){{127, 0, 0, 1}, min_putter+i},
 
			Polarity_Putter, EndpointPolarity_Active);
 
		connector_add_net_port(c, &getters[i],
 
			(FfiSocketAddr){{127, 0, 0, 1}, min_getter+i},
 
			Polarity_Getter, EndpointPolarity_Active);
 
	}
 
	connector_connect(c, -1);
 
	printf("connect ok!\n");
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for (i=0; i<1000; i++) {
 
		for(j=0; j<ports_used; j++) {
 
			if(do_gets=='y') connector_get(c, getters[j]);
 
			if(do_puts=='y') connector_put_bytes(c, putters[j], msg, sizeof(msg)-1);
 
		}
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_19/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	// bounce off tokyo to my public IP
 
	int i, j, cid, min_putter, min_getter, ports_tot, ports_used, n_rounds;
 
	char do_puts, do_gets;
 
	cid = atoi(argv[1]);
 
	min_putter = atoi(argv[2]);
 
	min_getter = atoi(argv[3]);
 
	ports_tot = atoi(argv[4]);
 
	ports_used = atoi(argv[5]);
 
	do_puts = argv[6][0]; // 't' or 'f'
 
	do_gets = argv[7][0];
 
	n_rounds = atoi(argv[12]);
 
	
 
	// argv 8..12 is PEER_IP
 
	
 
	printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c, n_rounds %d\n",
 
		cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets, n_rounds);
 
	printf("peer_ip %d.%d.%d.%d\n",
 
		atoi(argv[8]),
 
		atoi(argv[9]),
 
		atoi(argv[10]),
 
		atoi(argv[11]));
 
	
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId putters[ports_tot], getters[ports_tot];
 
	for(i=0; i<ports_tot; i++) {
 
		connector_add_net_port(c, &putters[i],
 
			(FfiSocketAddr){
 
				{atoi(argv[8]),atoi(argv[9]),atoi(argv[10]),atoi(argv[11]),},
 
				min_putter+i
 
			},
 
			Polarity_Putter, EndpointPolarity_Active);
 
		connector_add_net_port(c, &getters[i],
 
			(FfiSocketAddr){
 
				{0,0,0,0},
 
				min_getter+i
 
			},
 
			Polarity_Getter, EndpointPolarity_Passive);
 
	}
 
	connector_connect(c, -1);
 
	printf("connect ok!\n");
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for (i=0; i<n_rounds; i++) {
 
		for(j=0; j<ports_used; j++) {
 
			if(do_gets=='y') connector_get(c, getters[j]);
 
			if(do_puts=='y') connector_put_bytes(c, putters[j], msg, sizeof(msg)-1);
 
		}
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/zoop.sh
Show inline comments
 
#!/bin/bash
 
for included in {0..13}
 
for ports in 2 4 6 8 10 12 14 16
 
do
 
	./bench_13/main.exe 65535 $included 13
 
done
 
\ No newline at end of file
 
	./bench_19/main.exe 98 7500 7000 16 $ports y y 192 168 1 4 1000
 
done
src/runtime/setup.rs
Show inline comments
 
@@ -305,384 +305,385 @@ fn setup_endpoints_and_pair_ports(
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::TcpInvalidConnect(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, getter_for_incoming: endpoint_setup.getter_for_incoming })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the timeout for the poll call
 
        let remaining = match (deadline, net_connect_to_retry.is_empty()) {
 
            (None, true) => None,
 
            (None, false) => Some(RETRY_PERIOD),
 
            (Some(deadline), is_empty) => {
 
                let dur_to_timeout =
 
                    deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?;
 
                Some(if is_empty { dur_to_timeout } else { dur_to_timeout.min(RETRY_PERIOD) })
 
            }
 
        };
 
        // block until either
 
        // (a) `events` has been populated with 1+ elements
 
        // (b) timeout elapses, or
 
        // (c) RETRY_PERIOD elapses
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        if last_retry_at.elapsed() > RETRY_PERIOD {
 
            // Retry all net connections and reset `last_retry_at`
 
            last_retry_at = Instant::now();
 
            for net_index in net_connect_to_retry.drain() {
 
                // Restart connect procedure for this net endpoint
 
                let net_todo = &mut net_todos[net_index];
 
                log!(
 
                    logger,
 
                    "Restarting connection with endpoint {:?} {:?}",
 
                    net_index,
 
                    net_todo.endpoint_setup.sock_addr
 
                );
 
                match &mut net_todo.todo_endpoint {
 
                    NetTodoEndpoint::PeerInfoRecving(endpoint) => {
 
                        let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                            .expect("mio::TcpStream connect should not fail!");
 
                        std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                        let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                        poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        }
 
        for event in events.iter() {
 
            let token = event.token();
 
            // figure out which endpoint the event belonged to
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::UdpEndpoint { index } => {
 
                    // UdpEndpoints are easy to complete.
 
                    // Their setup event just has to succeed without error
 
                    if !setup_incomplete.contains(&token_target) {
 
                        // spurious wakeup. this endpoint has already been set up!
 
                        continue;
 
                    }
 
                    let udp_todo: &UdpTodo = &udp_todos[index];
 
                    if event.is_error() {
 
                        return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap()));
 
                    }
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    // NetEndpoints are complex to complete,
 
                    // they must accept/connect to their peer,
 
                    // and then exchange port info successfully
 
                    let net_todo = &mut net_todos[index];
 
                    if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // Passive endpoint that will first try accept the peer's connection
 
                        match listener.accept() {
 
                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
 
                            Err(_) => {
 
                                log!(logger, "accept() failure on index {}", index);
 
                                return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                            }
 
                            Ok((mut stream, peer_addr)) => {
 
                                // successfully accepted the active peer
 
                                // reusing the token, but now for the stream and not the listener
 
                                poll.registry().deregister(listener).unwrap();
 
                                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                                log!(
 
                                    logger,
 
                                    "Endpoint[{}] accepted a connection from {:?}",
 
                                    index,
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                net_todo.todo_endpoint =
 
                                    NetTodoEndpoint::PeerInfoRecving(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                    // OK now let's try and finish exchanging port info
 
                    if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) =
 
                        &mut net_todo.todo_endpoint
 
                    {
 
                        if event.is_error() {
 
                            // event signals some error! :(
 
                            if net_todo.endpoint_setup.endpoint_polarity
 
                                == EndpointPolarity::Passive
 
                            {
 
                                // breaking as the acceptor is currently unrecoverable
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            // We will schedule it for a retry
 
                            net_connect_to_retry.insert(index);
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if net_connect_to_retry.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&token_target) {
 
                            // spurious wakeup. this endpoint has already been completed!
 
                            if event.is_readable() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            continue;
 
                        }
 
                        let local_info = port_info
 
                            .map
 
                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .expect("Net Setup's getter port info isn't known"); // unreachable
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let _ = net_endpoint.stream.set_nodelay(true);
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                owner: local_info.owner,
 
                                polarity: local_info.polarity,
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't finish recving setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_info.polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.endpoint_setup.getter_for_incoming,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // finally learned the peer of this port!
 
                                    extra_port_info.peers.insert(
 
                                        net_todo.endpoint_setup.getter_for_incoming,
 
                                        peer_info.port,
 
                                    );
 
                                    // learned the info of this peer port
 
                                    if !port_info.map.contains_key(&peer_info.port) {
 
                                        let info = PortInfo {
 
                                            peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
                                            polarity: peer_info.polarity,
 
                                            owner: peer_info.owner,
 
                                            route: Route::NetEndpoint { index },
 
                                        };
 
                                        extra_port_info.info.insert(peer_info.port, info);
 
                                    }
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: endpoint_setup.getter_for_incoming,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, getter_for_incoming } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                received_this_round: false,
 
                getter_for_incoming,
 
            }
 
        })
 
        .collect();
 
    let endpoint_manager = EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    };
 
    Ok((endpoint_manager, extra_port_info))
 
}
 

	
 
// Given a fully-formed endpoint manager,
 
// construct the consensus tree with:
 
// 1. decentralized leader election
 
// 2. centralized tree construction
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 

	
 
    // storage structure for the state of a distributed wave
 
    // (for readability)
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 

	
 
    // kick off a leader-election wave rooted at myself
 
    // given the desired wave information
 
    // (e.g. don't inform my parent if they exist)
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct disstributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
        - Each connector initiates a wave tagged with their ID
 
        - Connectors participate in waves of GREATER ID, abandoning previous waves
 
        - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader
 
    2. Tree construction
 
        - The leader broadcasts their leadership with msg A
 
        - Upon receiving their first announcement, connectors reply B, and send A to all peers
 
        - A controller exits once they have received A or B from each neighbor
 

	
 
    The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1,
 
    so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun.
 

	
 
    NOTE the distinction between PARENT and LEADER
 
    */
 
    log!(logger, "beginning neighborhood construction");
 
    if em.num_net_endpoints() == 0 {
 
        log!(logger, "Edge case of no neighbors! No parent an no children!");
 
        return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) });
 
    }
0 comments (0 inline, 0 general)