Changeset - e4a68057bfbe
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-06-24 18:05:59
christopher.esterhuyse@gmail.com
waker impl for setup phase connect. TODO test on Linux
4 files changed with 98 insertions and 31 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -15,12 +15,13 @@ derive_more = "0.99.2"
 
# runtime
 
bincode = "1.2.1"
 
serde = { version = "1.0.112", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 
take_mut = "0.2.2"
 
indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal
 
replace_with = "0.1.5"
 

	
 
# network
 
integer-encoding = "1.0.7"
 
byteorder = "1.3.2"
 
# mio = "0.6.21"
 
mio-extras = "2.0.6"
src/runtime/endpoints.rs
Show inline comments
 
@@ -9,16 +9,12 @@ enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 

	
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl Endpoint {
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
src/runtime/mod.rs
Show inline comments
 
@@ -199,12 +199,15 @@ pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
src/runtime/setup.rs
Show inline comments
 
@@ -94,30 +94,31 @@ impl Connector {
 
                self.phased = ConnectorPhased::Communication(comm);
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError::*;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Accepting(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
@@ -129,19 +130,33 @@ fn new_endpoint_manager(
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    };
 
    struct WakerState {
 
        continue_signal: Arc<AtomicBool>,
 
        failed_indices: HashSet<usize>,
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
@@ -154,25 +169,49 @@ fn new_endpoint_manager(
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
            if token == WAKER_TOKEN {
 
                log!(logger, "Notification from waker");
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
                        index,
 
                        todo.endpoint_setup.sock_addr
 
                    );
 
                    match &mut todo.todo_endpoint {
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
                        Ok((mut stream, peer_addr)) => {
 
                            poll.registry().deregister(listener).unwrap();
 
                            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                            log!(
 
                                logger,
 
@@ -180,82 +219,106 @@ fn new_endpoint_manager(
 
                                index,
 
                                peer_addr
 
                            );
 
                            let endpoint = Endpoint { stream, inbox: vec![] };
 
                            todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                        }
 
                    Err(e) if e.kind() == WouldBlock => {}
 
                    Err(_) => return Err(AcceptFailed(listener.local_addr().unwrap())),
 
                        Err(e) if would_block(&e) => {
 
                            log!(logger, "Spurious wakeup on listener {:?}", index)
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(AcceptFailed(listener.local_addr().unwrap()));
 
                        }
 
                    }
 
                }
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        connect_failed.insert(index);
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    waker.wake().expect("unable to wake");
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
 
                        }
 
                        continue;
 
                    }
 
                    if connect_failed.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
            match todo {
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    sent_local_port,
 
                    recv_peer_port,
 
                    ..
 
                } => {
 
                    if !setup_incomplete.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
 
                    if event.is_writable() && !*sent_local_port {
 
                    let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                    if event.is_writable() && !todo.sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: *local_port,
 
                            port: todo.local_port,
 
                        }));
 
                        endpoint
 
                            .send(&msg)
 
                            .map_err(|e| {
 
                                EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                        todo.sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                    if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                if peer_info.polarity == local_polarity {
 
                                    return Err(ConnectError::PortPeerPolarityMismatch(
 
                                        *local_port,
 
                                        todo.local_port,
 
                                    ));
 
                                }
 
                                *recv_peer_port = Some(peer_info.port);
 
                                todo.recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(*local_port, peer_info.port);
 
                                port_info.peers.insert(todo.local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, *local_port);
 
                                port_info.peers.insert(peer_info.port, todo.local_port);
 
                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
 
                            }
 
                            Some(inappropriate_msg) => {
 
                                log!(
 
                                    logger,
 
                                    "delaying msg {:?} during channel setup phase",
 
                                    inappropriate_msg
 
                                );
 
                                delayed_messages.push((index, inappropriate_msg));
 
                            }
 
                        }
 
                    }
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
                    if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                        setup_incomplete.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
@@ -265,17 +328,21 @@ fn new_endpoint_manager(
 
                TodoEndpoint::Endpoint(mut endpoint) => {
 
                    poll.registry()
 
                        .reregister(&mut endpoint.stream, Token(index), Interest::READABLE)
 
                        .unwrap();
 
                    endpoint
 
                }
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    if let Some(wcs) = waker_continue_signal {
 
        log!(logger, "Sending waker the stop signal");
 
        wcs.store(false, std::sync::atomic::Ordering::SeqCst);
 
    }
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
0 comments (0 inline, 0 general)