diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 9ecab8f8f7dccca0cc1263d248e2113af58aff08..3118126f12f8118c6e8219beda332e544c883cc8 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -10,7 +10,6 @@ enum TryRecyAnyError { PollFailed, EndpointError { error: EndpointError, index: usize }, } - ///////////////////// impl Endpoint { fn bincode_opts() -> impl bincode::config::Options { @@ -20,40 +19,52 @@ impl Endpoint { &mut self, logger: &mut dyn Logger, ) -> Result, EndpointError> { - use EndpointError::*; + use EndpointError as Ee; // populate inbox as much as possible + let before_len = self.inbox.len(); 'read_loop: loop { let res = self.stream.read_to_end(&mut self.inbox); - endptlog!(logger, "Stream read to end {:?}", &res); match res { Err(e) if would_block(&e) => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), - Err(_e) => return Err(BrokenEndpoint), + Err(_e) => return Err(Ee::BrokenEndpoint), } } - endptlog!(logger, "Inbox bytes {:x?}", &self.inbox); + endptlog!( + logger, + "Inbox bytes [{:x?}| {:x?}]", + DenseDebugHex(&self.inbox[..before_len]), + DenseDebugHex(&self.inbox[before_len..]), + ); let mut monitored = MonitoredReader::from(&self.inbox[..]); use bincode::config::Options; match Self::bincode_opts().deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); self.inbox.drain(0..(msg_size.try_into().unwrap())); + endptlog!( + logger, + "Yielding msg. Inbox len {}-{}=={}: [{:?}]", + self.inbox.len() + msg_size, + msg_size, + self.inbox.len(), + DenseDebugHex(&self.inbox[..]), + ); Ok(Some(msg)) } Err(e) => match *e { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { Ok(None) } - _ => Err(MalformedMessage), + _ => Err(Ee::MalformedMessage), }, } } pub(super) fn send(&mut self, msg: &T) -> Result<(), EndpointError> { use bincode::config::Options; - Self::bincode_opts() - .serialize_into(&mut self.stream, msg) - .map_err(|_| EndpointError::BrokenEndpoint) + use EndpointError as Ee; + Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint) } } @@ -74,9 +85,6 @@ impl EndpointManager { ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err) }) } - pub(super) fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> { - self.endpoint_exts[index].endpoint.send(msg) - } pub(super) fn try_recv_any_comms( &mut self, logger: &mut dyn Logger, @@ -185,7 +193,6 @@ impl Read for MonitoredReader { Ok(n) } } - impl Into for SetupMsg { fn into(self) -> Msg { Msg::SetupMsg(self)