From 1f3007fd929ed687423256eb57b75cf7b02331b5 2020-07-23 15:15:40 From: Christopher Esterhuyse Date: 2020-07-23 15:15:40 Subject: [PATCH] restricted pseudo-socket API to connection-oriented API --- diff --git a/Cargo.toml b/Cargo.toml index e5d19f0dd2c5e3db8d304dc979408f8f9c2fba95..258a30287cfdafbbc1e850669b231fda1cbb8fe4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ lazy_static = "1.4.0" crate-type = ["cdylib"] [features] -default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"] +default = ["ffi", "session_optimization"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs endpoint_logging = [] # see src/macros.rs diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 0706e9016a68b434f3c54b501f48044661ec6e9e..7f1b00fc9b15e9adb9ce3726cecc07290bb600f6 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -79,7 +79,7 @@ thread_local! { pub const ERR_OK: c_int = 0; pub const ERR_REOWOLF: c_int = -1; pub const WRONG_STATE: c_int = -2; -pub const CC_MAP_LOCK_POISONED: c_int = -3; +pub const LOCK_POISONED: c_int = -3; pub const CLOSE_FAIL: c_int = -4; pub const BAD_FD: c_int = -5; pub const CONNECT_FAILED: c_int = -6; diff --git a/src/ffi/pseudo_socket_api.rs b/src/ffi/pseudo_socket_api.rs index baa8d2211306c5325c555ec329bf1527a7f840bc..53dc95524486572a07b06ba8e78486ebbab289b6 100644 --- a/src/ffi/pseudo_socket_api.rs +++ b/src/ffi/pseudo_socket_api.rs @@ -14,18 +14,16 @@ struct FdAllocator { next: Option, freed: Vec, } -struct ConnectorBound { - connector: Connector, - putter: PortId, - getter: PortId, -} -struct ConnectorComplex { - // invariants: - // 1. connector is a upd-socket singleton - // 2. putter and getter are ports in the native interface with the appropriate polarities - // 3. connected_to always mirrors connector's single udp socket's connect addr. both are overwritten together. - connected_to: Option, - connector_bound: Option, +enum ConnectorComplex { + Setup { + local: Option, + peer: Option, + }, + Communication { + connector: Connector, + putter: PortId, + getter: PortId, + }, } #[derive(Default)] struct CcMap { @@ -69,26 +67,39 @@ impl FdAllocator { } } lazy_static::lazy_static! { - static ref CC_MAP: RwLock = Default::default(); + static ref LOCK_POISONED: RwLock = Default::default(); +} +impl ConnectorComplex { + fn try_become_connected(&mut self) { + match self { + ConnectorComplex::Setup { Some(local), Some(peer) } => { + // setup complete + let connector = Connector::new(crate::DummyLogger, TRIVIAL_PD.clone(), Connector::random_id()); + let [putter, getter] = connector.new_udp_mediator_component(local, peer).unwrap(); + *self = ConnectorComplex::Communication { connector, putter, getter } + } + _ => {} // setup incomplete + } + } } -/////////////////////////////////////////////////////////////////// - +fn connected_complex(local: SocketAddr, peer: SocketAddr) -> ConnectorComplex { +} +///////////////////////////////// #[no_mangle] pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { // ignoring domain and type // get writer lock - let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; + let mut w = if let Ok(w) = LOCK_POISONED.write() { w } else { return CCMLP }; let fd = w.fd_allocator.alloc(); - let cc = ConnectorComplex { connected_to: None, connector_bound: None }; + let cc = ConnectorComplex::Setup { local: None, peer: None }; w.fd_to_cc.insert(fd, RwLock::new(cc)); fd } - #[no_mangle] pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { // ignoring HOW // get writer lock - let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED }; + let mut w = if let Ok(w) = LOCK_POISONED.write() { w } else { return CCMLP }; if w.fd_to_cc.remove(&fd).is_some() { w.fd_allocator.free(fd); ERR_OK @@ -96,7 +107,6 @@ pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { CLOSE_FAIL } } - #[no_mangle] pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: socklen_t) -> c_int { // assuming _domain is AF_INET and _type is SOCK_DGRAM @@ -105,29 +115,20 @@ pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: soc _ => return BAD_SOCKADDR, }; // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED }; + let r = if let Ok(r) = LOCK_POISONED.read() { r } else { return CCMLP }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD }; - let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CCMLP }; let cc: &mut ConnectorComplex = &mut cc; - if cc.connector_bound.is_some() { - // already bound! - return WRONG_STATE; + match cc { + ConnectorComplex::Communication { ..} => WRONG_STATE, + ConnectorComplex::Setup { local, peer } => { + ConnectorComplex::Setup { local, .. } => { + *local = Some(addr); + cc.try_become_connected(); + ERR_OK + } } - cc.connector_bound = { - let mut connector = Connector::new( - Box::new(crate::DummyLogger), - crate::TRIVIAL_PD.clone(), - Connector::random_id(), - ); - // maintain invariant: if cc.connected_to.is_some(): - // cc.connected_to matches the connected address of the socket - let peer_addr = cc.connected_to.unwrap_or_else(dummy_peer_addr); - let [putter, getter] = connector.new_udp_mediator_component(addr, peer_addr).unwrap(); - Some(ConnectorBound { connector, putter, getter }) - }; - ERR_OK } - #[no_mangle] pub unsafe extern "C" fn rw_connect( fd: c_int, @@ -140,20 +141,19 @@ pub unsafe extern "C" fn rw_connect( }; // assuming _domain is AF_INET and _type is SOCK_DGRAM // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED }; + let r = if let Ok(r) = LOCK_POISONED.read() { r } else { return CCMLP }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD }; - let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CCMLP }; let cc: &mut ConnectorComplex = &mut cc; - if let Some(ConnectorBound { connector, .. }) = &mut cc.connector_bound { - // already bound. maintain invariant by overwriting the socket's connection (DUMMY or otherwise) - if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() { - return CONNECT_FAILED; + match cc { + ConnectorComplex::Communication { .. } => WRONG_STATE, + ConnectorComplex::Setup { peer, .. } => { + *peer = Some(addr); + cc.try_become_connected(); + ERR_OK } } - cc.connected_to = Some(addr); - ERR_OK } - #[no_mangle] pub unsafe extern "C" fn rw_send( fd: c_int, @@ -163,62 +163,20 @@ pub unsafe extern "C" fn rw_send( ) -> isize { // ignoring flags // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; + let r = if let Ok(r) = LOCK_POISONED.read() { r } else { return CCMLP as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; - let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CCMLP as isize }; let cc: &mut ConnectorComplex = &mut cc; - if cc.connected_to.is_none() { - return SEND_BEFORE_CONNECT as isize; - } - if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound { - // is bound - connector.put(*putter, payload_from_raw(bytes_ptr, bytes_len)).unwrap(); - connector.sync(None).unwrap(); - bytes_len as isize - } else { - // is not bound - WRONG_STATE as isize - } -} -#[no_mangle] -pub unsafe extern "C" fn rw_sendto( - fd: c_int, - bytes_ptr: *mut c_void, - bytes_len: usize, - _flags: c_int, - addr: *const sockaddr, - addr_len: socklen_t, -) -> isize { - // ignoring flags - let addr = match addr_from_raw(addr, addr_len) { - Some(addr) => addr, - _ => return BAD_SOCKADDR as isize, - }; - // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; - let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; - let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; - let cc: &mut ConnectorComplex = &mut cc; - if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound { - // is bound - // (temporarily) break invariant - if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() { - // invariant not broken. nevermind - return CONNECT_FAILED as isize; + match cc { + ConnectorComplex::Setup { .. } => WRONG_STATE as isize, + ConnectorComplex::Communication { connector, putter, .. } => { + let payload = payload_from_raw(bytes_ptr, bytes_len); + connector.put(*putter, payload).unwrap(); + connector.sync(None).unwrap(); + bytes_len } - // invariant broken... - connector.put(*putter, payload_from_raw(bytes_ptr, bytes_len)).unwrap(); - connector.sync(None).unwrap(); - let old_addr = cc.connected_to.unwrap_or_else(dummy_peer_addr); - connector.get_mut_udp_ee(0).unwrap().sock.connect(old_addr).unwrap(); - // ...invariant restored - bytes_len as isize - } else { - // is not bound - WRONG_STATE as isize } } - #[no_mangle] pub unsafe extern "C" fn rw_recv( fd: c_int, @@ -228,62 +186,19 @@ pub unsafe extern "C" fn rw_recv( ) -> isize { // ignoring flags // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; - let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; - let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; - let cc: &mut ConnectorComplex = &mut cc; - if let Some(ConnectorBound { connector, getter, .. }) = &mut cc.connector_bound { - connector.get(*getter).unwrap(); - // this call BLOCKS until it succeeds, and its got no reason to fail - connector.sync(None).unwrap(); - // copy from gotten to caller's buffer (truncating if necessary) - let slice = connector.gotten(*getter).unwrap().as_slice(); - let cpy_msg_bytes = slice.len().min(bytes_len); - std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes); - // return number of bytes sent - cpy_msg_bytes as isize - } else { - WRONG_STATE as isize // not bound! - } -} - -#[no_mangle] -pub unsafe extern "C" fn rw_recvfrom( - fd: c_int, - bytes_ptr: *mut c_void, - bytes_len: usize, - _flags: c_int, - addr: *mut sockaddr, - addr_len: *mut socklen_t, -) -> isize { - // ignoring flags - // get outer reader, inner writer locks - let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize }; + let r = if let Ok(r) = LOCK_POISONED.read() { r } else { return CCMLP as isize }; let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize }; - let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize }; + let mut cc = if let Ok(cc) = cc.write() { cc } else { return CCMLP as isize }; let cc: &mut ConnectorComplex = &mut cc; - if let Some(ConnectorBound { connector, getter, .. }) = &mut cc.connector_bound { - connector.get(*getter).unwrap(); - // this call BLOCKS until it succeeds, and its got no reason to fail - connector.sync(None).unwrap(); - // overwrite addr and addr_len - let recvd_from = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap(); - let os_addr = os_socketaddr::OsSocketAddr::from(recvd_from); - let cpy_addr_bytes = (*addr_len).min(os_addr.capacity()); - // ptr-return addr bytes (truncated to addr_len) - let src_u8: *const u8 = std::mem::transmute(os_addr.as_ptr()); - let dest_u8: *mut u8 = std::mem::transmute(addr); - std::ptr::copy_nonoverlapping(src_u8, dest_u8, cpy_addr_bytes as usize); - // ptr-return true addr size - *addr_len = os_addr.capacity(); - // copy from gotten to caller's buffer (truncating if necessary) - let slice = connector.gotten(*getter).unwrap().as_slice(); - let cpy_msg_bytes = slice.len().min(bytes_len); - let dest_u8: *mut u8 = std::mem::transmute(bytes_ptr); - std::ptr::copy_nonoverlapping(slice.as_ptr(), dest_u8, cpy_msg_bytes); - // return number of bytes received - cpy_msg_bytes as isize - } else { - WRONG_STATE as isize // not bound! + match cc { + ConnectorComplex::Setup { .. } => WRONG_STATE as isize, + ConnectorComplex::Communication { connector, getter, .. } => { + connector.get(*getter).unwrap(); + connector.sync(None).unwrap(); + let slice = connector.gotten(*getter).unwrap().as_slice(); + let cpy_msg_bytes = slice.len().min(bytes_len); + std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes); + cpy_msg_bytes + } } } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 4b55cea8d8da2020cd3d36719bcfc5694ae9df0a..3c966b92235b970a444b9a00114fc5f064d11435 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -113,10 +113,6 @@ impl Connector { None } } - // #[cfg(ffi_socket_api)] - pub(crate) fn get_mut_udp_ee(&mut self, index: usize) -> Option<&mut UdpEndpointExt> { - self.get_comm_mut()?.endpoint_manager.udp_endpoint_store.endpoint_exts.get_mut(index) - } pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index dfce1a1281671528fc7cb18e0fe343434ea6dffc..b3aa732d20099ab97b6e7651c4e4ebbaecd66eb7 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -40,13 +40,6 @@ pub(crate) struct SyncProtoContext<'a> { branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch predicate: &'a Predicate, // KEY in pred->branch map } -#[derive(Debug)] -pub(crate) struct UdpEndpointExt { - pub(crate) sock: UdpSocket, // already bound and connected - pub(crate) received_from_this_round: Option, - outgoing_payloads: HashMap, - getter_for_incoming: PortId, -} #[derive(Default, Debug, Clone)] struct ProtoComponentBranchInner { untaken_choice: Option, @@ -176,6 +169,13 @@ struct NetEndpointExt { getter_for_incoming: PortId, } #[derive(Debug)] +struct UdpEndpointExt { + sock: UdpSocket, // already bound and connected + received_from_this_round: Option, + outgoing_payloads: HashMap, + getter_for_incoming: PortId, +} +#[derive(Debug)] struct Neighborhood { parent: Option, children: VecSet,