diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 8e24363ee27705cf72012d3e3264efbd972e16e5..4bf276b09662c2af5fcc5e71155123b3a76ff550 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -123,8 +123,8 @@ struct ControllerInner { channel_id_stream: ChannelIdStream, endpoint_exts: Arena, messenger_state: MessengerState, - mono_n: Option, - mono_ps: Vec, + mono_n: MonoN, // state at next round start + mono_ps: Vec, // state at next round start family: ControllerFamily, logger: String, } @@ -135,6 +135,7 @@ struct ControllerEphemeral { solution_storage: SolutionStorage, poly_n: Option, poly_ps: Vec, + mono_ps: Vec, ekey_to_holder: HashMap, } @@ -168,6 +169,7 @@ pub(crate) enum SubtreeId { pub(crate) struct MonoPContext<'a> { inner: &'a mut ControllerInner, ekeys: &'a mut HashSet, + mono_ps: &'a mut Vec, } pub(crate) struct PolyPContext<'a> { my_subtree_id: SubtreeId, @@ -246,15 +248,10 @@ trait Messengerlike { } } } - - // attempt to receive a message from one of the endpoints before the deadline - fn recv_until( - &mut self, - deadline: Option, - ) -> Result, MessengerRecvErr> { + fn recv_blocking(&mut self) -> Result { // try get something buffered if let Some(x) = self.get_state_mut().undelayed.pop() { - return Ok(Some(x)); + return Ok(x); } loop { @@ -267,19 +264,18 @@ trait Messengerlike { { // this endpoint MAY still have messages! check again in future self.get_state_mut().polled_undrained.insert(eekey); - return Ok(Some(ReceivedMsg { recipient: eekey, msg })); + return Ok(ReceivedMsg { recipient: eekey, msg }); } } let state = self.get_state_mut(); - match state.poll_events_until(deadline) { - Ok(()) => { - for e in state.events.iter() { - state.polled_undrained.insert(Key::from_token(e.token())); - } - } - Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), - Err(PollDeadlineErr::Timeout) => return Ok(None), + + state + .poll + .poll(&mut state.events, None) + .map_err(|_| MessengerRecvErr::PollingFailed)?; + for e in state.events.iter() { + state.polled_undrained.insert(Key::from_token(e.token())); } } } @@ -357,15 +353,6 @@ impl ChannelIdStream { } impl MessengerState { - fn with_event_capacity(event_capacity: usize) -> Result { - Ok(Self { - poll: Poll::new()?, - events: Events::with_capacity(event_capacity), - delayed: Default::default(), - undelayed: Default::default(), - polled_undrained: Default::default(), - }) - } // does NOT guarantee that events is non-empty fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { use PollDeadlineErr::*; @@ -374,17 +361,6 @@ impl MessengerState { self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; Ok(()) } - fn poll_events_until(&mut self, deadline: Option) -> Result<(), PollDeadlineErr> { - use PollDeadlineErr::*; - self.events.clear(); - let poll_timeout = if let Some(d) = deadline { - Some(d.checked_duration_since(Instant::now()).ok_or(Timeout)?) - } else { - None - }; - self.poll.poll(&mut self.events, poll_timeout).map_err(|_| PollingFailed)?; - Ok(()) - } } impl From for ConnectErr { fn from(e: PollDeadlineErr) -> ConnectErr {