diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 786bd0de9a8c09a602fa0be0bb4d1cc9abfe6b9c..20d756b80a66a1c6ff7ea04e8276039df12f1443 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -110,12 +110,16 @@ pub struct MemInMsg { msg: Payload, } #[derive(Debug)] -pub struct EndpointPoller { +pub struct EndpointManager { + // invariants: + // 1. endpoint N is registered READ | WRITE with poller + // 2. Events is empty poll: Poll, events: Events, undrained_endpoints: IndexSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, + endpoint_exts: Vec, } #[derive(Debug)] pub struct Connector { @@ -135,8 +139,7 @@ pub enum ConnectorPhased { surplus_sockets: u16, }, Communication { - endpoint_poller: EndpointPoller, - endpoint_exts: Vec, + endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, }, @@ -173,12 +176,11 @@ enum TryRecyAnyError { BrokenEndpoint(usize), } //////////////// -impl EndpointPoller { - fn try_recv_any( - &mut self, - endpoint_exts: &mut [EndpointExt], - deadline: Instant, - ) -> Result<(usize, Msg), TryRecyAnyError> { +impl EndpointManager { + fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { + self.endpoint_exts[index].endpoint.send(msg) + } + fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { @@ -186,7 +188,7 @@ impl EndpointPoller { } // 2. try read from sockets nonblocking while let Some(index) = self.undrained_endpoints.pop() { - if let Some(msg) = endpoint_exts[index] + if let Some(msg) = self.endpoint_exts[index] .endpoint .try_recv() .map_err(|error| EndpointRecvErr { error, index })? @@ -200,7 +202,7 @@ impl EndpointPoller { self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; for event in self.events.iter() { let Token(index) = event.token(); - if let Some(msg) = endpoint_exts[index] + if let Some(msg) = self.endpoint_exts[index] .endpoint .try_recv() .map_err(|error| EndpointRecvErr { error, index })?