From 4ef99b52a75d8d05106198c97b58bc51047a1b03 2020-07-23 15:37:37 From: Christopher Esterhuyse Date: 2020-07-23 15:37:37 Subject: [PATCH] tightened visibility. cleaned up and simplified UDP mediator components --- diff --git a/Cargo.toml b/Cargo.toml index e5d19f0dd2c5e3db8d304dc979408f8f9c2fba95..258a30287cfdafbbc1e850669b231fda1cbb8fe4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ lazy_static = "1.4.0" crate-type = ["cdylib"] [features] -default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"] +default = ["ffi", "session_optimization"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs endpoint_logging = [] # see src/macros.rs diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 3c966b92235b970a444b9a00114fc5f064d11435..b16e7a7537fcd8b0fd9498182f9064de078c02fc 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -352,8 +352,6 @@ impl Connector { } // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); - - comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.inner.logger); // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 52f2cd4dfedffb4c27ab6c6bc5a12e3c63d0065c..d0faa68fcd91726e4d9fcc11f02f0d1d4a2b2ef7 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -234,10 +234,10 @@ impl EndpointManager { let recv_buffer = self.udp_in_buffer.as_mut_slice(); while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; - if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() { + if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { // I received a payload! self.udp_endpoint_store.polled_undrained.insert(index); - if !ee.received_from_this_round.is_none() { + if !ee.received_this_round { let payload = Payload::from(&recv_buffer[..bytes_written]); let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); @@ -246,7 +246,7 @@ impl EndpointManager { SendPayloadMsg { payload, predicate }, ); some_message_enqueued = true; - ee.received_from_this_round = Some(from); + ee.received_this_round = true; } else { // lose the message! } @@ -336,16 +336,6 @@ impl EndpointManager { // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } - pub(super) fn udp_endpoints_round_start(&mut self, logger: &mut dyn Logger) { - log!( - logger, - "Starting round for {} udp endpoints", - self.udp_endpoint_store.endpoint_exts.len() - ); - for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() { - ee.received_from_this_round = None; - } - } pub(super) fn udp_endpoints_round_end( &mut self, logger: &mut dyn Logger, @@ -359,10 +349,8 @@ impl EndpointManager { ); use UnrecoverableSyncError as Use; if let Decision::Success(solution_predicate) = decision { - 'endpoint_loop: for (index, ee) in - self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() - { - for (payload_predicate, payload) in ee.outgoing_payloads.drain() { + for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { + 'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() { if payload_predicate.assigns_subset(solution_predicate) { ee.sock.send(payload.as_slice()).map_err(|e| { println!("{:?}", e); @@ -375,10 +363,11 @@ impl EndpointManager { &payload_predicate, index ); - continue 'endpoint_loop; // send at most one payload per endpoint per round + // send at most one payload per endpoint per round + break 'outgoing_loop; } } - log!(logger, "Sent no message through Udp endpoint {}", index); + ee.received_this_round = false; } } Ok(()) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index b3aa732d20099ab97b6e7651c4e4ebbaecd66eb7..ee259424adb10d50f7f3b07dc3a4ffbd62bf878a 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -171,7 +171,7 @@ struct NetEndpointExt { #[derive(Debug)] struct UdpEndpointExt { sock: UdpSocket, // already bound and connected - received_from_this_round: Option, + received_this_round: bool, outgoing_payloads: HashMap, getter_for_incoming: PortId, } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 7a3c0d01ac9ff9b16d0c66b5fb562805ad00cc44..476bdb1ca51f259bf91cbcf905ff6edcc70109d4 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -511,7 +511,7 @@ fn new_endpoint_manager( UdpEndpointExt { sock, outgoing_payloads: Default::default(), - received_from_this_round: None, + received_this_round: false, getter_for_incoming, } })