diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index a4a927db72cda173496233c78d0a025d78ef8b7b..8d32b65070a7ecdfabb545fad1ff440046b1db05 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -226,6 +226,39 @@ trait Messengerlike { } } } + + // attempt to receive a message from one of the endpoints before the deadline + fn recv_until( + &mut self, + deadline: Option, + ) -> Result, MessengerRecvErr> { + // try get something buffered + if let Some(x) = self.get_state_mut().undelayed.pop() { + return Ok(Some(x)); + } + + loop { + // polled_undrained may not be empty + while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { + if let Some(msg) = self.get_endpoint_mut(eekey).recv()? { + // this endpoint MAY still have messages! check again in future + self.get_state_mut().polled_undrained.insert(eekey); + return Ok(Some(ReceivedMsg { recipient: eekey, msg })); + } + } + + let state = self.get_state_mut(); + match state.poll_events_until(deadline) { + Ok(()) => { + for e in state.events.iter() { + state.polled_undrained.insert(Key::from_token(e.token())); + } + } + Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), + Err(PollDeadlineErr::Timeout) => return Ok(None), + } + } + } } ///////////////////////////////// @@ -267,7 +300,7 @@ impl Default for Arena { impl Arena { pub fn alloc(&mut self, t: T) -> Key { self.storage.push(t); - Key::from_raw(self.storage.len() as u64 - 1) + Key::from_raw(self.storage.len() - 1) } pub fn get(&self, key: Key) -> Option<&T> { self.storage.get(key.to_raw() as usize) @@ -285,7 +318,7 @@ impl Arena { self.storage.len() } pub fn keyspace(&self) -> impl Iterator { - (0..(self.storage.len() as u64)).map(Key::from_raw) + (0..self.storage.len()).map(Key::from_raw) } } @@ -308,6 +341,17 @@ impl MessengerState { self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; Ok(()) } + fn poll_events_until(&mut self, deadline: Option) -> Result<(), PollDeadlineErr> { + use PollDeadlineErr::*; + self.events.clear(); + let poll_timeout = if let Some(d) = deadline { + Some(d.checked_duration_since(Instant::now()).ok_or(Timeout)?) + } else { + None + }; + self.poll.poll(&mut self.events, poll_timeout).map_err(|_| PollingFailed)?; + Ok(()) + } } impl From for ConnectErr { fn from(e: PollDeadlineErr) -> ConnectErr {