Changeset - 3028e66928d3
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-07-23 12:01:20
christopher.esterhuyse@gmail.com
ffi corrections after testing
4 files changed with 36 insertions and 57 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -41,7 +41,7 @@ lazy_static = "1.4.0"
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization"]
 
default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs
 
endpoint_logging = [] # see src/macros.rs
src/ffi/mod.rs
Show inline comments
 
@@ -3,8 +3,8 @@ use core::{cell::RefCell, convert::TryFrom};
 
use std::os::raw::c_int;
 
use std::slice::from_raw_parts as slice_from_raw_parts;
 

	
 
// #[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))]
 
// pub mod pseudo_socket_api;
 
#[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))]
 
pub mod pseudo_socket_api;
 

	
 
// Temporary simplfication: ignore ipv6. To revert, just refactor this structure and its usages
 
#[repr(C)]
src/ffi/pseudo_socket_api.rs
Show inline comments
 
@@ -24,7 +24,7 @@ struct ConnectorComplex {
 
    // 1. connector is a upd-socket singleton
 
    // 2. putter and getter are ports in the native interface with the appropriate polarities
 
    // 3. connected_to always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    conencted_to: Option<SocketAddr>,
 
    connected_to: Option<SocketAddr>,
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
@@ -33,6 +33,11 @@ struct CcMap {
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 
unsafe fn payload_from_raw(bytes_ptr: *const c_void, bytes_len: usize) -> Payload {
 
    let bytes_ptr = std::mem::transmute(bytes_ptr);
 
    let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
    Payload::from(bytes)
 
}
 
unsafe fn addr_from_raw(addr: *const sockaddr, addr_len: socklen_t) -> Option<SocketAddr> {
 
    os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr()
 
}
 
@@ -66,29 +71,6 @@ impl FdAllocator {
 
lazy_static::lazy_static! {
 
    static ref CC_MAP: RwLock<CcMap> = Default::default();
 
}
 
impl ConnectorComplex {
 
    unsafe fn recv(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
            connector_get(connector, *getter);
 
            match connector_sync(connector, -1) {
 
                0 => {
 
                    // batch index 0 means OK
 
                    let slice = connector.gotten(*getter).unwrap().as_slice();
 
                    let copied_bytes = slice.len().min(bytes_len);
 
                    std::ptr::copy_nonoverlapping(
 
                        slice.as_ptr(),
 
                        bytes_ptr as *mut u8,
 
                        copied_bytes,
 
                    );
 
                    copied_bytes as isize
 
                }
 
                err => return err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
@@ -97,7 +79,7 @@ pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let cc = ConnectorComplex { peer_addr: dummy_peer_addr(), connector_bound: None };
 
    let cc = ConnectorComplex { connected_to: None, connector_bound: None };
 
    w.fd_to_cc.insert(fd, RwLock::new(cc));
 
    fd
 
}
 
@@ -139,7 +121,7 @@ pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: soc
 
        );
 
        // maintain invariant: if cc.connected_to.is_some():
 
        //   cc.connected_to matches the connected address of the socket
 
        let peer_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr);
 
        let peer_addr = cc.connected_to.unwrap_or_else(dummy_peer_addr);
 
        let [putter, getter] = connector.new_udp_mediator_component(addr, peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
@@ -164,7 +146,7 @@ pub unsafe extern "C" fn rw_connect(
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    if let Some(ConnectorBound { connector, .. }) = &mut cc.connector_bound {
 
        // already bound. maintain invariant by overwriting the socket's connection (DUMMY or otherwise)
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(peer_addr).is_err() {
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() {
 
            return CONNECT_FAILED;
 
        }
 
    }
 
@@ -186,13 +168,12 @@ pub unsafe extern "C" fn rw_send(
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    if cc.connected_to.is_none() {
 
        return SEND_BEFORE_CONNECT;
 
        return SEND_BEFORE_CONNECT as isize;
 
    }
 
    if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound {
 
        // is bound
 
        let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
        connector.put(putter, Payload::from_bytes(bytes)).unwrap();
 
        connector.sync(connector, None).unwrap();
 
        connector.put(*putter, payload_from_raw(bytes_ptr, bytes_len)).unwrap();
 
        connector.sync(None).unwrap();
 
        bytes_len as isize
 
    } else {
 
        // is not bound
 
@@ -211,7 +192,7 @@ pub unsafe extern "C" fn rw_sendto(
 
    // ignoring flags
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
        _ => return BAD_SOCKADDR as isize,
 
    };
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
@@ -223,14 +204,13 @@ pub unsafe extern "C" fn rw_sendto(
 
        // (temporarily) break invariant
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() {
 
            // invariant not broken. nevermind
 
            return CONNECT_FAILED;
 
            return CONNECT_FAILED as isize;
 
        }
 
        // invariant broken...
 
        let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
        connector.put(putter, Payload::from_bytes(bytes)).unwrap();
 
        connector.sync(connector, None).unwrap();
 
        let old_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr)
 
        connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).unwrap();
 
        connector.put(*putter, payload_from_raw(bytes_ptr, bytes_len)).unwrap();
 
        connector.sync(None).unwrap();
 
        let old_addr = cc.connected_to.unwrap_or_else(dummy_peer_addr);
 
        connector.get_mut_udp_ee(0).unwrap().sock.connect(old_addr).unwrap();
 
        // ...invariant restored
 
        bytes_len as isize
 
    } else {
 
@@ -252,10 +232,10 @@ pub unsafe extern "C" fn rw_recv(
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
        connector.get(getter).unwrap();
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut cc.connector_bound {
 
        connector.get(*getter).unwrap();
 
        // this call BLOCKS until it succeeds, and its got no reason to fail
 
        connector.sync(connector, None).unwrap();
 
        connector.sync(None).unwrap();
 
        // copy from gotten to caller's buffer (truncating if necessary)
 
        let slice = connector.gotten(*getter).unwrap().as_slice();
 
        let cpy_msg_bytes = slice.len().min(bytes_len);
 
@@ -276,32 +256,31 @@ pub unsafe extern "C" fn rw_recvfrom(
 
    addr: *mut sockaddr,
 
    addr_len: *mut socklen_t,
 
) -> isize {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR as isize,
 
    };
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
        connector.get(getter).unwrap();
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut cc.connector_bound {
 
        connector.get(*getter).unwrap();
 
        // this call BLOCKS until it succeeds, and its got no reason to fail
 
        connector.sync(connector, None).unwrap();
 
        connector.sync(None).unwrap();
 
        // overwrite addr and addr_len
 
        let addr = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap();
 
        let os_addr = os_socketaddr::OsSocketAddr::from(addr);
 
        let recvd_from = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap();
 
        let os_addr = os_socketaddr::OsSocketAddr::from(recvd_from);
 
        let cpy_addr_bytes = (*addr_len).min(os_addr.capacity());
 
        // ptr-return addr bytes (truncated to addr_len)
 
        std::ptr::copy_nonoverlapping(os_addr.as_ptr(), addr as *mut u8, cpy_addr_bytes);
 
        let src_u8: *const u8 = std::mem::transmute(os_addr.as_ptr());
 
        let dest_u8: *mut u8 = std::mem::transmute(addr);
 
        std::ptr::copy_nonoverlapping(src_u8, dest_u8, cpy_addr_bytes as usize);
 
        // ptr-return true addr size
 
        *addr_len = os_addr.capacity(); 
 
        // copy from gotten to caller's buffer (truncating if necessary)
 
        let slice = connector.gotten(*getter).unwrap().as_slice();
 
        let cpy_msg_bytes = slice.len().min(bytes_len);
 
        std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes);
 
        let dest_u8: *mut u8 = std::mem::transmute(bytes_ptr);
 
        std::ptr::copy_nonoverlapping(slice.as_ptr(), dest_u8, cpy_msg_bytes);
 
        // return number of bytes received
 
        cpy_msg_bytes as isize
 
    } else {
src/runtime/mod.rs
Show inline comments
 
@@ -42,8 +42,8 @@ pub(crate) struct SyncProtoContext<'a> {
 
}
 
#[derive(Debug)]
 
pub(crate) struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_from_this_round: Option<SocketAddr>,
 
    pub(crate) sock: UdpSocket, // already bound and connected
 
    pub(crate) received_from_this_round: Option<SocketAddr>,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
0 comments (0 inline, 0 general)