From 700221108e9fbea00778a668384cd88d2c03d3c2 2020-07-20 09:55:39 From: Christopher Esterhuyse Date: 2020-07-20 09:55:39 Subject: [PATCH] socket api relaxed. bind and connect in any order. bind once and connect any number of times --- diff --git a/Cargo.toml b/Cargo.toml index b22eea2544e553c4a4f43ce37ffab0605f2626b7..15902eef642a795c2e76ddc22d54e0a16765765a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ getrandom = "0.1.14" # tiny crate. used to guess controller-id # network mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] } +socket2 = { version = "0.3.12", optional = true } # protocol backtrace = "0.3" @@ -39,6 +40,6 @@ crate-type = ["cdylib"] [features] default = ["ffi", "ffi_socket_api"] # // "session_optimization", ffi = [] # see src/ffi.rs -ffi_socket_api = ["ffi", "atomic_refcell"] +ffi_socket_api = ["ffi", "atomic_refcell", "socket2"] endpoint_logging = [] # see src/macros.rs session_optimization = [] # see src/runtime/setup.rs \ No newline at end of file diff --git a/src/common.rs b/src/common.rs index 551a259d0b3e91264c1573518e11908ee0ac9c91..0acde4a19962847266480d54c46e38731c042a40 100644 --- a/src/common.rs +++ b/src/common.rs @@ -112,6 +112,10 @@ impl U32Stream { self.next += 1; self.next - 1 } + pub(crate) fn n_skipped(mut self, n: u32) -> Self { + self.next = self.next.saturating_add(n); + self + } } impl From for PortId { fn from(id: Id) -> PortId { diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index c1c11833fbae0265f25ea9ac3732636034a890d4..7cf8bcdd0815363f71d13f8acbf36928a5eceb9c 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -84,6 +84,7 @@ pub const WRONG_STATE: c_int = -2; pub const FD_LOCK_POISONED: c_int = -3; pub const CLOSE_FAIL: c_int = -4; pub const BAD_FD: c_int = -5; +pub const CONNECT_FAILED: c_int = -6; ///////////////////// REOWOLF ////////////////////////// @@ -156,7 +157,7 @@ pub unsafe extern "C" fn connector_new_logging( Ok(file) => { let connector_id = Connector::random_id(); let 
file_logger = Box::new(FileLogger::new(connector_id, file)); - let c = Connector::new(file_logger, pd.clone(), connector_id, 8); + let c = Connector::new(file_logger, pd.clone(), connector_id); Box::into_raw(Box::new(c)) } Err(err) => { @@ -175,7 +176,7 @@ pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) { /// The connector uses the given (internal) connector ID. #[no_mangle] pub unsafe extern "C" fn connector_new(pd: &Arc) -> *mut Connector { - let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id(), 8); + let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id()); Box::into_raw(Box::new(c)) } diff --git a/src/ffi/socket_api.rs b/src/ffi/socket_api.rs index 8e701d1c72ff1c5cc800e2dbca35c14a3150757f..2d38ed9abaaec00b28dafddffa09428c918240b2 100644 --- a/src/ffi/socket_api.rs +++ b/src/ffi/socket_api.rs @@ -1,23 +1,41 @@ use super::*; use atomic_refcell::AtomicRefCell; -use std::{collections::HashMap, ffi::c_void, net::SocketAddr, os::raw::c_int, sync::RwLock}; +use std::{ + collections::HashMap, + ffi::c_void, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + os::raw::c_int, + sync::RwLock, +}; /////////////////////////////////////////////////////////////////// struct FdAllocator { next: Option, freed: Vec, } -enum MaybeConnector { - New, - Bound { local_addr: SocketAddr }, - Connected { connector: Connector, putter: PortId, getter: PortId }, +struct ConnectorBound { + connector: Connector, + putter: PortId, + getter: PortId, +} +struct MaybeConnector { + // invariants: + // 1. connector is a udp-socket singleton + // 2. putter and getter are ports in the native interface with the appropriate polarities + // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together.
+ peer_addr: SocketAddr, + connector_bound: Option<ConnectorBound>, } #[derive(Default)] -struct ConnectorStorage { - fd_to_connector: HashMap<c_int, AtomicRefCell<MaybeConnector>>, +struct CspStorage { + fd_to_mc: HashMap<c_int, AtomicRefCell<MaybeConnector>>, fd_allocator: FdAllocator, } +fn trivial_peer_addr() -> SocketAddr { + // SocketAddrV4::new isn't a const fn, so this address can't be a constant + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)) +} /////////////////////////////////////////////////////////////////// impl Default for FdAllocator { @@ -44,25 +62,26 @@ impl FdAllocator { } } lazy_static::lazy_static! { - static ref CONNECTOR_STORAGE: RwLock<ConnectorStorage> = Default::default(); + static ref CSP_STORAGE: RwLock<CspStorage> = Default::default(); } /////////////////////////////////////////////////////////////////// #[no_mangle] pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { - // assuming _domain is AF_INET and _type is SOCK_DGRAM - let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED }; + // ignoring domain and type + let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED }; let fd = w.fd_allocator.alloc(); - w.fd_to_connector.insert(fd, AtomicRefCell::new(MaybeConnector::New)); + let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None }; + w.fd_to_mc.insert(fd, AtomicRefCell::new(mc)); fd } #[no_mangle] pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { // ignoring HOW - let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED }; + let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED }; w.fd_allocator.free(fd); - if w.fd_to_connector.remove(&fd).is_some() { + if w.fd_to_mc.remove(&fd).is_some() { ERR_OK } else { CLOSE_FAIL @@ -75,13 +94,22 @@ pub unsafe extern "C" fn rw_bind( local_addr: *const SocketAddr, _addr_len: usize, ) -> c_int { - use MaybeConnector as Mc; // assuming _domain is AF_INET and _type is SOCK_DGRAM - let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return
FD_LOCK_POISONED }; - let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD }; - let mc: &mut Mc = &mut mc.borrow_mut(); - let _ = if let Mc::New = mc { () } else { return WRONG_STATE }; - *mc = Mc::Bound { local_addr: local_addr.read() }; + let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED }; + let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD }; + let mc: &mut MaybeConnector = &mut mc.borrow_mut(); + if mc.connector_bound.is_some() { + return WRONG_STATE; + } + mc.connector_bound = { + let mut connector = Connector::new( + Box::new(crate::DummyLogger), + crate::TRIVIAL_PD.clone(), + Connector::random_id(), + ); + let [putter, getter] = connector.new_udp_port(local_addr.read(), mc.peer_addr).unwrap(); + Some(ConnectorBound { connector, putter, getter }) + }; ERR_OK } @@ -91,27 +119,19 @@ pub unsafe extern "C" fn rw_connect( peer_addr: *const SocketAddr, _address_len: usize, ) -> c_int { - use MaybeConnector as Mc; // assuming _domain is AF_INET and _type is SOCK_DGRAM - let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED }; - let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD }; - let mc: &mut Mc = &mut mc.borrow_mut(); - let local_addr = - if let Mc::Bound { local_addr } = mc { local_addr } else { return WRONG_STATE }; - let peer_addr = peer_addr.read(); - let (connector, [putter, getter]) = { - let mut c = Connector::new( - Box::new(DummyLogger), - crate::TRIVIAL_PD.clone(), - Connector::random_id(), - 8, - ); - let [putter, getter] = c.new_udp_port(*local_addr, peer_addr).unwrap(); - (c, [putter, getter]) - }; - *mc = Mc::Connected { connector, putter, getter }; + let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED }; + let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD }; + let mc: &mut MaybeConnector = &mut mc.borrow_mut(); + mc.peer_addr = peer_addr.read(); + if 
let Some(ConnectorBound { connector, .. }) = &mut mc.connector_bound { + if connector.get_mut_udp_sock(0).unwrap().connect(mc.peer_addr).is_err() { + return CONNECT_FAILED; + } + } ERR_OK } + #[no_mangle] pub unsafe extern "C" fn rw_send( fd: c_int, @@ -119,22 +139,18 @@ pub unsafe extern "C" fn rw_send( bytes_len: usize, _flags: c_int, ) -> isize { - use MaybeConnector as Mc; // ignoring flags - let r = - if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize }; - let mc: &mut Mc = &mut mc.borrow_mut(); - let (connector, putter) = if let Mc::Connected { connector, putter, .. } = mc { - (connector, *putter) + let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize }; + let mc: &mut MaybeConnector = &mut mc.borrow_mut(); + if let Some(ConnectorBound { connector, putter, .. }) = &mut mc.connector_bound { + match connector_put_bytes(connector, *putter, bytes_ptr as _, bytes_len) { + ERR_OK => connector_sync(connector, -1), + err => err as isize, + } } else { - return WRONG_STATE as isize; - }; - match connector_put_bytes(connector, putter, bytes_ptr as _, bytes_len) { - ERR_OK => {} - err => return err as isize, + WRONG_STATE as isize // not bound! } - connector_sync(connector, -1) } #[no_mangle] @@ -144,27 +160,23 @@ pub unsafe extern "C" fn rw_recv( bytes_len: usize, _flags: c_int, ) -> isize { - use MaybeConnector as Mc; // ignoring flags - let r = - if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize }; - let mc: &mut Mc = &mut mc.borrow_mut(); - let (connector, getter) = if let Mc::Connected { connector, getter, .. 
} = mc { - (connector, *getter) + let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize }; + let mc: &mut MaybeConnector = &mut mc.borrow_mut(); + if let Some(ConnectorBound { connector, getter, .. }) = &mut mc.connector_bound { + connector_get(connector, *getter); + match connector_sync(connector, -1) { + 0 => { + // batch index 0 means OK + let slice = connector.gotten(*getter).unwrap().as_slice(); + let copied_bytes = slice.len().min(bytes_len); + std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, copied_bytes); + copied_bytes as isize + } + err => return err as isize, + } } else { - return WRONG_STATE as isize; - }; - match connector_get(connector, getter) { - ERR_OK => {} - err => return err as isize, + WRONG_STATE as isize // not bound! } - match connector_sync(connector, -1) { - 0 => {} // singleton batch index - err => return err as isize, - }; - let slice = connector.gotten(getter).unwrap().as_slice(); - let copied_bytes = slice.len().min(bytes_len); - std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, copied_bytes); - copied_bytes as isize } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index dc0dc7997d97aeffbd9b22a2a53ae1290bdaa81a..a193cdf85e446a6ba111d5bc2d1e232c71f19ee6 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -113,16 +113,17 @@ impl Connector { None } } - // pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> { - // let sock = &mut self - // .get_comm_mut()? - // .endpoint_manager - // .udp_endpoint_store - // .endpoint_exts - // .get_mut(index)? - // .sock; - // Some(sock) - // } + // #[cfg(ffi_socket_api)] + pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> { + let sock = &mut self + .get_comm_mut()? 
+ .endpoint_manager + .udp_endpoint_store + .endpoint_exts + .get_mut(index)? + .sock; + Some(sock) + } pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?; @@ -238,9 +239,6 @@ impl Connector { ); let mut ctx = NonsyncProtoContext { cu_inner: &mut cu.inner, - // logger: &mut *cu.inner.logger, - // port_info: &mut cu.inner.port_info, - // id_manager: &mut cu.inner.id_manager, proto_component_id, unrun_components: &mut unrun_components, proto_component_ports: &mut cu @@ -419,7 +417,6 @@ impl Connector { } }; log!(cu.inner.logger, "Sync round ending! Cleaning up"); - // dropping {solution_storage, payloads_to_get} ret } @@ -892,6 +889,7 @@ impl BranchingProtoComponent { ); use SyncBlocker as B; match blocker { + B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency B::NondetChoice { n } => { let var = rctx.spec_var_stream.next(); for val in SpecVal::iter_domain().take(n as usize) { @@ -901,35 +899,6 @@ impl BranchingProtoComponent { drainer.add_input(pred, branch_n); } } - B::Inconsistent => { - // EXPLICIT inconsistency - drop((predicate, branch)); - } - B::SyncBlockEnd => { - // make concrete all variables - for port in ports.iter() { - let var = cu.inner.port_info.spec_var_for(*port); - let should_have_fired = branch.inner.did_put_or_get.contains(port); - let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); - let did_fire = val == SpecVal::FIRING; - if did_fire != should_have_fired { - log!(cu.inner.logger, "Inconsistent wrt. 
port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); - // IMPLICIT inconsistency - drop((predicate, branch)); - return Ok(()); - } - } - // submit solution for this component - let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); - rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.inner.logger, - subtree_id, - predicate.clone(), - ); - branch.ended = true; - // move to "blocked" - drainer.add_output(predicate, branch); - } B::CouldntReadMsg(port) => { // move to "blocked" assert!(!branch.inner.inbox.contains_key(&port)); @@ -962,6 +931,31 @@ impl BranchingProtoComponent { drainer.add_input(predicate, branch); } } + B::SyncBlockEnd => { + // make concrete all variables + for port in ports.iter() { + let var = cu.inner.port_info.spec_var_for(*port); + let should_have_fired = branch.inner.did_put_or_get.contains(port); + let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); + let did_fire = val == SpecVal::FIRING; + if did_fire != should_have_fired { + log!(cu.inner.logger, "Inconsistent wrt. 
port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); + // IMPLICIT inconsistency + drop((predicate, branch)); + return Ok(()); + } + } + // submit solution for this component + let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); + rctx.solution_storage.submit_and_digest_subtree_solution( + &mut *cu.inner.logger, + subtree_id, + predicate.clone(), + ); + branch.ended = true; + // move to "blocked" + drainer.add_output(predicate, branch); + } } Ok(()) }) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index fab028f96ff54f15b1f9b8087495042b29d0c01d..a092d43973bab8c7e86b93ef52bfcb0ac67c3954 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -30,24 +30,22 @@ pub struct DummyLogger; #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); pub(crate) struct NonsyncProtoContext<'a> { - cu_inner: &'a mut ConnectorUnphasedInner, - proto_component_ports: &'a mut HashSet, - unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, - proto_component_id: ProtoComponentId, + cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds + proto_component_ports: &'a mut HashSet, // sub-structure of component + unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, // lives for Nonsync phase + proto_component_id: ProtoComponentId, // KEY in id->component map } pub(crate) struct SyncProtoContext<'a> { - cu_inner: &'a mut ConnectorUnphasedInner, - branch_inner: &'a mut ProtoComponentBranchInner, - predicate: &'a Predicate, + cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds + branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch + predicate: &'a Predicate, // KEY in pred->branch map } - #[derive(Default, Debug, Clone)] struct ProtoComponentBranchInner { untaken_choice: Option, did_put_or_get: HashSet, inbox: HashMap, } - #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, 
serde::Serialize, serde::Deserialize, )] @@ -247,7 +245,6 @@ struct ConnectorUnphasedInner { struct ConnectorSetup { net_endpoint_setups: Vec, udp_endpoint_setups: Vec, - surplus_sockets: u16, } #[derive(Debug)] enum ConnectorPhased { @@ -358,11 +355,12 @@ impl IdManager { } } fn new_spec_var_stream(&self) -> SpecVarStream { - let mut port_suffix_stream = self.port_suffix_stream.clone(); - const JUMP_OVER: usize = 100; // Jumping is entirely unnecessary. It's only used to make spec vars easier to spot in logs - for _ in 0..JUMP_OVER { - port_suffix_stream.next(); // throw away an ID - } + // Spec var stream starts where the current port_id stream ends, with a gap of SKIP_N. + // This gap is entirely unnecessary (i.e. 0 is fine) + // Its purpose is only to make SpecVars easier to spot in logs. + // E.g. spot the spec var: { v0_0, v1_2, v1_103 } + const SKIP_N: u32 = 100; + let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N); SpecVarStream { connector_id: self.connector_id, port_suffix_stream } } fn new_port_id(&mut self) -> PortId { @@ -490,7 +488,6 @@ impl Predicate { /// Given self and other, two predicates, return the predicate whose /// assignments are the union of those of self and other. - /// fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { use AssignmentUnionResult as Aur; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 95db7fb554a1569e2b5077733c8639824d15cd06..60d172653166884cfa7cc2d84faf8f04e5a608c6 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -6,7 +6,6 @@ impl Connector { mut logger: Box, proto_description: Arc, connector_id: ConnectorId, - surplus_sockets: u16, ) -> Self { log!(&mut *logger, "Created with connector_id {:?}", connector_id); Self { @@ -23,7 +22,6 @@ impl Connector { phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { net_endpoint_setups: Default::default(), udp_endpoint_setups: Default::default(), - surplus_sockets, })), } } diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index b022b14f61611422cf747106ca34078f358c0b51..7d4cc813d1d4e991e3c6915a44f8b95e7f46b100 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -34,7 +34,7 @@ fn file_logged_configured_connector( let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); let file = File::create(path).unwrap(); let file_logger = Box::new(FileLogger::new(connector_id, file)); - Connector::new(file_logger, pd, connector_id, 8) + Connector::new(file_logger, pd, connector_id) } static MINIMAL_PDL: &'static [u8] = b" primitive together(in ia, in ib, out oa, out ob){ @@ -67,7 +67,7 @@ fn new_u8_buffer(cap: usize) -> Vec { #[test] fn basic_connector() { - Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0); + Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0); } #[test]