use super::*;

enum PollAndPopulateError {
    PollFailed,
    Timeout,
}

struct TryRecvAnyNetError {
    error: NetEndpointError,
    index: usize,
}

/////////////////////

impl NetEndpoint {
    // Returns the bincode configuration the NetEndpoint uses pervasively
    // for its ser/de operations.
    fn bincode_opts() -> impl bincode::config::Options {
        // uses variable-length encoding everywhere; great!
        bincode::config::DefaultOptions::default()
    }

    // Attempt to return some deserializable value of type T from
    // the inbox or network stream.
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
        &mut self,
        logger: &mut dyn Logger,
    ) -> Result<Option<T>, NetEndpointError> {
        use NetEndpointError as Nee;
        // Populate the inbox with as many bytes as possible (mio::TcpStream is nonblocking).
        let before_len = self.inbox.len();
        'read_loop: loop {
            let res = self.stream.read_to_end(&mut self.inbox);
            match res {
                Err(e) if err_would_block(&e) => break 'read_loop,
                Ok(0) => break 'read_loop,
                Ok(_) => (),
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
            }
        }
        log!(
            @ENDPT,
            logger,
            "Inbox bytes [{:x?}| {:x?}]",
            DenseDebugHex(&self.inbox[..before_len]),
            DenseDebugHex(&self.inbox[before_len..]),
        );
        // Try to deserialize from the inbox. `reading_slice` is advanced in place
        // by the read, truncating the consumed prefix. On success,
        // the message bytes are exactly that consumed prefix.
        let mut reading_slice = self.inbox.as_slice();
        let before_len = reading_slice.len();
        use bincode::config::Options;
        match Self::bincode_opts().deserialize_from(&mut reading_slice) {
            Ok(msg) => {
                let msg_size = before_len - reading_slice.len();
                // inbox[..msg_size] was deserialized into one message!
                self.inbox.drain(..msg_size);
                log!(
                    @ENDPT,
                    logger,
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
                    self.inbox.len() + msg_size,
                    msg_size,
                    self.inbox.len(),
                    DenseDebugHex(&self.inbox[..]),
                );
                Ok(Some(msg))
            }
            Err(e) => match *e {
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
                    // Contents of the inbox are insufficient for deserializing a message.
                    Ok(None)
                }
                _ => Err(Nee::MalformedMessage),
            },
        }
    }

    // Send the given serializable type into the stream.
    pub(super) fn send<T: serde::ser::Serialize>(
        &mut self,
        msg: &T,
        io_byte_buffer: &mut IoByteBuffer,
    ) -> Result<(), NetEndpointError> {
        use bincode::config::Options;
        use NetEndpointError as Nee;
        // Create a buffer for our bytes: a slice of the io_byte_buffer.
        let mut buf_slice = io_byte_buffer.as_mut_slice();
        // Serialize into the slice, which is truncated as it is filled.
        Self::bincode_opts().serialize_into(&mut buf_slice, msg).expect("Serialize failed!");
        // The written segment is the part now missing from buf_slice.
        // Write it to the TCP stream as one segment.
        let wrote = IoByteBuffer::CAPACITY - buf_slice.len();
        self.stream
            .write_all(&io_byte_buffer.as_mut_slice()[..wrote])
            .map_err(|_| Nee::BrokenNetEndpoint)?;
        let _ = self.stream.flush();
        Ok(())
    }
}
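// Illustrative sketch (hypothetical caller, not part of the runtime): draining a
// NetEndpoint. `try_recv::<Msg>` yields Ok(None) while the inbox holds only a
// partial message; the buffered prefix is retained for the next call. Here `ep`
// is some `&mut NetEndpoint` and `handle` is a hypothetical consumer.
//
//     while let Some(msg) = ep.try_recv::<Msg>(logger)? {
//         handle(msg);
//     }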
impl EndpointManager {
    pub(super) fn index_iter(&self) -> Range<usize> {
        0..self.num_net_endpoints()
    }
    pub(super) fn num_net_endpoints(&self) -> usize {
        self.net_endpoint_store.endpoint_exts.len()
    }
    // Send procedure particular to the setup phase.
    // Used pervasively; allows for some brevity with the ? operator.
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
        net_endpoint.send(msg, &mut self.io_byte_buffer).map_err(|err| {
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
        })
    }
    // Send procedure particular to the communication phase.
    // Used pervasively; allows for some brevity with the ? operator.
    pub(super) fn send_to_comms(
        &mut self,
        index: usize,
        msg: &Msg,
    ) -> Result<(), UnrecoverableSyncError> {
        use UnrecoverableSyncError as Use;
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
        net_endpoint
            .send(msg, &mut self.io_byte_buffer)
            .map_err(|_| Use::BrokenNetEndpoint { index })
    }
    /// Receive the first message of any kind at all.
    /// Why not return SetupMsg? Because this message will often be forwarded to several others,
    /// and by returning a Msg, it can be serialized in place (NetEndpoints allow the sending of Msg types!).
    pub(super) fn try_recv_any_setup(
        &mut self,
        logger: &mut dyn Logger,
        deadline: &Option<Instant>,
    ) -> Result<(usize, Msg), ConnectError> {
        // Helper function, mapping a TryRecvAnyNetError into a ConnectError.
        fn map_trane(
            trane: TryRecvAnyNetError,
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
        ) -> ConnectError {
            ConnectError::NetEndpointSetupError(
                net_endpoint_store.endpoint_exts[trane.index]
                    .net_endpoint
                    .stream
                    .local_addr()
                    .unwrap(), // stream must already be connected
                trane.error,
            )
        }
        // try to yield an undelayed net message
        if let Some(tup) = self.undelayed_messages.pop() {
            log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup);
            return Ok(tup);
        }
        loop {
            // try to recv from some polled, undrained NET endpoint
            if let Some(tup) = self
                .try_recv_undrained_net(logger)
                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
            {
                return Ok(tup);
            }
            // poll if time remains
            self.poll_and_populate(logger, deadline)?;
        }
    }
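    // Illustrative sketch (hypothetical caller, not part of the runtime): during
    // setup this call loops internally until a message arrives or the deadline
    // passes; a poll timeout surfaces as ConnectError::Timeout via the
    // From<PollAndPopulateError> impl at the bottom of this file.
    //
    //     let deadline = Some(Instant::now() + Duration::from_secs(5));
    //     let (index, msg) = em.try_recv_any_setup(logger, &deadline)?;
    //     // `msg` may be any Msg variant; setup logic typically matches on Msg::SetupMsg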
    // Drops all Setup messages,
    // buffers all future-round messages,
    // drops all previous-round messages,
    // enqueues all current-round SendPayload messages using rctx.getter_push,
    // returns the first comm_ctrl_msg encountered,
    // and only polls until SOME message is enqueued.
    pub(super) fn try_recv_any_comms(
        &mut self,
        cu: &mut impl CuUndecided,
        rctx: &mut RoundCtx,
        round_index: usize,
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
        ///////////////////////////////////////////
        // adds scoped functionality for EndpointManager
        impl EndpointManager {
            // Given some Msg received in a particular context,
            // return a control message for the current round.
            // If it's a payload message, buffer it instead.
            fn handle_msg(
                &mut self,
                cu: &mut impl CuUndecided,
                rctx: &mut RoundCtx,
                net_index: usize,
                msg: Msg,
                round_index: usize,
                some_message_enqueued: &mut bool,
            ) -> Option<(usize, CommCtrlMsg)> {
                let comm_msg_contents = match msg {
                    Msg::SetupMsg(..) => return None, // discard setup messages
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) {
                        Ordering::Equal => comm_msg.contents, // ok, keep going
                        Ordering::Less => {
                            // discard this message
                            log!(
                                cu.logger(),
                                "We are in round {}, but msg is for round {}. Discard",
                                round_index,
                                comm_msg.round_index,
                            );
                            return None;
                        }
                        Ordering::Greater => {
                            // "delay" this message, enqueueing it for a future round
                            log!(
                                cu.logger(),
                                "We are in round {}, but msg is for round {}. Buffer",
                                round_index,
                                comm_msg.round_index,
                            );
                            self.delayed_messages.push((net_index, Msg::CommMsg(comm_msg)));
                            return None;
                        }
                    },
                };
                // inspect the contents of this contemporary message, sorting it
                match comm_msg_contents {
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => {
                        // yes! this is a CommCtrlMsg
                        Some((net_index, comm_ctrl_msg))
                    }
                    CommMsgContents::SendPayload(send_payload_msg) => {
                        // Enqueue this payload message.
                        // Still not a CommCtrlMsg, so return None.
                        let getter =
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
                        rctx.getter_push(getter, send_payload_msg);
                        *some_message_enqueued = true;
                        None
                    }
                }
            }
        }
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
        ///////////////////////////////////////////
        let mut some_message_enqueued = false;
        // pop undelayed messages, handling them. Return the first CommCtrlMsg popped
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
            if let Some((net_index, msg)) =
                self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued)
            {
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
            }
        }
        loop {
            // drain endpoints of incoming messages (without blocking),
            // returning the first CommCtrlMsg received
            while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? {
                if let Some((net_index, msg)) = self.handle_msg(
                    cu,
                    rctx,
                    net_index,
                    msg,
                    round_index,
                    &mut some_message_enqueued,
                ) {
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
                }
            }
            // try to receive a UDP message
            let recv_buffer = self.io_byte_buffer.as_mut_slice();
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
                if let Ok(bytes_written) = ee.sock.recv(recv_buffer) {
                    // I received a payload!
                    self.udp_endpoint_store.polled_undrained.insert(index);
                    if !ee.received_this_round {
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
                        let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming);
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
                        rctx.getter_push(
                            ee.getter_for_incoming,
                            SendPayloadMsg { payload, predicate },
                        );
                        some_message_enqueued = true;
                        ee.received_this_round = true;
                    } else {
                        // lose the message! only one payload per endpoint per round is accepted
                    }
                }
            }
            if some_message_enqueued {
                return Ok(CommRecvOk::NewPayloadMsgs);
            }
            // poll if time remains
            match self.poll_and_populate(cu.logger(), &rctx.deadline) {
                Ok(()) => {} // continue looping
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
            }
        }
    }
    // Try to receive some message from any net endpoint without blocking.
    fn try_recv_undrained_net(
        &mut self,
        logger: &mut dyn Logger,
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
            if let Some(msg) = net_endpoint
                .try_recv(logger)
                .map_err(|error| TryRecvAnyNetError { error, index })?
            {
                log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg);
                if !net_endpoint.inbox.is_empty() {
                    // there may be another message waiting!
                    self.net_endpoint_store.polled_undrained.insert(index);
                }
                return Ok(Some((index, msg)));
            }
        }
        Ok(None)
    }
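    // Illustrative note (values hypothetical): how handle_msg above sorts an
    // incoming CommMsg when this manager is in round 3:
    //
    //     comm_msg.round_index == 3  => inspect contents: CommCtrl is returned,
    //                                   SendPayload is pushed via rctx.getter_push
    //     comm_msg.round_index == 2  => discarded (belongs to an already-decided round)
    //     comm_msg.round_index == 4  => pushed onto delayed_messages for a later round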
    // Poll the network, raising `polled_undrained` flags for endpoints
    // as they receive events.
    fn poll_and_populate(
        &mut self,
        logger: &mut dyn Logger,
        deadline: &Option<Instant>,
    ) -> Result<(), PollAndPopulateError> {
        use PollAndPopulateError as Pape;
        // No message yet. Do we have enough time to poll?
        let remaining = if let Some(deadline) = deadline {
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
        } else {
            None
        };
        // Yes we do! Poll with the remaining time as the poll deadline.
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
        for event in self.events.iter() {
            match TokenTarget::from(event.token()) {
                TokenTarget::NetEndpoint { index } => {
                    self.net_endpoint_store.polled_undrained.insert(index);
                    log!(
                        @ENDPT,
                        logger,
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
                        &event,
                        index,
                        self.net_endpoint_store.polled_undrained.iter()
                    );
                }
                TokenTarget::UdpEndpoint { index } => {
                    self.udp_endpoint_store.polled_undrained.insert(index);
                    log!(
                        @ENDPT,
                        logger,
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
                        &event,
                        index,
                        self.udp_endpoint_store.polled_undrained.iter()
                    );
                }
            }
        }
        self.events.clear();
        Ok(())
    }
    // Move all delayed messages to undelayed, making it possible to yield them.
    pub(super) fn undelay_all(&mut self) {
        if self.undelayed_messages.is_empty() {
            // fast path
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
            return;
        }
        // slow path
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
    }
    // End the synchronous round for the udp endpoints, given the round decision.
    pub(super) fn udp_endpoints_round_end(
        &mut self,
        logger: &mut dyn Logger,
        decision: &Decision,
    ) -> Result<(), UnrecoverableSyncError> {
        // retain received_this_round for use in pseudo_socket_api::recv_from
        log!(
            logger,
            "Ending round for {} udp endpoints",
            self.udp_endpoint_store.endpoint_exts.len()
        );
        use UnrecoverableSyncError as Use;
        if let Decision::Success(solution_predicate) = decision {
            // Similar to a native component, we commit the branch of the component
            // consistent with the predicate decided upon, making its effects visible
            // to the world outside the connector's internals.
            // In this case, this takes the form of emptying the component's outbox buffer,
            // actually sending payloads 'on the wire' as UDP messages.
            for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
                'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
                    if payload_predicate.assigns_subset(solution_predicate) {
                        ee.sock.send(payload.as_slice()).map_err(|e| {
                            println!("{:?}", e);
                            Use::BrokenUdpEndpoint { index }
                        })?;
                        log!(
                            logger,
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
                            &payload,
                            &payload_predicate,
                            index
                        );
                        // send at most one payload per endpoint per round
                        break 'outgoing_loop;
                    }
                }
                ee.received_this_round = false;
            }
        }
        Ok(())
    }
}

impl Debug for NetEndpoint {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        struct DebugStream<'a>(&'a TcpStream);
        impl Debug for DebugStream<'_> {
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
                f.debug_struct("Endpoint")
                    .field("local_addr", &self.0.local_addr())
                    .field("peer_addr", &self.0.peer_addr())
                    .finish()
            }
        }
        f.debug_struct("Endpoint")
            .field("inbox", &self.inbox)
            .field("stream", &DebugStream(&self.stream))
            .finish()
    }
}
impl Into<Msg> for SetupMsg {
    fn into(self) -> Msg {
        Msg::SetupMsg(self)
    }
}
impl From<PollAndPopulateError> for ConnectError {
    fn from(pape: PollAndPopulateError) -> ConnectError {
        use {ConnectError as Ce, PollAndPopulateError as Pape};
        match pape {
            Pape::PollFailed => Ce::PollFailed,
            Pape::Timeout => Ce::Timeout,
        }
    }
}
impl From<TryRecvAnyNetError> for UnrecoverableSyncError {
    fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError {
        let TryRecvAnyNetError { index, .. } = trane;
        UnrecoverableSyncError::BrokenNetEndpoint { index }
    }
}
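// Illustrative sketch (hypothetical caller, not part of the runtime): the shape of
// a communication-phase receive step built on try_recv_any_comms. `em`, `cu`, and
// `rctx` stand for the caller's EndpointManager, CuUndecided, and RoundCtx values.
//
//     match em.try_recv_any_comms(cu, rctx, round_index)? {
//         CommRecvOk::NewControlMsg { net_index, msg } => { /* drive the consensus protocol */ }
//         CommRecvOk::NewPayloadMsgs => { /* speculate further with the new payloads */ }
//         CommRecvOk::TimeoutWithoutNew => { /* the round deadline passed without progress */ }
//     }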