diff --git a/Cargo.toml b/Cargo.toml index e5d19f0dd2c5e3db8d304dc979408f8f9c2fba95..258a30287cfdafbbc1e850669b231fda1cbb8fe4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ lazy_static = "1.4.0" crate-type = ["cdylib"] [features] -default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"] +default = ["ffi", "session_optimization"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs endpoint_logging = [] # see src/macros.rs diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index a75c2cf7b8a1c74542532e11c553a9f0e7f47b85..c51ec250fe9324518a7d572d23b848a25683c86d 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,8 +3,8 @@ use core::{cell::RefCell, convert::TryFrom}; use std::os::raw::c_int; use std::slice::from_raw_parts as slice_from_raw_parts; -#[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))] -pub mod pseudo_socket_api; +// #[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))] +// pub mod pseudo_socket_api; // Temporary simplfication: ignore ipv6. To revert, just refactor this structure and its usages #[repr(C)] @@ -85,6 +85,7 @@ pub const BAD_FD: c_int = -5; pub const CONNECT_FAILED: c_int = -6; pub const WOULD_BLOCK: c_int = -7; pub const BAD_SOCKADDR: c_int = -8; +pub const SEND_BEFORE_CONNECT: c_int = -9; ///////////////////// REOWOLF ////////////////////////// diff --git a/src/ffi/pseudo_socket_api.rs b/src/ffi/pseudo_socket_api.rs index 1bc8acba6952deec136ad859f34d3fb96d4e67fa..a4f712d4ba1a400e01df8a5f49ff59ee6d2c218e 100644 --- a/src/ffi/pseudo_socket_api.rs +++ b/src/ffi/pseudo_socket_api.rs @@ -1,5 +1,6 @@ use super::*; +use libc::{sockaddr, socklen_t}; use std::{ collections::HashMap, ffi::c_void, @@ -7,7 +8,6 @@ use std::{ os::raw::c_int, sync::RwLock, }; -use libc::{sockaddr, socklen_t}; /////////////////////////////////////////////////////////////////// struct FdAllocator { @@ -23,8 +23,8 @@ struct ConnectorComplex { // invariants: // 1. connector is a upd-socket singleton // 2. putter and getter are ports in the native interface with the appropriate polarities - // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together. - peer_addr: SocketAddr, + // 3. connected_to always mirrors connector's single udp socket's connect addr. both are overwritten together. + conencted_to: Option, connector_bound: Option, } #[derive(Default)] @@ -36,9 +36,9 @@ struct CcMap { unsafe fn addr_from_raw(addr: *const sockaddr, addr_len: socklen_t) -> Option { os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr() } -fn trivial_peer_addr() -> SocketAddr { +fn dummy_peer_addr() -> SocketAddr { // SocketAddrV4::new isn't a constant-time func - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)) + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 0), 8000)) } impl Default for FdAllocator { fn default() -> Self { @@ -67,25 +67,6 @@ lazy_static::lazy_static! { static ref CC_MAP: RwLock = Default::default(); } impl ConnectorComplex { - fn connect(&mut self, peer_addr: SocketAddr) -> c_int { - self.peer_addr = peer_addr; - if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound { - if connector.get_mut_udp_sock(0).unwrap().connect(peer_addr).is_err() { - return CONNECT_FAILED; - } - } - ERR_OK - } - unsafe fn send(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize { - if let Some(ConnectorBound { connector, putter, .. }) = &mut self.connector_bound { - match connector_put_bytes(connector, *putter, bytes_ptr as _, bytes_len) { - ERR_OK => connector_sync(connector, -1), - err => err as isize, - } - } else { - WRONG_STATE as isize // not bound! - } - } unsafe fn recv(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize { if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound { connector_get(connector, *getter); @@ -108,15 +89,15 @@ impl ConnectorComplex { } } } - /////////////////////////////////////////////////////////////////// #[no_mangle] pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { // ignoring domain and type + // get writer lock let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; let fd = w.fd_allocator.alloc(); - let cc = ConnectorComplex { peer_addr: trivial_peer_addr(), connector_bound: None }; + let cc = ConnectorComplex { peer_addr: dummy_peer_addr(), connector_bound: None }; w.fd_to_cc.insert(fd, RwLock::new(cc)); fd } @@ -124,6 +105,7 @@ pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { #[no_mangle] pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { // ignoring HOW + // get writer lock let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; if w.fd_to_cc.remove(&fd).is_some() { w.fd_allocator.free(fd); @@ -135,16 +117,18 @@ pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { #[no_mangle] pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: socklen_t) -> c_int { + // assuming _domain is AF_INET and _type is SOCK_DGRAM let addr = match addr_from_raw(addr, addr_len) { Some(addr) => addr, _ => return BAD_SOCKADDR, }; - // assuming _domain is AF_INET and _type is SOCK_DGRAM + // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED }; let cc: &mut ConnectorComplex = &mut cc; if cc.connector_bound.is_some() { + // already bound! return WRONG_STATE; } cc.connector_bound = { @@ -153,7 +137,10 @@ pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: soc crate::TRIVIAL_PD.clone(), Connector::random_id(), ); - let [putter, getter] = connector.new_udp_mediator_component(addr, cc.peer_addr).unwrap(); + // maintain invariant: if cc.connected_to.is_some(): + // cc.connected_to matches the connected address of the socket + let peer_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr); + let [putter, getter] = connector.new_udp_mediator_component(addr, peer_addr).unwrap(); Some(ConnectorBound { connector, putter, getter }) }; ERR_OK @@ -170,11 +157,19 @@ pub unsafe extern "C" fn rw_connect( _ => return BAD_SOCKADDR, }; // assuming _domain is AF_INET and _type is SOCK_DGRAM + // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED }; let cc: &mut ConnectorComplex = &mut cc; - cc.connect(addr) + if let Some(ConnectorBound { connector, .. }) = &mut cc.connector_bound { + // already bound. maintain invariant by overwriting the socket's connection (DUMMY or otherwise) + if connector.get_mut_udp_ee(0).unwrap().sock.connect(peer_addr).is_err() { + return CONNECT_FAILED; + } + } + cc.connected_to = Some(addr); + ERR_OK } #[no_mangle] @@ -185,60 +180,91 @@ pub unsafe extern "C" fn rw_send( _flags: c_int, ) -> isize { // ignoring flags + // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; - cc.send(bytes_ptr, bytes_len) + if cc.connected_to.is_none() { + return SEND_BEFORE_CONNECT; + } + if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound { + // is bound + let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); + connector.put(putter, Payload::from_bytes(bytes)).unwrap(); + connector.sync(connector, None).unwrap(); + bytes_len as isize + } else { + // is not bound + WRONG_STATE as isize + } } - #[no_mangle] -pub unsafe extern "C" fn rw_recv( +pub unsafe extern "C" fn rw_sendto( fd: c_int, bytes_ptr: *mut c_void, bytes_len: usize, _flags: c_int, + addr: *const sockaddr, + addr_len: socklen_t, ) -> isize { // ignoring flags + let addr = match addr_from_raw(addr, addr_len) { + Some(addr) => addr, + _ => return BAD_SOCKADDR, + }; + // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; - cc.recv(bytes_ptr, bytes_len) + if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound { + // is bound + // (temporarily) break invariant + if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() { + // invariant not broken. nevermind + return CONNECT_FAILED; + } + // invariant broken... + let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len); + connector.put(putter, Payload::from_bytes(bytes)).unwrap(); + connector.sync(connector, None).unwrap(); + let old_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr) + connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).unwrap(); + // ...invariant restored + bytes_len as isize + } else { + // is not bound + WRONG_STATE as isize + } } #[no_mangle] -pub unsafe extern "C" fn rw_sendto( +pub unsafe extern "C" fn rw_recv( fd: c_int, bytes_ptr: *mut c_void, bytes_len: usize, _flags: c_int, - addr: *const sockaddr, - addr_len: socklen_t, ) -> isize { - let addr = match addr_from_raw(addr, addr_len) { - Some(addr) => addr, - _ => return BAD_SOCKADDR as isize, - }; + // ignoring flags + // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; - // copy currently old_addr - let old_addr = cc.peer_addr; - // connect to given peer_addr - match cc.connect(addr) { - e if e != ERR_OK => return e as isize, - _ => {} - } - // send - let ret = cc.send(bytes_ptr, bytes_len); - // restore old_addr - match cc.connect(old_addr) { - e if e != ERR_OK => return e as isize, - _ => {} + if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound { + connector.get(getter).unwrap(); + // this call BLOCKS until it succeeds, and its got no reason to fail + connector.sync(connector, None).unwrap(); + // copy from gotten to caller's buffer (truncating if necessary) + let slice = connector.gotten(*getter).unwrap().as_slice(); + let cpy_msg_bytes = slice.len().min(bytes_len); + std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes); + // return number of bytes sent + cpy_msg_bytes as isize + } else { + WRONG_STATE as isize // not bound! } - ret } #[no_mangle] @@ -247,30 +273,38 @@ pub unsafe extern "C" fn rw_recvfrom( bytes_ptr: *mut c_void, bytes_len: usize, _flags: c_int, - addr: *const sockaddr, - addr_len: socklen_t, + addr: *mut sockaddr, + addr_len: *mut socklen_t, ) -> isize { let addr = match addr_from_raw(addr, addr_len) { Some(addr) => addr, _ => return BAD_SOCKADDR as isize, }; + // ignoring flags + // get outer reader, inner writer locks let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; let cc: &mut ConnectorComplex = &mut cc; - // copy currently old_addr - let old_addr = cc.peer_addr; - // connect to given peer_addr - match cc.connect(addr) { - e if e != ERR_OK => return e as isize, - _ => {} - } - // send - let ret = cc.send(bytes_ptr, bytes_len); - // restore old_addr - match cc.connect(old_addr) { - e if e != ERR_OK => return e as isize, - _ => {} + if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound { + connector.get(getter).unwrap(); + // this call BLOCKS until it succeeds, and its got no reason to fail + connector.sync(connector, None).unwrap(); + // overwrite addr and addr_len + let addr = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap(); + let os_addr = os_socketaddr::OsSocketAddr::from(addr); + let cpy_addr_bytes = (*addr_len).min(os_addr.capacity()); + // ptr-return addr bytes (truncated to addr_len) + std::ptr::copy_nonoverlapping(os_addr.as_ptr(), addr as *mut u8, cpy_addr_bytes); + // ptr-return true addr size + *addr_len = os_addr.capacity(); + // copy from gotten to caller's buffer (truncating if necessary) + let slice = connector.gotten(*getter).unwrap().as_slice(); + let cpy_msg_bytes = slice.len().min(bytes_len); + std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes); + // return number of bytes received + cpy_msg_bytes as isize + } else { + WRONG_STATE as isize // not bound! } - ret } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 83e5ede6e7378bc849393b85fe82ca92a76cf8cf..4b55cea8d8da2020cd3d36719bcfc5694ae9df0a 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -114,15 +114,8 @@ impl Connector { } } // #[cfg(ffi_socket_api)] - pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> { - let sock = &mut self - .get_comm_mut()? - .endpoint_manager - .udp_endpoint_store - .endpoint_exts - .get_mut(index)? - .sock; - Some(sock) + pub(crate) fn get_mut_udp_ee(&mut self, index: usize) -> Option<&mut UdpEndpointExt> { + self.get_comm_mut()?.endpoint_manager.udp_endpoint_store.endpoint_exts.get_mut(index) } pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index f3149c2a91f75540634698df841a26d35172af13..52f2cd4dfedffb4c27ab6c6bc5a12e3c63d0065c 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -234,10 +234,10 @@ impl EndpointManager { let recv_buffer = self.udp_in_buffer.as_mut_slice(); while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; - if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { + if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() { // I received a payload! self.udp_endpoint_store.polled_undrained.insert(index); - if !ee.received_this_round { + if !ee.received_from_this_round.is_none() { let payload = Payload::from(&recv_buffer[..bytes_written]); let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); @@ -246,7 +246,7 @@ impl EndpointManager { SendPayloadMsg { payload, predicate }, ); some_message_enqueued = true; - ee.received_this_round = true; + ee.received_from_this_round = Some(from); } else { // lose the message! } @@ -343,7 +343,7 @@ impl EndpointManager { self.udp_endpoint_store.endpoint_exts.len() ); for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() { - ee.received_this_round = false; + ee.received_from_this_round = None; } } pub(super) fn udp_endpoints_round_end( @@ -351,6 +351,7 @@ impl EndpointManager { logger: &mut dyn Logger, decision: &Decision, ) -> Result<(), UnrecoverableSyncError> { + // retain received_from_this_round for use in pseudo_socket_api::recv_from log!( logger, "Ending round for {} udp endpoints", diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 70d2abeb9c473ae97ea2adc2291709d10f620a06..8c8d0e72082ce9c9c4e222bc469ec3f4df79954a 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -40,6 +40,13 @@ pub(crate) struct SyncProtoContext<'a> { branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch predicate: &'a Predicate, // KEY in pred->branch map } +#[derive(Debug)] +pub(crate) struct UdpEndpointExt { + sock: UdpSocket, // already bound and connected + received_from_this_round: Option, + outgoing_payloads: HashMap, + getter_for_incoming: PortId, +} #[derive(Default, Debug, Clone)] struct ProtoComponentBranchInner { untaken_choice: Option, @@ -206,13 +213,6 @@ struct EndpointStore { endpoint_exts: Vec, polled_undrained: VecSet, } -#[derive(Debug)] -struct UdpEndpointExt { - sock: UdpSocket, // already bound and connected - received_this_round: bool, - outgoing_payloads: HashMap, - getter_for_incoming: PortId, -} #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] struct PortInfo { polarities: HashMap, diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 476bdb1ca51f259bf91cbcf905ff6edcc70109d4..7a3c0d01ac9ff9b16d0c66b5fb562805ad00cc44 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -511,7 +511,7 @@ fn new_endpoint_manager( UdpEndpointExt { sock, outgoing_payloads: Default::default(), - received_this_round: false, + received_from_this_round: None, getter_for_incoming, } })