Changeset - 3801521d486d
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-07-23 11:30:09
christopher.esterhuyse@gmail.com
more pseudo-socket FFI. now correctly supporting recvfrom and sendto. testing on linux todo
7 files changed with 123 insertions and 94 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -38,11 +38,11 @@ lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
src/ffi/mod.rs
Show inline comments
 
use crate::{common::*, runtime::*};
 
use core::{cell::RefCell, convert::TryFrom};
 
use std::os::raw::c_int;
 
use std::slice::from_raw_parts as slice_from_raw_parts;
 

	
 
#[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))]
 
pub mod pseudo_socket_api;
 
// #[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))]
 
// pub mod pseudo_socket_api;
 

	
 
// Temporary simplfication: ignore ipv6. To revert, just refactor this structure and its usages
 
#[repr(C)]
 
pub struct FfiSocketAddr {
 
    pub ipv4: [u8; 4],
 
    pub port: u16,
 
@@ -82,12 +82,13 @@ pub const WRONG_STATE: c_int = -2;
 
pub const CC_MAP_LOCK_POISONED: c_int = -3;
 
pub const CLOSE_FAIL: c_int = -4;
 
pub const BAD_FD: c_int = -5;
 
pub const CONNECT_FAILED: c_int = -6;
 
pub const WOULD_BLOCK: c_int = -7;
 
pub const BAD_SOCKADDR: c_int = -8;
 
pub const SEND_BEFORE_CONNECT: c_int = -9;
 

	
 
///////////////////// REOWOLF //////////////////////////
 

	
 
/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
/// - pointer is NULL iff there was no last error
 
/// - data at pointer is null-delimited
src/ffi/pseudo_socket_api.rs
Show inline comments
 
use super::*;
 

	
 
use libc::{sockaddr, socklen_t};
 
use std::{
 
    collections::HashMap,
 
    ffi::c_void,
 
    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
 
    os::raw::c_int,
 
    sync::RwLock,
 
};
 
use libc::{sockaddr, socklen_t};
 
///////////////////////////////////////////////////////////////////
 

	
 
struct FdAllocator {
 
    next: Option<c_int>,
 
    freed: Vec<c_int>,
 
}
 
@@ -20,28 +20,28 @@ struct ConnectorBound {
 
    getter: PortId,
 
}
 
struct ConnectorComplex {
 
    // invariants:
 
    // 1. connector is a upd-socket singleton
 
    // 2. putter and getter are ports in the native interface with the appropriate polarities
 
    // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    peer_addr: SocketAddr,
 
    // 3. connected_to always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    conencted_to: Option<SocketAddr>,
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
struct CcMap {
 
    fd_to_cc: HashMap<c_int, RwLock<ConnectorComplex>>,
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 
unsafe fn addr_from_raw(addr: *const sockaddr, addr_len: socklen_t) -> Option<SocketAddr> {
 
    os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr()
 
}
 
fn trivial_peer_addr() -> SocketAddr {
 
fn dummy_peer_addr() -> SocketAddr {
 
    // SocketAddrV4::new isn't a constant-time func
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 0), 8000))
 
}
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
        Self {
 
            next: Some(0), // positive values used only
 
            freed: vec![],
 
@@ -64,31 +64,12 @@ impl FdAllocator {
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CC_MAP: RwLock<CcMap> = Default::default();
 
}
 
impl ConnectorComplex {
 
    fn connect(&mut self, peer_addr: SocketAddr) -> c_int {
 
        self.peer_addr = peer_addr;
 
        if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound {
 
            if connector.get_mut_udp_sock(0).unwrap().connect(peer_addr).is_err() {
 
                return CONNECT_FAILED;
 
            }
 
        }
 
        ERR_OK
 
    }
 
    unsafe fn send(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, putter, .. }) = &mut self.connector_bound {
 
            match connector_put_bytes(connector, *putter, bytes_ptr as _, bytes_len) {
 
                ERR_OK => connector_sync(connector, -1),
 
                err => err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
    unsafe fn recv(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
            connector_get(connector, *getter);
 
            match connector_sync(connector, -1) {
 
                0 => {
 
                    // batch index 0 means OK
 
@@ -105,58 +86,64 @@ impl ConnectorComplex {
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
}
 

	
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let cc = ConnectorComplex { peer_addr: trivial_peer_addr(), connector_bound: None };
 
    let cc = ConnectorComplex { peer_addr: dummy_peer_addr(), connector_bound: None };
 
    w.fd_to_cc.insert(fd, RwLock::new(cc));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED };
 
    if w.fd_to_cc.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: socklen_t) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    if cc.connector_bound.is_some() {
 
        // already bound!
 
        return WRONG_STATE;
 
    }
 
    cc.connector_bound = {
 
        let mut connector = Connector::new(
 
            Box::new(crate::DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        );
 
        let [putter, getter] = connector.new_udp_mediator_component(addr, cc.peer_addr).unwrap();
 
        // maintain invariant: if cc.connected_to.is_some():
 
        //   cc.connected_to matches the connected address of the socket
 
        let peer_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr);
 
        let [putter, getter] = connector.new_udp_mediator_component(addr, peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
@@ -167,110 +154,157 @@ pub unsafe extern "C" fn rw_connect(
 
) -> c_int {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    cc.connect(addr)
 
    if let Some(ConnectorBound { connector, .. }) = &mut cc.connector_bound {
 
        // already bound. maintain invariant by overwriting the socket's connection (DUMMY or otherwise)
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(peer_addr).is_err() {
 
            return CONNECT_FAILED;
 
        }
 
    }
 
    cc.connected_to = Some(addr);
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    cc.send(bytes_ptr, bytes_len)
 
    if cc.connected_to.is_none() {
 
        return SEND_BEFORE_CONNECT;
 
    }
 
    if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound {
 
        // is bound
 
        let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
        connector.put(putter, Payload::from_bytes(bytes)).unwrap();
 
        connector.sync(connector, None).unwrap();
 
        bytes_len as isize
 
    } else {
 
        // is not bound
 
        WRONG_STATE as isize
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recv(
 
pub unsafe extern "C" fn rw_sendto(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
) -> isize {
 
    // ignoring flags
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    cc.recv(bytes_ptr, bytes_len)
 
    if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound {
 
        // is bound
 
        // (temporarily) break invariant
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() {
 
            // invariant not broken. nevermind
 
            return CONNECT_FAILED;
 
        }
 
        // invariant broken...
 
        let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
        connector.put(putter, Payload::from_bytes(bytes)).unwrap();
 
        connector.sync(connector, None).unwrap();
 
        let old_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr)
 
        connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).unwrap();
 
        // ...invariant restored
 
        bytes_len as isize
 
    } else {
 
        // is not bound
 
        WRONG_STATE as isize
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_sendto(
 
pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
) -> isize {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR as isize,
 
    };
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    // copy currently old_addr
 
    let old_addr = cc.peer_addr;
 
    // connect to given peer_addr
 
    match cc.connect(addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = cc.send(bytes_ptr, bytes_len);
 
    // restore old_addr
 
    match cc.connect(old_addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
        connector.get(getter).unwrap();
 
        // this call BLOCKS until it succeeds, and its got no reason to fail
 
        connector.sync(connector, None).unwrap();
 
        // copy from gotten to caller's buffer (truncating if necessary)
 
        let slice = connector.gotten(*getter).unwrap().as_slice();
 
        let cpy_msg_bytes = slice.len().min(bytes_len);
 
        std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes);
 
        // return number of bytes sent
 
        cpy_msg_bytes as isize
 
    } else {
 
        WRONG_STATE as isize // not bound!
 
    }
 
    ret
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recvfrom(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
    addr: *mut sockaddr,
 
    addr_len: *mut socklen_t,
 
) -> isize {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR as isize,
 
    };
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    // copy currently old_addr
 
    let old_addr = cc.peer_addr;
 
    // connect to given peer_addr
 
    match cc.connect(addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = cc.send(bytes_ptr, bytes_len);
 
    // restore old_addr
 
    match cc.connect(old_addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
        connector.get(getter).unwrap();
 
        // this call BLOCKS until it succeeds, and its got no reason to fail
 
        connector.sync(connector, None).unwrap();
 
        // overwrite addr and addr_len
 
        let addr = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap();
 
        let os_addr = os_socketaddr::OsSocketAddr::from(addr);
 
        let cpy_addr_bytes = (*addr_len).min(os_addr.capacity());
 
        // ptr-return addr bytes (truncated to addr_len)
 
        std::ptr::copy_nonoverlapping(os_addr.as_ptr(), addr as *mut u8, cpy_addr_bytes);
 
        // ptr-return true addr size
 
        *addr_len = os_addr.capacity(); 
 
        // copy from gotten to caller's buffer (truncating if necessary)
 
        let slice = connector.gotten(*getter).unwrap().as_slice();
 
        let cpy_msg_bytes = slice.len().min(bytes_len);
 
        std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes);
 
        // return number of bytes received
 
        cpy_msg_bytes as isize
 
    } else {
 
        WRONG_STATE as isize // not bound!
 
    }
 
    ret
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -111,21 +111,14 @@ impl Connector {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    // #[cfg(ffi_socket_api)]
 
    pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> {
 
        let sock = &mut self
 
            .get_comm_mut()?
 
            .endpoint_manager
 
            .udp_endpoint_store
 
            .endpoint_exts
 
            .get_mut(index)?
 
            .sock;
 
        Some(sock)
 
    pub(crate) fn get_mut_udp_ee(&mut self, index: usize) -> Option<&mut UdpEndpointExt> {
 
        self.get_comm_mut()?.endpoint_manager.udp_endpoint_store.endpoint_exts.get_mut(index)
 
    }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
src/runtime/endpoints.rs
Show inline comments
 
@@ -231,25 +231,25 @@ impl EndpointManager {
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                    if !ee.received_from_this_round.is_none() {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        round_ctx.getter_add(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
                        ee.received_this_round = true;
 
                        ee.received_from_this_round = Some(from);
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
@@ -340,20 +340,21 @@ impl EndpointManager {
 
        log!(
 
            logger,
 
            "Starting round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() {
 
            ee.received_this_round = false;
 
            ee.received_from_this_round = None;
 
        }
 
    }
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        decision: &Decision,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        // retain received_from_this_round for use in pseudo_socket_api::recv_from
 
        log!(
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
src/runtime/mod.rs
Show inline comments
 
@@ -37,12 +37,19 @@ pub(crate) struct NonsyncProtoContext<'a> {
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                 // KEY in pred->branch map
 
}
 
#[derive(Debug)]
 
pub(crate) struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_from_this_round: Option<SocketAddr>,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Default, Debug, Clone)]
 
struct ProtoComponentBranchInner {
 
    untaken_choice: Option<u16>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
}
 
@@ -203,19 +210,12 @@ struct EndpointManager {
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -508,13 +508,13 @@ fn new_endpoint_manager(
 
            let UdpTodo { mut sock, getter_for_incoming } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                received_this_round: false,
 
                received_from_this_round: None,
 
                getter_for_incoming,
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
0 comments (0 inline, 0 general)