diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 947bbadd2915b8b24b4ff1bafc98fc26844e7297..907d7d3813ed2ddba23ed09dba2ff0848904bb78 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -4,6 +4,12 @@ struct MonitoredReader { bytes: usize, r: R, } +#[derive(Debug)] +enum TryRecyAnyError { + Timeout, + PollFailed, + EndpointError { error: EndpointError, index: usize }, +} ///////////////////// @@ -36,16 +42,48 @@ impl Endpoint { }, } } - pub fn send(&mut self, msg: &T) -> Result<(), ()> { - bincode::serialize_into(&mut self.stream, msg).map_err(drop) + pub fn send(&mut self, msg: &T) -> Result<(), EndpointError> { + bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint) } } impl EndpointManager { - pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { + pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { + let endpoint = &mut self.endpoint_exts[index].endpoint; + endpoint.send(msg).map_err(|err| { + ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err) + }) + } + pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> { self.endpoint_exts[index].endpoint.send(msg) } - pub fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { + pub fn try_recv_any_comms( + &mut self, + deadline: Option, + ) -> Result, SyncError> { + use {SyncError as Se, TryRecyAnyError as Trae}; + match self.try_recv_any(deadline) { + Ok(tup) => Ok(Some(tup)), + Err(Trae::Timeout) => Ok(None), + Err(Trae::PollFailed) => Err(Se::PollFailed), + Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)), + } + } + pub fn try_recv_any_setup( + &mut self, + deadline: Option, + ) -> Result<(usize, Msg), ConnectError> { + use {ConnectError as Ce, TryRecyAnyError as Trae}; + self.try_recv_any(deadline).map_err(|err| match err { + Trae::Timeout => Ce::Timeout, + Trae::PollFailed => Ce::PollFailed, + Trae::EndpointError { error, index } => Ce::EndpointSetupError( + self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(), + error, + ), + }) + } + fn try_recv_any(&mut self, deadline: Option) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { @@ -66,8 +104,12 @@ impl EndpointManager { } } // 3. No message yet. Do we have enough time to poll? - let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; - self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; + let remaining = if let Some(deadline) = deadline { + Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + } else { + None + }; + self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?; for event in self.events.iter() { let Token(index) = event.token(); self.polled_undrained.insert(index);