Changeset - cf50a74eafd7
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-08 14:46:27
christopher.esterhuyse@gmail.com
setup of udp endpoints partially done
5 files changed with 150 insertions and 125 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -534,13 +534,14 @@ impl Connector {
 
            // try recv messages arriving through endpoints
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (endpoint_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
 
                    .endpoint_manager
 
                    .try_recv_any_comms(&mut *cu.logger, &cu.port_info, rctx, comm.round_index)?
 
                {
 
                    CommRecvOk::NewControlMsg { net_endpoint_index, msg } => {
 
                        (net_endpoint_index, msg)
 
                    }
 
                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) =
 
                    match comm.endpoint_manager.try_recv_any_comms(
 
                        &mut *cu.logger,
 
                        &cu.port_info,
 
                        rctx,
 
                        comm.round_index,
 
                    )? {
 
                        CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
 
                        CommRecvOk::NewPayloadMsgs => continue 'undecided,
 
                        CommRecvOk::TimeoutWithoutNew => {
 
                            log!(cu.logger, "Reached user-defined deadling without decision...");
 
@@ -561,19 +562,18 @@ impl Connector {
 
                log!(
 
                    cu.logger,
 
                    "Received from endpoint {} ctrl msg  {:?}",
 
                    endpoint_index,
 
                    net_index,
 
                    &comm_ctrl_msg
 
                );
 
                match comm_ctrl_msg {
 
                    CommCtrlMsg::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        if comm.neighborhood.children.contains(&endpoint_index) {
 
                        if comm.neighborhood.children.contains(&net_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
                                    let subtree_id =
 
                                        SubtreeId::NetEndpoint { index: endpoint_index };
 
                                    let subtree_id = SubtreeId::NetEndpoint { index: net_index };
 
                                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.logger,
 
                                        subtree_id,
 
@@ -602,12 +602,12 @@ impl Connector {
 
                                cu.logger,
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                endpoint_index
 
                                net_index
 
                            );
 
                        }
 
                    }
 
                    CommCtrlMsg::Announce { decision } => {
 
                        if Some(endpoint_index) == comm.neighborhood.parent {
 
                        if Some(net_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            return Ok(decision);
 
                        } else {
 
@@ -615,7 +615,7 @@ impl Connector {
 
                                cu.logger,
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                endpoint_index
 
                                net_index
 
                            );
 
                        }
 
                    }
 
@@ -684,9 +684,9 @@ impl BranchingNative {
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use AllMapperResult as Amr;
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
            use AssignmentUnionResult as Aur;
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
@@ -695,13 +695,13 @@ impl BranchingNative {
 
                    );
 
                    finished.insert(predicate, branch);
 
                }
 
                Amr::Equivalent | Amr::FormerNotLatter => {
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                }
 
                Amr::LatterNotFormer => {
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
@@ -715,7 +715,7 @@ impl BranchingNative {
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Amr::New(predicate2) => {
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
@@ -865,21 +865,21 @@ impl BranchingProtoComponent {
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use AllMapperResult as Amr;
 
            use AssignmentUnionResult as Aur;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Amr::Equivalent | Amr::FormerNotLatter => {
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Amr::LatterNotFormer => {
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
@@ -888,7 +888,7 @@ impl BranchingProtoComponent {
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Amr::New(predicate2) => {
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
src/runtime/endpoints.rs
Show inline comments
 
@@ -158,7 +158,7 @@ impl EndpointManager {
 
                &mut self,
 
                logger: &mut dyn Logger,
 
                round_ctx: &mut impl RoundCtxTrait,
 
                net_endpoint_index: usize,
 
                net_index: usize,
 
                msg: Msg,
 
                round_index: usize,
 
                some_message_enqueued: &mut bool,
 
@@ -183,19 +183,16 @@ impl EndpointManager {
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            self.delayed_messages
 
                                .push((net_endpoint_index, Msg::CommMsg(comm_msg)));
 
                            self.delayed_messages.push((net_index, Msg::CommMsg(comm_msg)));
 
                            return None;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => {
 
                        Some((net_endpoint_index, comm_ctrl_msg))
 
                    }
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter = self.net_endpoint_store.endpoint_exts[net_endpoint_index]
 
                            .getter_for_incoming;
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        round_ctx.getter_add(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
@@ -207,30 +204,30 @@ impl EndpointManager {
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        while let Some((net_endpoint_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_endpoint_index, msg)) = self.handle_msg(
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) = self.handle_msg(
 
                logger,
 
                round_ctx,
 
                net_endpoint_index,
 
                net_index,
 
                msg,
 
                round_index,
 
                &mut some_message_enqueued,
 
            ) {
 
                return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            while let Some((net_endpoint_index, msg)) = self.try_recv_undrained_net(logger)? {
 
                if let Some((net_endpoint_index, msg)) = self.handle_msg(
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    logger,
 
                    round_ctx,
 
                    net_endpoint_index,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
src/runtime/mod.rs
Show inline comments
 
@@ -136,7 +136,7 @@ struct SendPayloadMsg {
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
enum AllMapperResult {
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
@@ -258,6 +258,7 @@ struct NativeBatch {
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
@@ -270,7 +271,7 @@ trait RoundCtxTrait {
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_endpoint_index: usize, msg: CommCtrlMsg },
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn would_block(err: &std::io::Error) -> bool {
 
@@ -456,34 +457,31 @@ impl Predicate {
 
        for (var, val) in self.assigned.iter() {
 
            match maybe_superset.assigned.get(var) {
 
                Some(val2) if val2 == val => {}
 
                _ => return false,
 
                _ => return false, // var unmapped, or mapped differently
 
            }
 
        }
 
        true
 
    }
 

	
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn consistent_with(&self, other: &Self) -> bool {
 
        let [larger, smaller] =
 
            if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 
    // pub fn consistent_with(&self, other: &Self) -> bool {
 
    //     let [larger, smaller] =
 
    //         if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 

	
 
        for (var, val) in smaller.assigned.iter() {
 
            match larger.assigned.get(var) {
 
                Some(val2) if val2 != val => return false,
 
                _ => {}
 
            }
 
        }
 
        true
 
    }
 
    //     for (var, val) in smaller.assigned.iter() {
 
    //         match larger.assigned.get(var) {
 
    //             Some(val2) if val2 != val => return false,
 
    //             _ => {}
 
    //         }
 
    //     }
 
    //     true
 
    // }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    fn all_mapper(&self, other: &Self) -> AllMapperResult {
 
        use AllMapperResult as Amr;
 
    /// Given self and other, two predicates, return the predicate whose
 
    /// assignments are the union of those of self and other.
 
    ///
 
    fn assignment_union(&self, other: &Self) -> AssignmentUnionResult {
 
        use AssignmentUnionResult as Aur;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
@@ -514,7 +512,7 @@ impl Predicate {
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Amr::Nonexistant;
 
                        return Aur::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
@@ -525,9 +523,9 @@ impl Predicate {
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Amr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Amr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Amr::LatterNotFormer, // ... equivalent to other.
 
            [true, true] => Aur::Equivalent,       // ... equivalent to both.
 
            [false, true] => Aur::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Aur::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
@@ -535,7 +533,7 @@ impl Predicate {
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Amr::New(new)
 
                Aur::New(new)
 
            }
 
        }
 
    }
src/runtime/setup.rs
Show inline comments
 
@@ -94,6 +94,7 @@ impl Connector {
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.port_info,
 
                    &deadline,
 
                )?;
 
@@ -129,7 +130,8 @@ impl Connector {
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, NetEndpointSetup)],
 
    net_endpoint_setups: &[(PortId, NetEndpointSetup)],
 
    udp_endpoint_setups: &[(PortId, UdpEndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
@@ -144,35 +146,14 @@ fn new_endpoint_manager(
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    struct UdpTodo {
 
        local_port: PortId,
 
        sock: UdpSocket,
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        NetEndpoint(NetEndpoint),
 
    }
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &NetEndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
@@ -184,24 +165,49 @@ fn new_endpoint_manager(
 

	
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut events = Events::with_capacity(net_endpoint_setups.len() * 2 + 4);
 
    let mut net_polled_undrained = VecSet::default();
 
    let udp_polled_undrained = VecSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
    let mut todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(
 
                TokenTarget::NetEndpoint { index }.into(),
 
                *local_port,
 
                endpoint_setup,
 
                &mut poll,
 
            )
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                TodoEndpoint::Accepting(listener)
 
            };
 
            Ok(Todo {
 
                todo_endpoint,
 
                local_port: *local_port,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, local_port: *local_port })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
@@ -210,8 +216,12 @@ fn new_endpoint_manager(
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 
    // TODO register udps, and all them to incomplete list
 

	
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    let mut setup_incomplete: HashSet<TokenTarget> = (0..todos.len())
 
        .map(|index| TokenTarget::NetEndpoint { index })
 
        .chain((0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index }))
 
        .collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
@@ -222,6 +232,10 @@ fn new_endpoint_manager(
 
        for event in events.iter() {
 
            let token = event.token();
 
            let token_target = TokenTarget::from(token);
 
            if !setup_incomplete.contains(&token_target) {
 
                // spurious wakeup
 
                continue;
 
            }
 
            match token_target {
 
                TokenTarget::Waker => {
 
                    log!(
 
@@ -230,12 +244,12 @@ fn new_endpoint_manager(
 
                        connect_failed.iter()
 
                    );
 
                    assert!(waker_state.is_some());
 
                    for net_endpoint_index in connect_failed.drain() {
 
                        let todo: &mut Todo = &mut todos[net_endpoint_index];
 
                    for net_index in connect_failed.drain() {
 
                        let todo: &mut Todo = &mut todos[net_index];
 
                        log!(
 
                            logger,
 
                            "Restarting connection with endpoint {:?} {:?}",
 
                            net_endpoint_index,
 
                            net_index,
 
                            todo.endpoint_setup.sock_addr
 
                        );
 
                        match &mut todo.todo_endpoint {
 
@@ -244,8 +258,7 @@ fn new_endpoint_manager(
 
                                    TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                        .expect("mio::TcpStream connect should not fail!");
 
                                std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                                let token =
 
                                    TokenTarget::NetEndpoint { index: net_endpoint_index }.into();
 
                                let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                                poll.registry()
 
                                    .register(&mut endpoint.stream, token, BOTH)
 
                                    .unwrap();
 
@@ -254,7 +267,13 @@ fn new_endpoint_manager(
 
                        }
 
                    }
 
                }
 
                TokenTarget::UdpEndpoint { index: _ } => unreachable!(),
 
                TokenTarget::UdpEndpoint { index } => {
 
                    let udp_todo: &UdpTodo = &udp_todos[index];
 
                    if event.is_error() {
 
                        return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap()));
 
                    }
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    let todo: &mut Todo = &mut todos[index];
 
                    if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
@@ -333,10 +352,6 @@ fn new_endpoint_manager(
 
                            // spurious wakeup
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&index) {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 
                        let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                        if event.is_writable() && !todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
@@ -414,7 +429,7 @@ fn new_endpoint_manager(
 
                        // is the setup for this net_endpoint now complete?
 
                        if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&index);
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
@@ -429,7 +444,6 @@ fn new_endpoint_manager(
 
        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
 
        // TODO leave the waker registered?
 
    }
 
    let udp_endpoint_exts = vec![];
 

	
 
    let net_endpoint_exts = todos
 
        .into_iter()
 
@@ -448,6 +462,22 @@ fn new_endpoint_manager(
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, local_port } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                incoming_round_spec_var: None,
 
                getter_for_incoming: local_port,
 
                incoming_payloads: Default::default(),
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
src/runtime/tests.rs
Show inline comments
 
@@ -658,12 +658,12 @@ fn multi_recover() {
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 
// #[test]
 
// fn udp_self_connect() {
 
//     let test_log_path = Path::new("./logs/udp_self_connect");
 
//     let sock_addrs = [next_test_addr(), next_test_addr()];
 
//     let mut c = file_logged_connector(0, test_log_path);
 
//     c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
//     c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
//     c.connect(SEC1).unwrap();
 
// }
0 comments (0 inline, 0 general)