Changeset - e4a68057bfbe
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-06-24 18:05:59
christopher.esterhuyse@gmail.com
waker impl for setup phase connect. TODO test on Linux
4 files changed with 98 insertions and 31 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -18,6 +18,7 @@ serde = { version = "1.0.112", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 
take_mut = "0.2.2"
 
indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal
 
replace_with = "0.1.5"
 

	
 
# network
 
integer-encoding = "1.0.7"
src/runtime/endpoints.rs
Show inline comments
 
@@ -12,10 +12,6 @@ enum TryRecyAnyError {
 
}
 

	
 
/////////////////////
 

	
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl Endpoint {
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
src/runtime/mod.rs
Show inline comments
 
@@ -202,6 +202,9 @@ pub struct SyncProtoContext<'a> {
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
src/runtime/setup.rs
Show inline comments
 
@@ -97,7 +97,6 @@ impl Connector {
 
        }
 
    }
 
}
 

	
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
@@ -105,16 +104,18 @@ fn new_endpoint_manager(
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError::*;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Accepting(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init_todo(
 
@@ -132,13 +133,27 @@ fn new_endpoint_manager(
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    };
 
    struct WakerState {
 
        continue_signal: Arc<AtomicBool>,
 
        failed_indices: HashSet<usize>,
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
@@ -157,6 +172,7 @@ fn new_endpoint_manager(
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
@@ -169,7 +185,30 @@ fn new_endpoint_manager(
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
            if token == WAKER_TOKEN {
 
                log!(logger, "Notification from waker");
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
                        index,
 
                        todo.endpoint_setup.sock_addr
 
                    );
 
                    match &mut todo.todo_endpoint {
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
                        Ok((mut stream, peer_addr)) => {
 
                            poll.registry().deregister(listener).unwrap();
 
@@ -183,26 +222,51 @@ fn new_endpoint_manager(
 
                            let endpoint = Endpoint { stream, inbox: vec![] };
 
                            todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                        }
 
                    Err(e) if e.kind() == WouldBlock => {}
 
                    Err(_) => return Err(AcceptFailed(listener.local_addr().unwrap())),
 
                        Err(e) if would_block(&e) => {
 
                            log!(logger, "Spurious wakeup on listener {:?}", index)
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(AcceptFailed(listener.local_addr().unwrap()));
 
                        }
 
                    }
 
                }
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        connect_failed.insert(index);
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    waker.wake().expect("unable to wake");
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
 
                        }
 
                        continue;
 
                    }
 
                    if connect_failed.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
            match todo {
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    sent_local_port,
 
                    recv_peer_port,
 
                    ..
 
                } => {
 
                    if !setup_incomplete.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
 
                    if event.is_writable() && !*sent_local_port {
 
                    let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                    if event.is_writable() && !todo.sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: *local_port,
 
                            port: todo.local_port,
 
                        }));
 
                        endpoint
 
                            .send(&msg)
 
@@ -211,9 +275,9 @@ fn new_endpoint_manager(
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                        todo.sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                    if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
@@ -226,15 +290,15 @@ fn new_endpoint_manager(
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                if peer_info.polarity == local_polarity {
 
                                    return Err(ConnectError::PortPeerPolarityMismatch(
 
                                        *local_port,
 
                                        todo.local_port,
 
                                    ));
 
                                }
 
                                *recv_peer_port = Some(peer_info.port);
 
                                todo.recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(*local_port, peer_info.port);
 
                                port_info.peers.insert(todo.local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, *local_port);
 
                                port_info.peers.insert(peer_info.port, todo.local_port);
 
                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
 
                            }
 
                            Some(inappropriate_msg) => {
 
@@ -247,12 +311,11 @@ fn new_endpoint_manager(
 
                            }
 
                        }
 
                    }
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
                    if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                        setup_incomplete.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        events.clear();
 
@@ -268,11 +331,15 @@ fn new_endpoint_manager(
 
                        .unwrap();
 
                    endpoint
 
                }
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    if let Some(wcs) = waker_continue_signal {
 
        log!(logger, "Sending waker the stop signal");
 
        wcs.store(false, std::sync::atomic::Ordering::SeqCst);
 
    }
 
    Ok(EndpointManager {
 
        poll,
 
        events,
0 comments (0 inline, 0 general)