Changeset - 162c3306c4af
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-07-07 11:03:34
christopher.esterhuyse@gmail.com
laying the groundwork for introducing udp endpoints alongside the existing reowolf-net (over tcp) endpoints
6 files changed with 309 insertions and 209 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -20,7 +20,7 @@ getrandom = "0.1.14" # tiny crate. used to guess controller-id
 
# network
 
# integer-encoding = "1.1.5"
 
# byteorder = "1.3.4"
 
mio = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 

	
 
# protocol
 
backtrace = "0.3"
src/runtime/communication.rs
Show inline comments
 
@@ -74,12 +74,11 @@ impl Connector {
 
            },
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        use NextBatchError as Nbe;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Nbe::NotConnected),
 
            ConnectorPhased::Setup { .. } => Err(WrongStateError),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
@@ -568,8 +567,8 @@ impl Connector {
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            comm.endpoint_manager.endpoint_exts[endpoint_index].getter_for_incoming;
 
                        let getter = comm.endpoint_manager.net_endpoint_exts[endpoint_index]
 
                            .getter_for_incoming;
 
                        assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
 
                        log!(
 
                            cu.logger,
src/runtime/endpoints.rs
Show inline comments
 
@@ -11,7 +11,7 @@ enum TryRecyAnyError {
 
    EndpointError { error: EndpointError, index: usize },
 
}
 
/////////////////////
 
impl Endpoint {
 
impl NetEndpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        bincode::config::DefaultOptions::default()
 
    }
 
@@ -70,10 +70,10 @@ impl Endpoint {
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_endpoints()
 
        0..self.num_net_endpoints()
 
    }
 
    pub(super) fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
@@ -81,13 +81,13 @@ impl EndpointManager {
 
        msg: &Msg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
 
        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
 
        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
    pub(super) fn try_recv_any_comms(
 
@@ -113,7 +113,7 @@ impl EndpointManager {
 
            Trae::Timeout => Ce::Timeout,
 
            Trae::PollFailed => Ce::PollFailed,
 
            Trae::EndpointError { error, index } => Ce::EndpointSetupError(
 
                self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(),
 
                self.net_endpoint_exts[index].net_endpoint.stream.local_addr().unwrap(),
 
                error,
 
            ),
 
        })
 
@@ -132,13 +132,13 @@ impl EndpointManager {
 
        loop {
 
            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let endpoint = &mut self.endpoint_exts[index].endpoint;
 
                if let Some(msg) = endpoint
 
                let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
 
                if let Some(msg) = net_endpoint
 
                    .try_recv(logger)
 
                    .map_err(|error| Trea::EndpointError { error, index })?
 
                {
 
                    endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                    if !endpoint.inbox.is_empty() {
 
                    if !net_endpoint.inbox.is_empty() {
 
                        // there may be another message waiting!
 
                        self.polled_undrained.insert(index);
 
                    }
 
@@ -176,7 +176,7 @@ impl EndpointManager {
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
}
 
impl Debug for Endpoint {
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
src/runtime/error.rs
Show inline comments
 
@@ -61,14 +61,7 @@ pub enum GottenError {
 
    PreviousSyncFailed,
 
}
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NewNetPortError {
 
    AlreadyConnected,
 
}
 
pub struct WrongStateError;
 
/////////////////////
 
impl From<UnrecoverableSyncError> for SyncError {
 
    fn from(e: UnrecoverableSyncError) -> Self {
src/runtime/mod.rs
Show inline comments
 
@@ -16,6 +16,7 @@ mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 
use mio::net::UdpSocket;
 

	
 
#[derive(Debug)]
 
pub struct Connector {
 
@@ -132,7 +133,7 @@ enum AllMapperResult {
 
    New(Predicate),
 
    Nonexistant,
 
}
 
struct Endpoint {
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
@@ -142,13 +143,19 @@ struct ProtoComponent {
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug, Clone)]
 
struct EndpointSetup {
 
struct NetEndpointSetup {
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 
#[derive(Debug)]
 
struct EndpointExt {
 
    endpoint: Endpoint,
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
@@ -177,7 +184,13 @@ struct EndpointManager {
 
    polled_undrained: VecSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
    net_endpoint_exts: Vec<NetEndpointExt>,
 
}
 
struct UdpEndpointExt {
 
    sock: UdpSocket,
 
    getter_for_incoming: PortId,
 
    outgoing_buffer: HashMap<Predicate, Payload>,
 
    incoming_buffer: Vec<Payload>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
@@ -203,8 +216,14 @@ struct ConnectorUnphased {
 
    port_info: PortInfo,
 
}
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<(PortId, NetEndpointSetup)>,
 
    udp_endpoint_setups: Vec<(PortId, UdpEndpointSetup)>,
 
    surplus_sockets: u16,
 
}
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
@@ -217,10 +236,40 @@ struct NativeBatch {
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 
////////////////
 
pub fn would_block(err: &std::io::Error) -> bool {
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl TokenTarget {
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::MAX_INDEX {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::MAX_INDEX),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
src/runtime/setup.rs
Show inline comments
 
@@ -18,7 +18,25 @@ impl Connector {
 
                native_ports: Default::default(),
 
                port_info: Default::default(),
 
            },
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
                surplus_sockets,
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: _up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(_setup) => {
 
                let _udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
 
                todo!()
 
            }
 
        }
 
    }
 
    pub fn new_net_port(
 
@@ -26,12 +44,12 @@ impl Connector {
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, NewNetPortError> {
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => Err(NewNetPortError::AlreadyConnected),
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
@@ -44,7 +62,7 @@ impl Connector {
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                endpoint_setups.push((p, endpoint_setup));
 
                setup.net_endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
        }
 
@@ -52,25 +70,25 @@ impl Connector {
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    endpoint_setups,
 
                    &setup.net_endpoint_setups,
 
                    &mut cu.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                    endpoint_manager.net_endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
@@ -99,7 +117,7 @@ impl Connector {
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    endpoint_setups: &[(PortId, NetEndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
@@ -109,26 +127,26 @@ fn new_endpoint_manager(
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        endpoint_setup: NetEndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        Endpoint(Endpoint),
 
        NetEndpoint(NetEndpoint),
 
    }
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        endpoint_setup: &NetEndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
            TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
@@ -146,12 +164,13 @@ fn new_endpoint_manager(
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 
    struct WakerState {
 
        continue_signal: AtomicBool,
 
        waker: mio::Waker,
 
    }
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = VecSet::default();
 
@@ -162,7 +181,12 @@ fn new_endpoint_manager(
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
            init_todo(
 
                TokenTarget::NetEndpoint { index }.into(),
 
                *local_port,
 
                endpoint_setup,
 
                &mut poll,
 
            )
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
@@ -184,192 +208,227 @@ fn new_endpoint_manager(
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            if token == WAKER_TOKEN {
 
                log!(
 
                    logger,
 
                    "Notification from waker. connect_failed is {:?}",
 
                    connect_failed.iter()
 
                );
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    let todo: &mut Todo = &mut todos[index];
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::Waker => {
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
                        index,
 
                        todo.endpoint_setup.sock_addr
 
                        "Notification from waker. connect_failed is {:?}",
 
                        connect_failed.iter()
 
                    );
 
                    match &mut todo.todo_endpoint {
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry()
 
                                .register(&mut endpoint.stream, Token(index), BOTH)
 
                                .unwrap();
 
                    assert!(waker_state.is_some());
 
                    for net_endpoint_index in connect_failed.drain() {
 
                        let todo: &mut Todo = &mut todos[net_endpoint_index];
 
                        log!(
 
                            logger,
 
                            "Restarting connection with endpoint {:?} {:?}",
 
                            net_endpoint_index,
 
                            todo.endpoint_setup.sock_addr
 
                        );
 
                        match &mut todo.todo_endpoint {
 
                            TodoEndpoint::NetEndpoint(endpoint) => {
 
                                let mut new_stream =
 
                                    TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                        .expect("mio::TcpStream connect should not fail!");
 
                                std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                                let token =
 
                                    TokenTarget::NetEndpoint { index: net_endpoint_index }.into();
 
                                poll.registry()
 
                                    .register(&mut endpoint.stream, token, BOTH)
 
                                    .unwrap();
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                let todo: &mut Todo = &mut todos[index];
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
                        Ok((mut stream, peer_addr)) => {
 
                            poll.registry().deregister(listener).unwrap();
 
                            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                            log!(
 
                                logger,
 
                                "Endpoint[{}] accepted a connection from {:?}",
 
                                index,
 
                                peer_addr
 
                            );
 
                            let endpoint = Endpoint { stream, inbox: vec![] };
 
                            todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                        }
 
                        Err(e) if would_block(&e) => {
 
                            log!(logger, "Spurious wakeup on listener {:?}", index)
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                TokenTarget::UdpEndpoint { index: _ } => unreachable!(),
 
                TokenTarget::NetEndpoint { index } => {
 
                    let todo: &mut Todo = &mut todos[index];
 
                    if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                        // FIRST try complete this connection
 
                        match listener.accept() {
 
                            Err(e) if would_block(&e) => {
 
                                log!(logger, "Spurious wakeup on listener {:?}", index)
 
                            }
 
                            Err(_) => {
 
                                log!(logger, "accept() failure on index {}", index);
 
                                return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                            }
 
                            Ok((mut stream, peer_addr)) => {
 
                                // success!
 
                                poll.registry().deregister(listener).unwrap();
 
                                // reusing original token as-is
 
                                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                                log!(
 
                                    logger,
 
                                    "Endpoint[{}] accepted a connection from {:?}",
 
                                    index,
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                }
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut todo.todo_endpoint {
 
                        if event.is_error() {
 
                            if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                                // right now you cannot retry an acceptor. return failure
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            if connect_failed.insert(index) {
 
                                log!(
 
                                    logger,
 
                                    "Connection failed for {:?}. List is {:?}",
 
                                    index,
 
                                    connect_failed.iter()
 
                                );
 
                                poll.registry().deregister(&mut net_endpoint.stream).unwrap();
 
                            } else {
 
                                // spurious wakeup.
 
                                continue;
 
                            }
 
                            if waker_state.is_none() {
 
                                log!(logger, "First connect failure. Starting waker thread");
 
                                let arc = Arc::new(WakerState {
 
                                    waker: mio::Waker::new(
 
                                        poll.registry(),
 
                                        TokenTarget::Waker.into(),
 
                                    )
 
                                    .unwrap(),
 
                                    continue_signal: true.into(),
 
                                });
 
                                let moved_arc = arc.clone();
 
                                waker_state = Some(arc);
 
                                std::thread::spawn(move || {
 
                                    while moved_arc
 
                                        .continue_signal
 
                                        .load(std::sync::atomic::Ordering::SeqCst)
 
                                    {
 
                                        std::thread::sleep(WAKER_PERIOD);
 
                                        let _ = moved_arc.waker.wake();
 
                                    }
 
                                });
 
                            }
 
                            continue;
 
                        }
 
                        if connect_failed.insert(index) {
 
                            log!(
 
                                logger,
 
                                "Connection failed for {:?}. List is {:?}",
 
                                index,
 
                                connect_failed.iter()
 
                            );
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                        } else {
 
                        // event wasn't ERROR
 
                        if connect_failed.contains(&index) {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 

	
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    let _ = waker.wake();
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
 
                        if !setup_incomplete.contains(&index) {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 
                        continue;
 
                    }
 
                    if connect_failed.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    if !setup_incomplete.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                    if event.is_writable() && !todo.sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: todo.local_port,
 
                        }));
 
                        endpoint
 
                            .send(&msg)
 
                            .map_err(|e| {
 
                                Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        todo.sent_local_port = true;
 
                    }
 
                    if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                        if event.is_writable() && !todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                polarity: local_polarity,
 
                                port: todo.local_port,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::EndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            todo.sent_local_port = true;
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                if peer_info.polarity == local_polarity {
 
                                    return Err(ConnectError::PortPeerPolarityMismatch(
 
                                        todo.local_port,
 
                                    ));
 
                        if event.is_readable() && todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), e)
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            todo.local_port,
 
                                        ));
 
                                    }
 
                                    todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    port_info.peers.insert(todo.local_port, peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                    port_info.peers.insert(peer_info.port, todo.local_port);
 
                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                        // check just for logging purposes
 
                                        log!(
 
                                            logger,
 
                                            "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                            peer_info.port,
 
                                            route
 
                                        );
 
                                    }
 
                                    port_info
 
                                        .routes
 
                                        .entry(peer_info.port)
 
                                        .or_insert(Route::Endpoint { index });
 
                                }
 
                                todo.recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(todo.local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, todo.local_port);
 
                                if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                    // check just for logging purposes
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                        peer_info.port,
 
                                        route
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                                port_info
 
                                    .routes
 
                                    .entry(peer_info.port)
 
                                    .or_insert(Route::Endpoint { index });
 
                            }
 
                            Some(inappropriate_msg) => {
 
                                log!(
 
                                    logger,
 
                                    "delaying msg {:?} during channel setup phase",
 
                                    inappropriate_msg
 
                                );
 
                                delayed_messages.push((index, inappropriate_msg));
 
                            }
 
                        }
 
                    }
 
                    if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                        setup_incomplete.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                        // is the setup for this net_endpoint now complete?
 
                        if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&index);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    let endpoint_exts = todos
 
    // all todos must be the NetEndpoint variants! unwrap and collect them
 
    let net_endpoint_exts = todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(mut endpoint) => {
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut endpoint.stream, Token(index), Interest::READABLE)
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    endpoint
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    if let Some(wcs) = waker_continue_signal {
 
    if let Some(arc) = waker_state {
 
        log!(logger, "Sending waker the stop signal");
 
        wcs.store(false, std::sync::atomic::Ordering::SeqCst);
 
        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
 
        // TODO leave the waker registered?
 
    }
 
    Ok(EndpointManager {
 
        poll,
 
@@ -377,7 +436,7 @@ fn new_endpoint_manager(
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        endpoint_exts,
 
        net_endpoint_exts,
 
    })
 
}
 

	
 
@@ -427,12 +486,12 @@ fn init_neighborhood(
 
    NOTE the distinction between PARENT and LEADER
 
    */
 
    log!(logger, "beginning neighborhood construction");
 
    if em.num_endpoints() == 0 {
 
    if em.num_net_endpoints() == 0 {
 
        log!(logger, "Edge case of no neighbors! No parent an no children!");
 
        return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) });
 
    }
 
    log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_endpoints());
 
    let mut awaiting = HashSet::with_capacity(em.num_endpoints());
 
    log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_net_endpoints());
 
    let mut awaiting = HashSet::with_capacity(em.num_net_endpoints());
 
    // 1+ neighbors. Leader can only be learned by receiving messages
 
    // loop ends when I know my sink tree parent (implies leader was elected)
 
    let election_result: WaveState = {
 
@@ -650,7 +709,7 @@ fn session_optimize(
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .endpoint_exts
 
            .net_endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
@@ -740,7 +799,7 @@ fn apply_optimizations(
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in
 
        comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter)
 
        comm.endpoint_manager.net_endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
0 comments (0 inline, 0 general)