Files
@ e7b7d53e6952
Branch filter:
Location: CSY/reowolf/src/runtime/endpoints.rs
e7b7d53e6952
3.9 KiB
application/rls-services+xml
more tests
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | use super::*;
struct MonitoredReader<R: Read> {
bytes: usize,
r: R,
}
/////////////////////
impl Endpoint {
pub fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
use EndpointError::*;
// populate inbox as much as possible
'read_loop: loop {
match self.stream.read_to_end(&mut self.inbox) {
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop,
Ok(0) => break 'read_loop,
Ok(_) => (),
Err(_e) => return Err(BrokenEndpoint),
}
}
let mut monitored = MonitoredReader::from(&self.inbox[..]);
match bincode::deserialize_from(&mut monitored) {
Ok(msg) => {
let msg_size = monitored.bytes_read();
self.inbox.drain(0..(msg_size.try_into().unwrap()));
Ok(Some(msg))
}
Err(e) => match *e {
bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
Ok(None)
}
_ => Err(MalformedMessage),
// println!("SERDE ERRKIND {:?}", e);
// Err(MalformedMessage)
},
}
}
pub fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
bincode::serialize_into(&mut self.stream, msg).map_err(drop)
}
}
impl EndpointManager {
pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
self.endpoint_exts[index].endpoint.send(msg)
}
pub fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
use TryRecyAnyError::*;
// 1. try messages already buffered
if let Some(x) = self.undelayed_messages.pop() {
return Ok(x);
}
loop {
// 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
while let Some(index) = self.polled_undrained.pop() {
let endpoint = &mut self.endpoint_exts[index].endpoint;
if let Some(msg) =
endpoint.try_recv().map_err(|error| EndpointError { error, index })?
{
if !endpoint.inbox.is_empty() {
// there may be another message waiting!
self.polled_undrained.insert(index);
}
return Ok((index, msg));
}
}
// 3. No message yet. Do we have enough time to poll?
let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
for event in self.events.iter() {
let Token(index) = event.token();
self.polled_undrained.insert(index);
}
self.events.clear();
}
}
pub fn undelay_all(&mut self) {
if self.undelayed_messages.is_empty() {
// fast path
std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
return;
}
// slow path
self.undelayed_messages.extend(self.delayed_messages.drain(..));
}
}
impl Debug for Endpoint {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
}
}
impl<R: Read> From<R> for MonitoredReader<R> {
fn from(r: R) -> Self {
Self { r, bytes: 0 }
}
}
impl<R: Read> MonitoredReader<R> {
pub fn bytes_read(&self) -> usize {
self.bytes
}
}
impl<R: Read> Read for MonitoredReader<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let n = self.r.read(buf)?;
self.bytes += n;
Ok(n)
}
}
impl Into<Msg> for SetupMsg {
fn into(self) -> Msg {
Msg::SetupMsg(self)
}
}
|