diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index d60b009fa551fa6de8fa368c500e9913bfb2f231..f02f2f9bed2a771b3a1b607c357cb6337de81d1d 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -146,7 +146,7 @@ fn new_endpoint_manager( // 1. Start to construct EndpointManager const WAKER_TOKEN: Token = Token(usize::MAX); - const WAKER_PERIOD: Duration = Duration::from_millis(90); + const WAKER_PERIOD: Duration = Duration::from_millis(300); assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token @@ -169,7 +169,10 @@ fn new_endpoint_manager( // - accept an incoming connection for each TcpListener (turning them into endpoints too) // - for each endpoint, send the local PortId // - for each endpoint, recv the peer's PortId, and + + // all in connect_failed are NOT registered with Poll let mut connect_failed: HashSet = Default::default(); + let mut setup_incomplete: HashSet = (0..todos.len()).collect(); while !setup_incomplete.is_empty() { let remaining = if let Some(deadline) = deadline { @@ -181,11 +184,15 @@ fn new_endpoint_manager( for event in events.iter() { let token = event.token(); let Token(index) = token; - let todo: &mut Todo = &mut todos[index]; if token == WAKER_TOKEN { - log!(logger, "Notification from waker"); + log!( + logger, + "Notification from waker. connect_failed is {:?}", + connect_failed.iter() + ); assert!(waker_continue_signal.is_some()); for index in connect_failed.drain() { + let todo: &mut Todo = &mut todos[index]; log!( logger, "Restarting connection with endpoint {:?} {:?}", @@ -196,14 +203,16 @@ fn new_endpoint_manager( TodoEndpoint::Endpoint(endpoint) => { let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr) .expect("mio::TcpStream connect should not fail!"); - poll.registry().deregister(&mut endpoint.stream).unwrap(); std::mem::swap(&mut endpoint.stream, &mut new_stream); - poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap(); + poll.registry() + .register(&mut endpoint.stream, Token(index), BOTH) + .unwrap(); } _ => unreachable!(), } } } else { + let todo: &mut Todo = &mut todos[index]; // FIRST try convert this into an endpoint if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint { match listener.accept() { @@ -234,7 +243,19 @@ fn new_endpoint_manager( // right now you cannot retry an acceptor. return Err(AcceptFailed(endpoint.stream.local_addr().unwrap())); } - connect_failed.insert(index); + if connect_failed.insert(index) { + log!( + logger, + "Connection failed for {:?}. List is {:?}", + index, + connect_failed.iter() + ); + poll.registry().deregister(&mut endpoint.stream).unwrap(); + } else { + // spurious wakeup + continue; + } + if waker_continue_signal.is_none() { log!(logger, "First connect failure. Starting waker thread"); let waker =