diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index ce5dfa05290a337d006c69856dad3f5b27c9834f..ba6f6ca089cdc24c5fa84e7e6e8c585797294c37 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -1,28 +1,39 @@ use super::*; +// A wrapper for some Read type, delegating read calls +// to the contained Read structure, but snooping on +// the number of bytes it reads, to be inspected later. struct MonitoredReader { bytes: usize, r: R, } + enum PollAndPopulateError { PollFailed, Timeout, } + struct TryRecvAnyNetError { error: NetEndpointError, index: usize, } ///////////////////// impl NetEndpoint { + // Returns the bincode configuration the NetEndpoint uses pervasively + // for configuration on ser/de operations. fn bincode_opts() -> impl bincode::config::Options { + // uses variable-length encoding everywhere; great! bincode::config::DefaultOptions::default() } + + // Attempt to return some deserializable T-type from + // the inbox or network stream pub(super) fn try_recv( &mut self, logger: &mut dyn Logger, ) -> Result, NetEndpointError> { use NetEndpointError as Nee; - // populate inbox as much as possible + // populate inbox with bytes as much as possible (mio::TcpStream is nonblocking) let before_len = self.inbox.len(); 'read_loop: loop { let res = self.stream.read_to_end(&mut self.inbox); @@ -40,12 +51,16 @@ impl NetEndpoint { DenseDebugHex(&self.inbox[..before_len]), DenseDebugHex(&self.inbox[before_len..]), ); + // Try deserialize from the inbox, monitoring how many bytes + // the deserialiation process consumes. In the event of + // success, this makes clear where the message ends let mut monitored = MonitoredReader::from(&self.inbox[..]); use bincode::config::Options; match Self::bincode_opts().deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); - self.inbox.drain(0..(msg_size.try_into().unwrap())); + // inbox[..msg_size] was deserialized into one message! + self.inbox.drain(..msg_size); log!( @ENDPT, logger, @@ -59,12 +74,15 @@ impl NetEndpoint { } Err(e) => match *e { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { + // Contents of inbox insufficient for deserializing a message Ok(None) } _ => Err(Nee::MalformedMessage), }, } } + + // Send the given serializable type into the stream pub(super) fn send( &mut self, msg: &T, @@ -86,6 +104,18 @@ impl EndpointManager { pub(super) fn num_net_endpoints(&self) -> usize { self.net_endpoint_store.endpoint_exts.len() } + + // Setup-phase particular send procedure. + // Used pervasively, allows for some brevity with the ? operator. + pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { + let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; + net_endpoint.send(msg).map_err(|err| { + ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) + }) + } + + // Communication-phase particular send procedure. + // Used pervasively, allows for some brevity with the ? operator. pub(super) fn send_to_comms( &mut self, index: usize, @@ -95,12 +125,6 @@ impl EndpointManager { let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index }) } - pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { - let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|err| { - ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) - }) - } /// Receive the first message of any kind at all. /// Why not return SetupMsg? Because often this message will be forwarded to several others, @@ -110,7 +134,8 @@ impl EndpointManager { logger: &mut dyn Logger, deadline: &Option, ) -> Result<(usize, Msg), ConnectError> { - /////////////////////////////////////////// + // helper function, mapping a TryRecvAnySetup type error + // into a ConnectError fn map_trane( trane: TryRecvAnyNetError, net_endpoint_store: &EndpointStore, @@ -124,7 +149,6 @@ impl EndpointManager { trane.error, ) } - /////////////////////////////////////////// // try yield undelayed net message if let Some(tup) = self.undelayed_messages.pop() { log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup); @@ -156,7 +180,11 @@ impl EndpointManager { round_index: usize, ) -> Result { /////////////////////////////////////////// + // adds scoped functionality for EndpointManager impl EndpointManager { + // Given some Msg structure in a particular context, + // return a control message for the current round + // if its a payload message, buffer it instead fn handle_msg( &mut self, cu: &mut impl CuUndecided, @@ -167,10 +195,11 @@ impl EndpointManager { some_message_enqueued: &mut bool, ) -> Option<(usize, CommCtrlMsg)> { let comm_msg_contents = match msg { - Msg::SetupMsg(..) => return None, + Msg::SetupMsg(..) => return None, // discard setup messages Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) { - Ordering::Equal => comm_msg.contents, + Ordering::Equal => comm_msg.contents, // ok, keep going Ordering::Less => { + // discard this message log!( cu.logger(), "We are in round {}, but msg is for round {}. Discard", @@ -180,6 +209,7 @@ impl EndpointManager { return None; } Ordering::Greater => { + // "delay" this message, enqueueing it for a future round log!( cu.logger(), "We are in round {}, but msg is for round {}. Buffer", @@ -191,9 +221,15 @@ impl EndpointManager { } }, }; + // inspect the contents of this contemporary message, sorting it match comm_msg_contents { - CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)), + CommMsgContents::CommCtrl(comm_ctrl_msg) => { + // yes! this is a CommCtrlMsg + Some((net_index, comm_ctrl_msg)) + } CommMsgContents::SendPayload(send_payload_msg) => { + // Enqueue this payload message + // Still not a CommCtrlMsg, so return None let getter = self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming; rctx.getter_push(getter, send_payload_msg); @@ -206,7 +242,7 @@ impl EndpointManager { use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use}; /////////////////////////////////////////// let mut some_message_enqueued = false; - // try yield undelayed net message + // pop undelayed messages, handling them. Return the first CommCtrlMsg popped while let Some((net_index, msg)) = self.undelayed_messages.pop() { if let Some((net_index, msg)) = self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued) @@ -215,7 +251,8 @@ impl EndpointManager { } } loop { - // try receive a net message + // drain endpoints of incoming messages (without blocking) + // return first CommCtrlMsg received while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? { if let Some((net_index, msg)) = self.handle_msg( cu, @@ -237,7 +274,7 @@ impl EndpointManager { self.udp_endpoint_store.polled_undrained.insert(index); if !ee.received_this_round { let payload = Payload::from(&recv_buffer[..bytes_written]); - let port_spec_var = rctx.ips.spec_var_for(ee.getter_for_incoming); + let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); rctx.getter_push( ee.getter_for_incoming, @@ -261,6 +298,8 @@ impl EndpointManager { } } } + + // Try receive some message from any net endpoint without blocking fn try_recv_undrained_net( &mut self, logger: &mut dyn Logger, @@ -281,6 +320,9 @@ impl EndpointManager { } Ok(None) } + + // Poll the network, raising `polled_undrained` flags for endpoints + // as they receive events. fn poll_and_populate( &mut self, logger: &mut dyn Logger, @@ -297,9 +339,6 @@ impl EndpointManager { self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?; for event in self.events.iter() { match TokenTarget::from(event.token()) { - TokenTarget::Waker => { - // Can ignore. Residual event from endpoint manager setup procedure - } TokenTarget::NetEndpoint { index } => { self.net_endpoint_store.polled_undrained.insert(index); log!( @@ -327,6 +366,8 @@ impl EndpointManager { self.events.clear(); Ok(()) } + + // Move all delayed messages to undelayed, making it possible to yield them pub(super) fn undelay_all(&mut self) { if self.undelayed_messages.is_empty() { // fast path @@ -336,6 +377,8 @@ impl EndpointManager { // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } + + // End the synchronous round for the udp endpoints given the round decision pub(super) fn udp_endpoints_round_end( &mut self, logger: &mut dyn Logger, @@ -349,6 +392,11 @@ impl EndpointManager { ); use UnrecoverableSyncError as Use; if let Decision::Success(solution_predicate) = decision { + // Similar to a native component, we commit the branch of the component + // consistent with the predicate decided upon, making its effects visible + // to the world outside the connector's internals. + // In this case, this takes the form of emptying the component's outbox buffer, + // actually sending payloads 'on the wire' as UDP messages. for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { 'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() { if payload_predicate.assigns_subset(solution_predicate) {