diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 52f2cd4dfedffb4c27ab6c6bc5a12e3c63d0065c..d0faa68fcd91726e4d9fcc11f02f0d1d4a2b2ef7 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -234,10 +234,10 @@ impl EndpointManager { let recv_buffer = self.udp_in_buffer.as_mut_slice(); while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; - if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() { + if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { // I received a payload! self.udp_endpoint_store.polled_undrained.insert(index); - if !ee.received_from_this_round.is_none() { + if !ee.received_this_round { let payload = Payload::from(&recv_buffer[..bytes_written]); let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); @@ -246,7 +246,7 @@ impl EndpointManager { SendPayloadMsg { payload, predicate }, ); some_message_enqueued = true; - ee.received_from_this_round = Some(from); + ee.received_this_round = true; } else { // lose the message! } @@ -336,16 +336,6 @@ impl EndpointManager { // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } - pub(super) fn udp_endpoints_round_start(&mut self, logger: &mut dyn Logger) { - log!( - logger, - "Starting round for {} udp endpoints", - self.udp_endpoint_store.endpoint_exts.len() - ); - for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() { - ee.received_from_this_round = None; - } - } pub(super) fn udp_endpoints_round_end( &mut self, logger: &mut dyn Logger, @@ -359,10 +349,8 @@ impl EndpointManager { ); use UnrecoverableSyncError as Use; if let Decision::Success(solution_predicate) = decision { - 'endpoint_loop: for (index, ee) in - self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() - { - for (payload_predicate, payload) in ee.outgoing_payloads.drain() { + for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { + 'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() { if payload_predicate.assigns_subset(solution_predicate) { ee.sock.send(payload.as_slice()).map_err(|e| { println!("{:?}", e); @@ -375,10 +363,11 @@ impl EndpointManager { &payload_predicate, index ); - continue 'endpoint_loop; // send at most one payload per endpoint per round + // send at most one payload per endpoint per round + break 'outgoing_loop; } } - log!(logger, "Sent no message through Udp endpoint {}", index); + ee.received_this_round = false; } } Ok(())