diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 907d7d3813ed2ddba23ed09dba2ff0848904bb78..57e0a05e44819dcbd232f59958c116c54704a2c9 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -13,13 +13,21 @@ enum TryRecyAnyError { ///////////////////// +fn would_block(err: &std::io::Error) -> bool { + err.kind() == std::io::ErrorKind::WouldBlock +} impl Endpoint { - pub fn try_recv(&mut self) -> Result, EndpointError> { + pub fn try_recv( + &mut self, + logger: &mut dyn Logger, + ) -> Result, EndpointError> { use EndpointError::*; // populate inbox as much as possible 'read_loop: loop { - match self.stream.read_to_end(&mut self.inbox) { - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, + let res = self.stream.read_to_end(&mut self.inbox); + endptlog!(logger, "Stream read to end {:?}", &res); + match res { + Err(e) if would_block(&e) => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), Err(_e) => return Err(BrokenEndpoint), @@ -48,6 +56,12 @@ impl Endpoint { } impl EndpointManager { + pub fn index_iter(&self) -> Range { + 0..self.num_endpoints() + } + pub fn num_endpoints(&self) -> usize { + self.endpoint_exts.len() + } pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { let endpoint = &mut self.endpoint_exts[index].endpoint; endpoint.send(msg).map_err(|err| { @@ -59,22 +73,24 @@ impl EndpointManager { } pub fn try_recv_any_comms( &mut self, + logger: &mut dyn Logger, deadline: Option, ) -> Result, SyncError> { use {SyncError as Se, TryRecyAnyError as Trae}; - match self.try_recv_any(deadline) { + match self.try_recv_any(logger, deadline) { Ok(tup) => Ok(Some(tup)), Err(Trae::Timeout) => Ok(None), Err(Trae::PollFailed) => Err(Se::PollFailed), - Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)), + Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)), } } pub fn try_recv_any_setup( &mut self, + logger: &mut dyn Logger, deadline: Option, ) -> Result<(usize, Msg), ConnectError> { use {ConnectError as Ce, TryRecyAnyError as Trae}; - self.try_recv_any(deadline).map_err(|err| match err { + self.try_recv_any(logger, deadline).map_err(|err| match err { Trae::Timeout => Ce::Timeout, Trae::PollFailed => Ce::PollFailed, Trae::EndpointError { error, index } => Ce::EndpointSetupError( @@ -83,36 +99,56 @@ impl EndpointManager { ), }) } - fn try_recv_any(&mut self, deadline: Option) -> Result<(usize, Msg), TryRecyAnyError> { - use TryRecyAnyError::*; + fn try_recv_any( + &mut self, + logger: &mut dyn Logger, + deadline: Option, + ) -> Result<(usize, Msg), TryRecyAnyError> { + use TryRecyAnyError as Trea; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { + endptlog!(logger, "RECV undelayed_msg {:?}", &x); return Ok(x); } loop { // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained while let Some(index) = self.polled_undrained.pop() { let endpoint = &mut self.endpoint_exts[index].endpoint; - if let Some(msg) = - endpoint.try_recv().map_err(|error| EndpointError { error, index })? + if let Some(msg) = endpoint + .try_recv(logger) + .map_err(|error| Trea::EndpointError { error, index })? { - if !endpoint.inbox.is_empty() { - // there may be another message waiting! - self.polled_undrained.insert(index); - } + endptlog!(logger, "RECV polled_undrained {:?}", &msg); + // if !endpoint.inbox.is_empty() { + // there may be another message waiting! + self.polled_undrained.insert(index); + // } return Ok((index, msg)); } } // 3. No message yet. Do we have enough time to poll? let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?) } else { None }; - self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?; + self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?; for event in self.events.iter() { let Token(index) = event.token(); self.polled_undrained.insert(index); + endptlog!( + logger, + "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}", + &event, + index, + self.polled_undrained.iter() + ); + if event.is_error() { + return Err(Trea::EndpointError { + error: EndpointError::BrokenEndpoint, + index, + }); + } } self.events.clear(); }