diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 3a45844f1cdcf75dcb342fd0192e990713a05e3f..b3cf1418b1f02f71110d7a05086d21bf5e47f9b2 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -166,12 +166,11 @@ fn new_endpoint_manager( let mut waker_state: Option> = None; let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity(net_endpoint_setups.len() * 2 + 4); - let mut net_polled_undrained = VecSet::default(); - let udp_polled_undrained = VecSet::default(); + let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()]; let mut delayed_messages = vec![]; // 2. create a registered (TcpListener/Endpoint) for passive / active respectively - let mut todos = net_endpoint_setups + let mut net_todos = net_endpoint_setups .iter() .enumerate() .map(|(index, (local_port, endpoint_setup))| { @@ -218,10 +217,13 @@ fn new_endpoint_manager( let mut connect_failed: HashSet = Default::default(); // TODO register udps, and all them to incomplete list - let mut setup_incomplete: HashSet = (0..todos.len()) - .map(|index| TokenTarget::NetEndpoint { index }) - .chain((0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index })) - .collect(); + let mut setup_incomplete: HashSet = { + let net_todo_targets_iter = + (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index }); + let udp_todo_targets_iter = + (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index }); + net_todo_targets_iter.chain(udp_todo_targets_iter).collect() + }; while !setup_incomplete.is_empty() { let remaining = if let Some(deadline) = deadline { Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?) @@ -245,17 +247,17 @@ fn new_endpoint_manager( ); assert!(waker_state.is_some()); for net_index in connect_failed.drain() { - let todo: &mut Todo = &mut todos[net_index]; + let net_todo = &mut net_todos[net_index]; log!( logger, "Restarting connection with endpoint {:?} {:?}", net_index, - todo.endpoint_setup.sock_addr + net_todo.endpoint_setup.sock_addr ); - match &mut todo.todo_endpoint { + match &mut net_todo.todo_endpoint { TodoEndpoint::NetEndpoint(endpoint) => { let mut new_stream = - TcpStream::connect(todo.endpoint_setup.sock_addr) + TcpStream::connect(net_todo.endpoint_setup.sock_addr) .expect("mio::TcpStream connect should not fail!"); std::mem::swap(&mut endpoint.stream, &mut new_stream); let token = TokenTarget::NetEndpoint { index: net_index }.into(); @@ -275,8 +277,8 @@ fn new_endpoint_manager( setup_incomplete.remove(&token_target); } TokenTarget::NetEndpoint { index } => { - let todo: &mut Todo = &mut todos[index]; - if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint { + let net_todo = &mut net_todos[index]; + if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint { // FIRST try complete this connection match listener.accept() { Err(e) if would_block(&e) => { @@ -298,13 +300,15 @@ fn new_endpoint_manager( peer_addr ); let net_endpoint = NetEndpoint { stream, inbox: vec![] }; - todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint); + net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint); } } } - if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut todo.todo_endpoint { + if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint { if event.is_error() { - if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { + if net_todo.endpoint_setup.endpoint_polarity + == EndpointPolarity::Passive + { // right now you cannot retry an acceptor. return failure return Err(Ce::AcceptFailed( net_endpoint.stream.local_addr().unwrap(), @@ -352,12 +356,13 @@ fn new_endpoint_manager( // spurious wakeup continue; } - let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap(); - if event.is_writable() && !todo.sent_local_port { + let local_polarity = + *port_info.polarities.get(&net_todo.local_port).unwrap(); + if event.is_writable() && !net_todo.sent_local_port { // can write and didn't send setup msg yet? Do so! let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { polarity: local_polarity, - port: todo.local_port, + port: net_todo.local_port, })); net_endpoint .send(&msg) @@ -369,9 +374,9 @@ fn new_endpoint_manager( }) .unwrap(); log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); - todo.sent_local_port = true; + net_todo.sent_local_port = true; } - if event.is_readable() && todo.recv_peer_port.is_none() { + if event.is_readable() && net_todo.recv_peer_port.is_none() { // can read and didn't recv setup msg yet? Do so! let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| { Ce::NetEndpointSetupError( @@ -393,15 +398,15 @@ fn new_endpoint_manager( ); if peer_info.polarity == local_polarity { return Err(ConnectError::PortPeerPolarityMismatch( - todo.local_port, + net_todo.local_port, )); } - todo.recv_peer_port = Some(peer_info.port); + net_todo.recv_peer_port = Some(peer_info.port); // 1. finally learned the peer of this port! - port_info.peers.insert(todo.local_port, peer_info.port); + port_info.peers.insert(net_todo.local_port, peer_info.port); // 2. learned the info of this peer port port_info.polarities.insert(peer_info.port, peer_info.polarity); - port_info.peers.insert(peer_info.port, todo.local_port); + port_info.peers.insert(peer_info.port, net_todo.local_port); if let Some(route) = port_info.routes.get(&peer_info.port) { // check just for logging purposes log!( @@ -427,7 +432,7 @@ fn new_endpoint_manager( } } // is the setup for this net_endpoint now complete? - if todo.sent_local_port && todo.recv_peer_port.is_some() { + if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() { // yes! connected, sent my info and received peer's info setup_incomplete.remove(&token_target); log!(logger, "endpoint[{}] is finished!", index); @@ -445,7 +450,7 @@ fn new_endpoint_manager( // TODO leave the waker registered? } - let net_endpoint_exts = todos + let net_endpoint_exts = net_todos .into_iter() .enumerate() .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt {