diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index f3149c2a91f75540634698df841a26d35172af13..52f2cd4dfedffb4c27ab6c6bc5a12e3c63d0065c 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -234,10 +234,10 @@ impl EndpointManager { let recv_buffer = self.udp_in_buffer.as_mut_slice(); while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; - if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { + if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() { // I received a payload! self.udp_endpoint_store.polled_undrained.insert(index); - if !ee.received_this_round { + if !ee.received_from_this_round.is_none() { let payload = Payload::from(&recv_buffer[..bytes_written]); let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); @@ -246,7 +246,7 @@ impl EndpointManager { SendPayloadMsg { payload, predicate }, ); some_message_enqueued = true; - ee.received_this_round = true; + ee.received_from_this_round = Some(from); } else { // lose the message! } @@ -343,7 +343,7 @@ impl EndpointManager { self.udp_endpoint_store.endpoint_exts.len() ); for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() { - ee.received_this_round = false; + ee.received_from_this_round = None; } } pub(super) fn udp_endpoints_round_end( @@ -351,6 +351,7 @@ impl EndpointManager { logger: &mut dyn Logger, decision: &Decision, ) -> Result<(), UnrecoverableSyncError> { + // retain received_from_this_round for use in pseudo_socket_api::recv_from log!( logger, "Ending round for {} udp endpoints",