Changeset - 1a61d3b3d964
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-07-08 14:53:44
christopher.esterhuyse@gmail.com
setup procedure refactor and cleanup
1 file changed with 32 insertions and 27 deletions:
0 comments (0 inline, 0 general)
src/runtime/setup.rs
Show inline comments
 
@@ -166,12 +166,11 @@ fn new_endpoint_manager(
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(net_endpoint_setups.len() * 2 + 4);
 
    let mut net_polled_undrained = VecSet::default();
 
    let udp_polled_undrained = VecSet::default();
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = net_endpoint_setups
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
@@ -218,10 +217,13 @@ fn new_endpoint_manager(
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    // TODO register udps, and all them to incomplete list
 

	
 
    let mut setup_incomplete: HashSet<TokenTarget> = (0..todos.len())
 
        .map(|index| TokenTarget::NetEndpoint { index })
 
        .chain((0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index }))
 
        .collect();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
@@ -245,17 +247,17 @@ fn new_endpoint_manager(
 
                    );
 
                    assert!(waker_state.is_some());
 
                    for net_index in connect_failed.drain() {
 
                        let todo: &mut Todo = &mut todos[net_index];
 
                        let net_todo = &mut net_todos[net_index];
 
                        log!(
 
                            logger,
 
                            "Restarting connection with endpoint {:?} {:?}",
 
                            net_index,
 
                            todo.endpoint_setup.sock_addr
 
                            net_todo.endpoint_setup.sock_addr
 
                        );
 
                        match &mut todo.todo_endpoint {
 
                        match &mut net_todo.todo_endpoint {
 
                            TodoEndpoint::NetEndpoint(endpoint) => {
 
                                let mut new_stream =
 
                                    TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                    TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                                        .expect("mio::TcpStream connect should not fail!");
 
                                std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                                let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
@@ -275,8 +277,8 @@ fn new_endpoint_manager(
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    let todo: &mut Todo = &mut todos[index];
 
                    if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    let net_todo = &mut net_todos[index];
 
                    if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // FIRST try complete this connection
 
                        match listener.accept() {
 
                            Err(e) if would_block(&e) => {
 
@@ -298,13 +300,15 @@ fn new_endpoint_manager(
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
 
                                net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut todo.todo_endpoint {
 
                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint {
 
                        if event.is_error() {
 
                            if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            if net_todo.endpoint_setup.endpoint_polarity
 
                                == EndpointPolarity::Passive
 
                            {
 
                                // right now you cannot retry an acceptor. return failure
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
@@ -352,12 +356,13 @@ fn new_endpoint_manager(
 
                            // spurious wakeup
 
                            continue;
 
                        }
 
                        let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                        if event.is_writable() && !todo.sent_local_port {
 
                        let local_polarity =
 
                            *port_info.polarities.get(&net_todo.local_port).unwrap();
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                polarity: local_polarity,
 
                                port: todo.local_port,
 
                                port: net_todo.local_port,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
@@ -369,9 +374,9 @@ fn new_endpoint_manager(
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            todo.sent_local_port = true;
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
@@ -393,15 +398,15 @@ fn new_endpoint_manager(
 
                                    );
 
                                    if peer_info.polarity == local_polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            todo.local_port,
 
                                            net_todo.local_port,
 
                                        ));
 
                                    }
 
                                    todo.recv_peer_port = Some(peer_info.port);
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    port_info.peers.insert(todo.local_port, peer_info.port);
 
                                    port_info.peers.insert(net_todo.local_port, peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                    port_info.peers.insert(peer_info.port, todo.local_port);
 
                                    port_info.peers.insert(peer_info.port, net_todo.local_port);
 
                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                        // check just for logging purposes
 
                                        log!(
 
@@ -427,7 +432,7 @@ fn new_endpoint_manager(
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
@@ -445,7 +450,7 @@ fn new_endpoint_manager(
 
        // TODO leave the waker registered?
 
    }
 

	
 
    let net_endpoint_exts = todos
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt {
0 comments (0 inline, 0 general)