diff --git a/Cargo.toml b/Cargo.toml index 258a30287cfdafbbc1e850669b231fda1cbb8fe4..e5d19f0dd2c5e3db8d304dc979408f8f9c2fba95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ lazy_static = "1.4.0" crate-type = ["cdylib"] [features] -default = ["ffi", "session_optimization"] +default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs endpoint_logging = [] # see src/macros.rs diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index c51ec250fe9324518a7d572d23b848a25683c86d..0706e9016a68b434f3c54b501f48044661ec6e9e 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,8 +3,8 @@ use core::{cell::RefCell, convert::TryFrom}; use std::os::raw::c_int; use std::slice::from_raw_parts as slice_from_raw_parts; -// #[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))] -// pub mod pseudo_socket_api; +#[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))] +pub mod pseudo_socket_api; // Temporary simplfication: ignore ipv6. To revert, just refactor this structure and its usages #[repr(C)] diff --git a/src/ffi/pseudo_socket_api.rs b/src/ffi/pseudo_socket_api.rs index a4f712d4ba1a400e01df8a5f49ff59ee6d2c218e..baa8d2211306c5325c555ec329bf1527a7f840bc 100644 --- a/src/ffi/pseudo_socket_api.rs +++ b/src/ffi/pseudo_socket_api.rs @@ -24,7 +24,7 @@ struct ConnectorComplex { // 1. connector is a upd-socket singleton // 2. putter and getter are ports in the native interface with the appropriate polarities // 3. connected_to always mirrors connector's single udp socket's connect addr. both are overwritten together. - conencted_to: Option, + connected_to: Option, connector_bound: Option, } #[derive(Default)] @@ -33,6 +33,11 @@ struct CcMap { fd_allocator: FdAllocator, } /////////////////////////////////////////////////////////////////// +unsafe fn payload_from_raw(bytes_ptr: *const c_void, bytes_len: usize) -> Payload { + let bytes_ptr = std::mem::transmute(bytes_ptr); + let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); + Payload::from(bytes) +} unsafe fn addr_from_raw(addr: *const sockaddr, addr_len: socklen_t) -> Option { os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr() } @@ -66,29 +71,6 @@ impl FdAllocator { lazy_static::lazy_static! { static ref CC_MAP: RwLock = Default::default(); } -impl ConnectorComplex { - unsafe fn recv(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize { - if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound { - connector_get(connector, *getter); - match connector_sync(connector, -1) { - 0 => { - // batch index 0 means OK - let slice = connector.gotten(*getter).unwrap().as_slice(); - let copied_bytes = slice.len().min(bytes_len); - std::ptr::copy_nonoverlapping( - slice.as_ptr(), - bytes_ptr as *mut u8, - copied_bytes, - ); - copied_bytes as isize - } - err => return err as isize, - } - } else { - WRONG_STATE as isize // not bound! - } - } -} /////////////////////////////////////////////////////////////////// #[no_mangle] @@ -97,7 +79,7 @@ pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { // get writer lock let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; let fd = w.fd_allocator.alloc(); - let cc = ConnectorComplex { peer_addr: dummy_peer_addr(), connector_bound: None }; + let cc = ConnectorComplex { connected_to: None, connector_bound: None }; w.fd_to_cc.insert(fd, RwLock::new(cc)); fd } @@ -139,7 +121,7 @@ pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: soc ); // maintain invariant: if cc.connected_to.is_some(): // cc.connected_to matches the connected address of the socket - let peer_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr); + let peer_addr = cc.connected_to.unwrap_or_else(dummy_peer_addr); let [putter, getter] = connector.new_udp_mediator_component(addr, peer_addr).unwrap(); Some(ConnectorBound { connector, putter, getter }) }; @@ -164,7 +146,7 @@ pub unsafe extern "C" fn rw_connect( let cc: &mut ConnectorComplex = &mut cc; if let Some(ConnectorBound { connector, .. }) = &mut cc.connector_bound { // already bound. maintain invariant by overwriting the socket's connection (DUMMY or otherwise) - if connector.get_mut_udp_ee(0).unwrap().sock.connect(peer_addr).is_err() { + if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() { return CONNECT_FAILED; } } @@ -186,13 +168,12 @@ pub unsafe extern "C" fn rw_send( let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; if cc.connected_to.is_none() { - return SEND_BEFORE_CONNECT; + return SEND_BEFORE_CONNECT as isize; } if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound { // is bound - let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); - connector.put(putter, Payload::from_bytes(bytes)).unwrap(); - connector.sync(connector, None).unwrap(); + connector.put(*putter, payload_from_raw(bytes_ptr, bytes_len)).unwrap(); + connector.sync(None).unwrap(); bytes_len as isize } else { // is not bound @@ -211,7 +192,7 @@ pub unsafe extern "C" fn rw_sendto( // ignoring flags let addr = match addr_from_raw(addr, addr_len) { Some(addr) => addr, - _ => return BAD_SOCKADDR, + _ => return BAD_SOCKADDR as isize, }; // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; @@ -223,14 +204,13 @@ pub unsafe extern "C" fn rw_sendto( // (temporarily) break invariant if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() { // invariant not broken. nevermind - return CONNECT_FAILED; + return CONNECT_FAILED as isize; } // invariant broken... - let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); - connector.put(putter, Payload::from_bytes(bytes)).unwrap(); - connector.sync(connector, None).unwrap(); - let old_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr) - connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).unwrap(); + connector.put(*putter, payload_from_raw(bytes_ptr, bytes_len)).unwrap(); + connector.sync(None).unwrap(); + let old_addr = cc.connected_to.unwrap_or_else(dummy_peer_addr); + connector.get_mut_udp_ee(0).unwrap().sock.connect(old_addr).unwrap(); // ...invariant restored bytes_len as isize } else { @@ -252,10 +232,10 @@ pub unsafe extern "C" fn rw_recv( let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; - if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound { - connector.get(getter).unwrap(); + if let Some(ConnectorBound { connector, getter, .. }) = &mut cc.connector_bound { + connector.get(*getter).unwrap(); // this call BLOCKS until it succeeds, and its got no reason to fail - connector.sync(connector, None).unwrap(); + connector.sync(None).unwrap(); // copy from gotten to caller's buffer (truncating if necessary) let slice = connector.gotten(*getter).unwrap().as_slice(); let cpy_msg_bytes = slice.len().min(bytes_len); @@ -276,32 +256,31 @@ pub unsafe extern "C" fn rw_recvfrom( addr: *mut sockaddr, addr_len: *mut socklen_t, ) -> isize { - let addr = match addr_from_raw(addr, addr_len) { - Some(addr) => addr, - _ => return BAD_SOCKADDR as isize, - }; // ignoring flags // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; - if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound { - connector.get(getter).unwrap(); + if let Some(ConnectorBound { connector, getter, .. }) = &mut cc.connector_bound { + connector.get(*getter).unwrap(); // this call BLOCKS until it succeeds, and its got no reason to fail - connector.sync(connector, None).unwrap(); + connector.sync(None).unwrap(); // overwrite addr and addr_len - let addr = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap(); - let os_addr = os_socketaddr::OsSocketAddr::from(addr); + let recvd_from = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap(); + let os_addr = os_socketaddr::OsSocketAddr::from(recvd_from); let cpy_addr_bytes = (*addr_len).min(os_addr.capacity()); // ptr-return addr bytes (truncated to addr_len) - std::ptr::copy_nonoverlapping(os_addr.as_ptr(), addr as *mut u8, cpy_addr_bytes); + let src_u8: *const u8 = std::mem::transmute(os_addr.as_ptr()); + let dest_u8: *mut u8 = std::mem::transmute(addr); + std::ptr::copy_nonoverlapping(src_u8, dest_u8, cpy_addr_bytes as usize); // ptr-return true addr size *addr_len = os_addr.capacity(); // copy from gotten to caller's buffer (truncating if necessary) let slice = connector.gotten(*getter).unwrap().as_slice(); let cpy_msg_bytes = slice.len().min(bytes_len); - std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes); + let dest_u8: *mut u8 = std::mem::transmute(bytes_ptr); + std::ptr::copy_nonoverlapping(slice.as_ptr(), dest_u8, cpy_msg_bytes); // return number of bytes received cpy_msg_bytes as isize } else { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 8c8d0e72082ce9c9c4e222bc469ec3f4df79954a..dfce1a1281671528fc7cb18e0fe343434ea6dffc 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -42,8 +42,8 @@ pub(crate) struct SyncProtoContext<'a> { } #[derive(Debug)] pub(crate) struct UdpEndpointExt { - sock: UdpSocket, // already bound and connected - received_from_this_round: Option, + pub(crate) sock: UdpSocket, // already bound and connected + pub(crate) received_from_this_round: Option, outgoing_payloads: HashMap, getter_for_incoming: PortId, }