Files
@ d1a70dfdafba
Branch filter:
Location: CSY/reowolf/src/runtime/endpoints.rs - annotation
d1a70dfdafba
5.5 KiB
application/rls-services+xml
more robust error handling
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 d1a70dfdafba d1a70dfdafba e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba e7b7d53e6952 e7b7d53e6952 d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba d1a70dfdafba e7b7d53e6952 
e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 e7b7d53e6952 | use super::*;
/// A reader `r` paired with a running count of bytes, `bytes`.
///
/// The `Read` bound is intentionally not repeated on the type itself; it lives on the
/// `impl` blocks that actually need it, so the type can be named without restating the bound.
struct MonitoredReader<R> {
    /// Number of bytes counted so far.
    bytes: usize,
    /// The wrapped reader.
    r: R,
}
#[derive(Debug)]
enum TryRecyAnyError {
Timeout,
PollFailed,
EndpointError { error: EndpointError, index: usize },
}
/////////////////////
impl Endpoint {
    /// Attempts to decode one message of type `T` from this endpoint.
    ///
    /// First appends everything the stream can currently deliver to `inbox`, then tries to
    /// bincode-decode a single `T` from the front of `inbox`. On success, the bytes the
    /// decoder consumed are removed from `inbox`; any remaining bytes stay buffered.
    ///
    /// # Returns
    /// - `Ok(Some(msg))` when a message was decoded.
    /// - `Ok(None)` when the decoder ran out of buffered bytes (`UnexpectedEof`).
    ///
    /// # Errors
    /// - `BrokenEndpoint` if reading the stream fails with anything other than `WouldBlock`.
    /// - `MalformedMessage` if decoding fails for any reason other than running out of bytes.
    pub fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
        use EndpointError::*;
        // Populate the inbox as much as possible. Bytes read before a `WouldBlock`
        // are still appended to `inbox` by `read_to_end`.
        loop {
            match self.stream.read_to_end(&mut self.inbox) {
                Ok(0) => break,
                Ok(_) => continue,
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
                Err(_) => return Err(BrokenEndpoint),
            }
        }
        // Decode through a counting reader so we know how many bytes the message occupied.
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
        match bincode::deserialize_from(&mut monitored) {
            Ok(msg) => {
                // `bytes_read` already returns a `usize`; no conversion is needed.
                let msg_size = monitored.bytes_read();
                self.inbox.drain(..msg_size);
                Ok(Some(msg))
            }
            Err(e) => match *e {
                // The buffer ends mid-message: not an error, just nothing to deliver yet.
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
                    Ok(None)
                }
                _ => Err(MalformedMessage),
            },
        }
    }

    /// Serializes `msg` with bincode directly into the stream.
    ///
    /// # Errors
    /// Any failure reported by `bincode::serialize_into` is mapped to
    /// `EndpointError::BrokenEndpoint`.
    pub fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
        bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint)
    }
}
impl EndpointManager {
pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
let endpoint = &mut self.endpoint_exts[index].endpoint;
endpoint.send(msg).map_err(|err| {
ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
})
}
pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> {
self.endpoint_exts[index].endpoint.send(msg)
}
pub fn try_recv_any_comms(
&mut self,
deadline: Option<Instant>,
) -> Result<Option<(usize, Msg)>, SyncError> {
use {SyncError as Se, TryRecyAnyError as Trae};
match self.try_recv_any(deadline) {
Ok(tup) => Ok(Some(tup)),
Err(Trae::Timeout) => Ok(None),
Err(Trae::PollFailed) => Err(Se::PollFailed),
Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)),
}
}
pub fn try_recv_any_setup(
&mut self,
deadline: Option<Instant>,
) -> Result<(usize, Msg), ConnectError> {
use {ConnectError as Ce, TryRecyAnyError as Trae};
self.try_recv_any(deadline).map_err(|err| match err {
Trae::Timeout => Ce::Timeout,
Trae::PollFailed => Ce::PollFailed,
Trae::EndpointError { error, index } => Ce::EndpointSetupError(
self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(),
error,
),
})
}
fn try_recv_any(&mut self, deadline: Option<Instant>) -> Result<(usize, Msg), TryRecyAnyError> {
use TryRecyAnyError::*;
// 1. try messages already buffered
if let Some(x) = self.undelayed_messages.pop() {
return Ok(x);
}
loop {
// 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
while let Some(index) = self.polled_undrained.pop() {
let endpoint = &mut self.endpoint_exts[index].endpoint;
if let Some(msg) =
endpoint.try_recv().map_err(|error| EndpointError { error, index })?
{
if !endpoint.inbox.is_empty() {
// there may be another message waiting!
self.polled_undrained.insert(index);
}
return Ok((index, msg));
}
}
// 3. No message yet. Do we have enough time to poll?
let remaining = if let Some(deadline) = deadline {
Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
} else {
None
};
self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?;
for event in self.events.iter() {
let Token(index) = event.token();
self.polled_undrained.insert(index);
}
self.events.clear();
}
}
pub fn undelay_all(&mut self) {
if self.undelayed_messages.is_empty() {
// fast path
std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
return;
}
// slow path
self.undelayed_messages.extend(self.delayed_messages.drain(..));
}
}
/// Debug formatting for `Endpoint`: only the `inbox` field is printed.
impl Debug for Endpoint {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let mut builder = f.debug_struct("Endpoint");
        builder.field("inbox", &self.inbox);
        builder.finish()
    }
}
impl<R: Read> From<R> for MonitoredReader<R> {
fn from(r: R) -> Self {
Self { r, bytes: 0 }
}
}
impl<R: Read> MonitoredReader<R> {
    /// Returns the current value of the `bytes` counter.
    pub fn bytes_read(&self) -> usize {
        let Self { bytes, .. } = self;
        *bytes
    }
}
impl<R: Read> Read for MonitoredReader<R> {
    /// Forwards the read to the wrapped reader and adds the byte count it reports to `bytes`.
    ///
    /// An error from the wrapped reader is returned as-is and leaves the counter unchanged.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        match self.r.read(buf) {
            Ok(count) => {
                self.bytes += count;
                Ok(count)
            }
            Err(err) => Err(err),
        }
    }
}
/// Wraps a `SetupMsg` in the `Msg::SetupMsg` variant.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket impl then
/// provides `Into<Msg> for SetupMsg` automatically, so existing `.into()` callers still work.
impl From<SetupMsg> for Msg {
    fn from(setup_msg: SetupMsg) -> Self {
        Msg::SetupMsg(setup_msg)
    }
}
|