diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 3999f7a07d4287caa3eaccb111afff0a6a0286e3..786bd0de9a8c09a602fa0be0bb4d1cc9abfe6b9c 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -62,7 +62,7 @@ pub(crate) enum CommonSatResult { } pub struct Endpoint { inbox: Vec, - stream: mio07::net::TcpStream, + stream: TcpStream, } #[derive(Debug, Default)] pub struct IntStream { @@ -111,10 +111,11 @@ pub struct MemInMsg { } #[derive(Debug)] pub struct EndpointPoller { - poll: mio07::Poll, - events: mio07::Events, - undrained_endpoints: HashSet, - delayed_inp_messages: Vec<(PortId, Msg)>, + poll: Poll, + events: Events, + undrained_endpoints: IndexSet, + delayed_messages: Vec<(usize, Msg)>, + undelayed_messages: Vec<(usize, Msg)>, } #[derive(Debug)] pub struct Connector { @@ -165,7 +166,54 @@ pub struct SyncContext<'a> { pub struct NonsyncContext<'a> { connector: &'a mut Connector, } +enum TryRecyAnyError { + Timeout, + PollFailed, + EndpointRecvErr { error: EndpointRecvErr, index: usize }, + BrokenEndpoint(usize), +} //////////////// +impl EndpointPoller { + fn try_recv_any( + &mut self, + endpoint_exts: &mut [EndpointExt], + deadline: Instant, + ) -> Result<(usize, Msg), TryRecyAnyError> { + use TryRecyAnyError::*; + // 1. try messages already buffered + if let Some(x) = self.undelayed_messages.pop() { + return Ok(x); + } + // 2. try read from sockets nonblocking + while let Some(index) = self.undrained_endpoints.pop() { + if let Some(msg) = endpoint_exts[index] + .endpoint + .try_recv() + .map_err(|error| EndpointRecvErr { error, index })? + { + return Ok((index, msg)); + } + } + // 3. poll for progress + loop { + let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; + self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; + for event in self.events.iter() { + let Token(index) = event.token(); + if let Some(msg) = endpoint_exts[index] + .endpoint + .try_recv() + .map_err(|error| EndpointRecvErr { error, index })? + { + return Ok((index, msg)); + } + } + } + } + fn undelay_all(&mut self) { + self.undelayed_messages.extend(self.delayed_messages.drain(..)); + } +} impl Debug for Endpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() @@ -291,9 +339,21 @@ impl Endpoint { } } impl Connector { - fn get_logger(&self) -> &dyn Logger { + pub fn get_logger(&self) -> &dyn Logger { &*self.logger } + pub fn print_state(&self) { + let stdout = std::io::stdout(); + let mut lock = stdout.lock(); + writeln!( + lock, + "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n", + self.id_manager.controller_id + ) + .unwrap(); + self.get_logger().dump_log(&mut lock); + writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); + } } // #[derive(Debug)]