diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index ba6f6ca089cdc24c5fa84e7e6e8c585797294c37..ba531398af9ba9f4d4efbdc534aed68298c29238 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -1,13 +1,5 @@ use super::*; -// A wrapper for some Read type, delegating read calls -// to the contained Read structure, but snooping on -// the number of bytes it reads, to be inspected later. -struct MonitoredReader { - bytes: usize, - r: R, -} - enum PollAndPopulateError { PollFailed, Timeout, @@ -51,14 +43,15 @@ impl NetEndpoint { DenseDebugHex(&self.inbox[..before_len]), DenseDebugHex(&self.inbox[before_len..]), ); - // Try deserialize from the inbox, monitoring how many bytes - // the deserialiation process consumes. In the event of - // success, this makes clear where the message ends - let mut monitored = MonitoredReader::from(&self.inbox[..]); + // Try deserialize from the inbox. `reading_slice' is updated by read() + // in-place to truncate the read part. In the event of success, + // the message bytes are contained in the truncated prefix + let mut reading_slice = self.inbox.as_slice(); + let before_len = reading_slice.len(); use bincode::config::Options; - match Self::bincode_opts().deserialize_from(&mut monitored) { + match Self::bincode_opts().deserialize_from(&mut reading_slice) { Ok(msg) => { - let msg_size = monitored.bytes_read(); + let msg_size = before_len - reading_slice.len(); // inbox[..msg_size] was deserialized into one message! self.inbox.drain(..msg_size); log!( @@ -83,14 +76,21 @@ impl NetEndpoint { } // Send the given serializable type into the stream - pub(super) fn send( + pub(super) fn send( &mut self, msg: &T, + io_byte_buffer: &mut IoByteBuffer, ) -> Result<(), NetEndpointError> { use bincode::config::Options; use NetEndpointError as Nee; - Self::bincode_opts() - .serialize_into(&mut self.stream, msg) + // Create a buffer for our bytes: a slice of the io_byte_buffer + let mut buf_slice = io_byte_buffer.as_mut_slice(); + // serialize into the slice, truncating as its filled + Self::bincode_opts().serialize_into(&mut buf_slice, msg).expect("Serialize failed!"); + // written segment is the part missing from buf_slice. Write this as one segment to the TCP stream + let wrote = IoByteBuffer::CAPACITY - buf_slice.len(); + self.stream + .write_all(&io_byte_buffer.as_mut_slice()[..wrote]) .map_err(|_| Nee::BrokenNetEndpoint)?; let _ = self.stream.flush(); Ok(()) @@ -109,7 +109,7 @@ impl EndpointManager { // Used pervasively, allows for some brevity with the ? operator. pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|err| { + net_endpoint.send(msg, &mut self.io_byte_buffer).map_err(|err| { ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) }) } @@ -123,7 +123,9 @@ impl EndpointManager { ) -> Result<(), UnrecoverableSyncError> { use UnrecoverableSyncError as Use; let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index }) + net_endpoint + .send(msg, &mut self.io_byte_buffer) + .map_err(|_| Use::BrokenNetEndpoint { index }) } /// Receive the first message of any kind at all. @@ -266,7 +268,7 @@ impl EndpointManager { } } // try receive a udp message - let recv_buffer = self.udp_in_buffer.as_mut_slice(); + let recv_buffer = self.io_byte_buffer.as_mut_slice(); while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { @@ -438,23 +440,6 @@ impl Debug for NetEndpoint { .finish() } } -impl From for MonitoredReader { - fn from(r: R) -> Self { - Self { r, bytes: 0 } - } -} -impl MonitoredReader { - pub(super) fn bytes_read(&self) -> usize { - self.bytes - } -} -impl Read for MonitoredReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let n = self.r.read(buf)?; - self.bytes += n; - Ok(n) - } -} impl Into for SetupMsg { fn into(self) -> Msg { Msg::SetupMsg(self)