diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 7cf8bcdd0815363f71d13f8acbf36928a5eceb9c..e2537e70ec313febddeab3fcf730add026cfc155 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -272,7 +272,7 @@ pub unsafe extern "C" fn connector_add_net_port( /// - where P is a Putter port that sends messages into the socket /// - where G is a Getter port that recvs messages from the socket #[no_mangle] -pub unsafe extern "C" fn connector_add_udp_port( +pub unsafe extern "C" fn connector_add_udp_port_pair( connector: &mut Connector, putter: *mut PortId, getter: *mut PortId, @@ -290,7 +290,7 @@ pub unsafe extern "C" fn connector_add_udp_port( Ok(local) => local, Err(errcode) => return errcode, }; - match connector.new_udp_port(local, peer) { + match connector.new_udp_mediator_component(local, peer) { Ok([p, g]) => { if !putter.is_null() { putter.write(p); diff --git a/src/ffi/socket_api.rs b/src/ffi/socket_api.rs index b297ea144d5e0792c805aeabf998d55cb9d78bd9..6d56d8e4dfd74dba4de74501422e561f8923d22d 100644 --- a/src/ffi/socket_api.rs +++ b/src/ffi/socket_api.rs @@ -27,8 +27,8 @@ struct MaybeConnector { connector_bound: Option, } #[derive(Default)] -struct CspStorage { - fd_to_mc: HashMap>, +struct FdcStorage { + fd_to_c: HashMap>, fd_allocator: FdAllocator, } fn trivial_peer_addr() -> SocketAddr { @@ -61,7 +61,7 @@ impl FdAllocator { } } lazy_static::lazy_static! { - static ref CSP_STORAGE: RwLock = Default::default(); + static ref FDC_STORAGE: RwLock = Default::default(); } impl MaybeConnector { fn connect(&mut self, peer_addr: SocketAddr) -> c_int { @@ -110,19 +110,19 @@ impl MaybeConnector { #[no_mangle] pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { // ignoring domain and type - let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED }; + let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED }; let fd = w.fd_allocator.alloc(); let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None }; - w.fd_to_mc.insert(fd, RwLock::new(mc)); + w.fd_to_c.insert(fd, RwLock::new(mc)); fd } #[no_mangle] pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { // ignoring HOW - let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED }; - w.fd_allocator.free(fd); - if w.fd_to_mc.remove(&fd).is_some() { + let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED }; + if w.fd_to_c.remove(&fd).is_some() { + w.fd_allocator.free(fd); ERR_OK } else { CLOSE_FAIL @@ -136,8 +136,8 @@ pub unsafe extern "C" fn rw_bind( _addr_len: usize, ) -> c_int { // assuming _domain is AF_INET and _type is SOCK_DGRAM - let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED }; - let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD }; + let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED }; + let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD }; let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED }; let mc: &mut MaybeConnector = &mut mc; if mc.connector_bound.is_some() { @@ -149,7 +149,8 @@ pub unsafe extern "C" fn rw_bind( crate::TRIVIAL_PD.clone(), Connector::random_id(), ); - let [putter, getter] = connector.new_udp_port(local_addr.read(), mc.peer_addr).unwrap(); + let [putter, getter] = + connector.new_udp_mediator_component(local_addr.read(), mc.peer_addr).unwrap(); Some(ConnectorBound { connector, putter, getter }) }; ERR_OK @@ -162,8 +163,8 @@ pub unsafe extern "C" fn rw_connect( _address_len: usize, ) -> c_int { // assuming _domain is AF_INET and _type is SOCK_DGRAM - let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED }; - let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD }; + let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED }; + let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD }; let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED }; let mc: &mut MaybeConnector = &mut mc; mc.connect(peer_addr.read()) @@ -177,8 +178,8 @@ pub unsafe extern "C" fn rw_send( _flags: c_int, ) -> isize { // ignoring flags - let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize }; + let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; let mc: &mut MaybeConnector = &mut mc; mc.send(bytes_ptr, bytes_len) @@ -192,8 +193,8 @@ pub unsafe extern "C" fn rw_recv( _flags: c_int, ) -> isize { // ignoring flags - let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize }; + let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; let mc: &mut MaybeConnector = &mut mc; mc.recv(bytes_ptr, bytes_len) @@ -208,8 +209,8 @@ pub unsafe extern "C" fn rw_sendto( peer_addr: *const SocketAddr, _addr_len: usize, ) -> isize { - let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize }; + let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; let mc: &mut MaybeConnector = &mut mc; // copy currently connected peer addr @@ -239,8 +240,8 @@ pub unsafe extern "C" fn rw_recvfrom( peer_addr: *const SocketAddr, _addr_len: usize, ) -> isize { - let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; - let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize }; + let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize }; let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize }; let mc: &mut MaybeConnector = &mut mc; // copy currently connected peer addr diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index a193cdf85e446a6ba111d5bc2d1e232c71f19ee6..9518a30ca39ec63f75ed7a56d1d183901df05b41 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -23,7 +23,7 @@ struct BranchingNative { } #[derive(Clone, Debug)] struct NativeBranch { - index: usize, + batch_index: usize, gotten: HashMap, to_get: HashSet, } @@ -303,10 +303,9 @@ impl Connector { "Translating {} native batches into branches...", comm.native_batches.len() ); - let native_branch_spec_var = rctx.spec_var_stream.next(); - log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var); + let mut native_spec_assign_stream = None; let mut branching_native = BranchingNative { branches: Default::default() }; - 'native_branches: for ((native_branch, index), branch_spec_val) in + 'native_branches: for ((native_branch, batch_index), branch_spec_val) in comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) { let NativeBatch { to_get, to_put } = native_branch; @@ -323,31 +322,58 @@ impl Connector { for &port in cu.inner.native_ports.difference(&firing_ports) { let var = cu.inner.port_info.spec_var_for(port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { - log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); + log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", batch_index, var); continue 'native_branches; } } - // this branch is consistent. distinguish it with a unique var:val mapping and proceed - predicate.inserted(native_branch_spec_var, branch_spec_val) + predicate }; log!( cu.inner.logger, - "Native branch index={:?} has consistent {:?}", - index, + "Native batch_index={:?} has consistent {:?}", + batch_index, &predicate ); // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg); + log!(cu.inner.logger, "Native branch {} sending msg {:?}", batch_index, &msg); rctx.getter_buffer.putter_add(cu, putter, msg); } - let branch = NativeBranch { index, gotten: Default::default(), to_get }; + let branch = NativeBranch { batch_index, gotten: Default::default(), to_get }; + if let Some(old_branch) = branching_native.branches.remove(&predicate) { + let (var, vals): (SpecVar, [SpecVal; 2]) = { + let (var, val_iter) = native_spec_assign_stream.get_or_insert_with(|| { + (rctx.spec_var_stream.next(), SpecVal::iter_domain()) + }); + let vals = [ + val_iter.next().expect("Exhausted specval space!"), + val_iter.next().expect("Exhausted specval space!"), + ]; + (*var, vals) + }; + log!( + cu.inner.logger, + "Branch collision on {:?}. Going to distinguish them with a spec var {:?} ands vals {:?}", + &predicate, + var, + vals + ); + branching_native + .branches + .insert(predicate.clone().inserted(var, vals[0]), old_branch); + branching_native.branches.insert(predicate.inserted(var, vals[1]), branch); + } else { + // no branch collision + branching_native.branches.insert(predicate, branch); + } + } + for (predicate, branch) in branching_native.branches.iter() { if branch.is_ended() { log!( cu.inner.logger, - "Native submitting solution for batch {} with {:?}", - index, + "Native submitting solution for branch {} with {:?}", + branch.batch_index, &predicate ); rctx.solution_storage.submit_and_digest_subtree_solution( @@ -356,10 +382,6 @@ impl Connector { predicate.clone(), ); } - if let Some(_) = branching_native.branches.insert(predicate, branch) { - // thanks to the native_branch_spec_var, each batch has a distinct predicate - unreachable!() - } } // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); @@ -857,9 +879,9 @@ impl BranchingNative { &branch.gotten ); if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) { - let NativeBranch { index, gotten, .. } = branch; + let NativeBranch { batch_index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); - return RoundOk { batch_index: index, gotten }; + return RoundOk { batch_index, gotten }; } } panic!("Native had no branches matching pred {:?}", solution_predicate); diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 60d172653166884cfa7cc2d84faf8f04e5a608c6..b3967f13704811370ed5a7dc8dd174a962cd7228 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -25,7 +25,12 @@ impl Connector { })), } } - pub fn new_udp_port( + /// Conceptually, this returning [p0, g1] is sugar for: + /// 1. create port pair [p0, g0] + /// 2. create port pair [p1, g1] + /// 3. create udp component with interface of moved ports [p1, g0] + /// 4. return [p0, g1] + pub fn new_udp_mediator_component( &mut self, local_addr: SocketAddr, peer_addr: SocketAddr, diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 7d4cc813d1d4e991e3c6915a44f8b95e7f46b100..4aa4e18a2842205889cc59da398bdcf8a8aae677 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -677,8 +677,8 @@ fn udp_self_connect() { let test_log_path = Path::new("./logs/udp_self_connect"); let sock_addrs = [next_test_addr(), next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap(); - c.new_udp_port(sock_addrs[1], sock_addrs[0]).unwrap(); + c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap(); c.connect(SEC1).unwrap(); } @@ -687,7 +687,7 @@ fn solo_udp_put_success() { let test_log_path = Path::new("./logs/solo_udp_put_success"); let sock_addrs = [next_test_addr(), next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap(); + let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.sync(MS300).unwrap(); @@ -698,7 +698,7 @@ fn solo_udp_get_fail() { let test_log_path = Path::new("./logs/solo_udp_get_fail"); let sock_addrs = [next_test_addr(), next_test_addr()]; let mut c = file_logged_connector(0, test_log_path); - let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap(); + let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); c.connect(SEC1).unwrap(); c.get(p0).unwrap(); c.sync(MS300).unwrap_err(); @@ -714,7 +714,7 @@ fn reowolf_to_udp() { barrier.wait(); // reowolf thread let mut c = file_logged_connector(0, test_log_path); - let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap(); + let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.sync(MS300).unwrap(); @@ -744,7 +744,7 @@ fn udp_to_reowolf() { barrier.wait(); // reowolf thread let mut c = file_logged_connector(0, test_log_path); - let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap(); + let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); c.connect(SEC1).unwrap(); c.get(p0).unwrap(); c.sync(SEC5).unwrap(); @@ -776,7 +776,7 @@ fn udp_reowolf_swap() { barrier.wait(); // reowolf thread let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap(); + let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(p1).unwrap();