diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index a105ff1777b3e656ab79ee71268f1f204ff707e0..551f2604b1075e9b52f53715c175f3b815d10dc1 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -4,11 +4,13 @@ struct MonitoredReader { bytes: usize, r: R, } -#[derive(Debug)] -enum TryRecyAnyError { - Timeout, +enum PollAndPopulateError { PollFailed, - EndpointError { error: EndpointError, index: usize }, + Timeout, +} +struct TryRecvAnyNetError { + error: NetEndpointError, + index: usize, } ///////////////////// impl NetEndpoint { @@ -18,8 +20,8 @@ impl NetEndpoint { pub(super) fn try_recv( &mut self, logger: &mut dyn Logger, - ) -> Result, EndpointError> { - use EndpointError as Ee; + ) -> Result, NetEndpointError> { + use NetEndpointError as Nee; // populate inbox as much as possible let before_len = self.inbox.len(); 'read_loop: loop { @@ -28,7 +30,7 @@ impl NetEndpoint { Err(e) if would_block(&e) => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), - Err(_e) => return Err(Ee::BrokenEndpoint), + Err(_e) => return Err(Nee::BrokenNetEndpoint), } } endptlog!( @@ -57,14 +59,19 @@ impl NetEndpoint { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { Ok(None) } - _ => Err(Ee::MalformedMessage), + _ => Err(Nee::MalformedMessage), }, } } - pub(super) fn send(&mut self, msg: &T) -> Result<(), EndpointError> { + pub(super) fn send( + &mut self, + msg: &T, + ) -> Result<(), NetEndpointError> { use bincode::config::Options; - use EndpointError as Ee; - Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint) + use NetEndpointError as Nee; + Self::bincode_opts() + .serialize_into(&mut self.stream, msg) + .map_err(|_| Nee::BrokenNetEndpoint) } } @@ -73,7 +80,7 @@ impl EndpointManager { 0..self.num_net_endpoints() } pub(super) fn num_net_endpoints(&self) -> usize { - self.net_endpoint_exts.len() + self.net_endpoint_store.endpoint_exts.len() } pub(super) fn send_to_comms( &mut self, @@ -81,90 +88,247 @@ impl EndpointManager { msg: &Msg, ) -> Result<(), UnrecoverableSyncError> { use UnrecoverableSyncError as Use; - let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index)) + let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; + net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index }) } pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { - let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint; + let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; net_endpoint.send(msg).map_err(|err| { - ConnectError::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) + ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) }) } - pub(super) fn try_recv_any_comms( - &mut self, - logger: &mut dyn Logger, - deadline: Option, - ) -> Result, UnrecoverableSyncError> { - use {TryRecyAnyError as Trae, UnrecoverableSyncError as Use}; - match self.try_recv_any(logger, deadline) { - Ok(tup) => Ok(Some(tup)), - Err(Trae::Timeout) => Ok(None), - Err(Trae::PollFailed) => Err(Use::PollFailed), - Err(Trae::EndpointError { error: _, index }) => Err(Use::BrokenEndpoint(index)), - } - } + + /// Receive the first message of any kind at all. + /// Why not return SetupMsg? Because often this message will be forwarded to several others, + /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!) pub(super) fn try_recv_any_setup( &mut self, logger: &mut dyn Logger, - deadline: Option, + deadline: &Option, ) -> Result<(usize, Msg), ConnectError> { - use {ConnectError as Ce, TryRecyAnyError as Trae}; - self.try_recv_any(logger, deadline).map_err(|err| match err { - Trae::Timeout => Ce::Timeout, - Trae::PollFailed => Ce::PollFailed, - Trae::EndpointError { error, index } => Ce::EndpointSetupError( - self.net_endpoint_exts[index].net_endpoint.stream.local_addr().unwrap(), - error, - ), - }) + /////////////////////////////////////////// + fn map_trane( + trane: TryRecvAnyNetError, + net_endpoint_store: &EndpointStore, + ) -> ConnectError { + ConnectError::NetEndpointSetupError( + net_endpoint_store.endpoint_exts[trane.index] + .net_endpoint + .stream + .local_addr() + .unwrap(), // stream must already be connected + trane.error, + ) + } + /////////////////////////////////////////// + // try yield undelayed net message + if let Some(tup) = self.undelayed_messages.pop() { + endptlog!(logger, "RECV undelayed_msg {:?}", &tup); + return Ok(tup); + } + loop { + // try recv from some polled undrained NET endpoint + if let Some(tup) = self + .try_recv_undrained_net(logger) + .map_err(|trane| map_trane(trane, &self.net_endpoint_store))? + { + return Ok(tup); + } + // poll if time remains + self.poll_and_polulate(logger, deadline)?; + } } - fn try_recv_any( + + // drops all Setup messages, + // buffers all future round messages, + // drops all previous round messages, + // enqueues all current round SendPayload messages using round_ctx.getter_add + // returns the first comm_ctrl_msg encountered + // only polls until SOME message is enqueued + pub(super) fn try_recv_any_comms( &mut self, logger: &mut dyn Logger, - deadline: Option, - ) -> Result<(usize, Msg), TryRecyAnyError> { - use TryRecyAnyError as Trea; - // 1. try messages already buffered - if let Some(x) = self.undelayed_messages.pop() { - endptlog!(logger, "RECV undelayed_msg {:?}", &x); - return Ok(x); - } - loop { - // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained - while let Some(index) = self.polled_undrained.pop() { - let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint; - if let Some(msg) = net_endpoint - .try_recv(logger) - .map_err(|error| Trea::EndpointError { error, index })? - { - endptlog!(logger, "RECV polled_undrained {:?}", &msg); - if !net_endpoint.inbox.is_empty() { - // there may be another message waiting! - self.polled_undrained.insert(index); + port_info: &PortInfo, + round_ctx: &mut impl RoundCtxTrait, + round_index: usize, + ) -> Result { + /////////////////////////////////////////// + impl EndpointManager { + fn handle_msg( + &mut self, + logger: &mut dyn Logger, + round_ctx: &mut impl RoundCtxTrait, + net_endpoint_index: usize, + msg: Msg, + round_index: usize, + some_message_enqueued: &mut bool, + ) -> Option<(usize, CommCtrlMsg)> { + let comm_msg_contents = match msg { + Msg::SetupMsg(..) => return None, + Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) { + Ordering::Equal => comm_msg.contents, + Ordering::Less => { + log!( + logger, + "We are in round {}, but msg is for round {}. Discard", + comm_msg.round_index, + round_index, + ); + return None; + } + Ordering::Greater => { + log!( + logger, + "We are in round {}, but msg is for round {}. Buffer", + comm_msg.round_index, + round_index, + ); + self.delayed_messages + .push((net_endpoint_index, Msg::CommMsg(comm_msg))); + return None; + } + }, + }; + match comm_msg_contents { + CommMsgContents::CommCtrl(comm_ctrl_msg) => { + Some((net_endpoint_index, comm_ctrl_msg)) + } + CommMsgContents::SendPayload(send_payload_msg) => { + let getter = self.net_endpoint_store.endpoint_exts[net_endpoint_index] + .getter_for_incoming; + round_ctx.getter_add(getter, send_payload_msg); + *some_message_enqueued = true; + None } - return Ok((index, msg)); } } - // 3. No message yet. Do we have enough time to poll? - let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?) - } else { - None - }; - self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?; - for event in self.events.iter() { - let Token(index) = event.token(); - self.polled_undrained.insert(index); - endptlog!( + } + use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use}; + /////////////////////////////////////////// + let mut some_message_enqueued = false; + // try yield undelayed net message + while let Some((net_endpoint_index, msg)) = self.undelayed_messages.pop() { + if let Some((net_endpoint_index, msg)) = self.handle_msg( + logger, + round_ctx, + net_endpoint_index, + msg, + round_index, + &mut some_message_enqueued, + ) { + return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg }); + } + } + loop { + // try receive a net message + while let Some((net_endpoint_index, msg)) = self.try_recv_undrained_net(logger)? { + if let Some((net_endpoint_index, msg)) = self.handle_msg( logger, - "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}", - &event, - index, - self.polled_undrained.iter() - ); + round_ctx, + net_endpoint_index, + msg, + round_index, + &mut some_message_enqueued, + ) { + return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg }); + } + } + // try receive a udp message + let recv_buffer = self.udp_in_buffer.as_mut_slice(); + while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { + let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; + if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { + // I received a payload! + self.udp_endpoint_store.polled_undrained.insert(index); + let payload = Payload::from(&recv_buffer[..bytes_written]); + let [branch_spec_var, port_spec_var] = [ + ee.incoming_round_spec_var.unwrap(), // should not be NONE + port_info.spec_var_for(ee.getter_for_incoming), + ]; + let branch_spec_val = SpecVal::nth_domain_element(ee.incoming_payloads.len()); + ee.incoming_payloads.push(payload.clone()); + let predicate = Predicate::default() + .inserted(branch_spec_var, branch_spec_val) + .inserted(port_spec_var, SpecVal::FIRING); + round_ctx + .getter_add(ee.getter_for_incoming, SendPayloadMsg { payload, predicate }); + some_message_enqueued = true; + } + } + if some_message_enqueued { + return Ok(CommRecvOk::NewPayloadMsgs); + } + // poll if time remains + match self.poll_and_polulate(logger, round_ctx.get_deadline()) { + Ok(()) => {} // continue looping + Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew), + Err(Pape::PollFailed) => return Err(Use::PollFailed), + } + } + } + fn try_recv_undrained_net( + &mut self, + logger: &mut dyn Logger, + ) -> Result, TryRecvAnyNetError> { + while let Some(index) = self.net_endpoint_store.polled_undrained.pop() { + let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; + if let Some(msg) = net_endpoint + .try_recv(logger) + .map_err(|error| TryRecvAnyNetError { error, index })? + { + endptlog!(logger, "RECV polled_undrained {:?}", &msg); + if !net_endpoint.inbox.is_empty() { + // there may be another message waiting! + self.net_endpoint_store.polled_undrained.insert(index); + } + return Ok(Some((index, msg))); } - self.events.clear(); } + Ok(None) + } + fn poll_and_polulate( + &mut self, + logger: &mut dyn Logger, + deadline: &Option, + ) -> Result<(), PollAndPopulateError> { + use PollAndPopulateError as Pape; + // No message yet. Do we have enough time to poll? + let remaining = if let Some(deadline) = deadline { + Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?) + } else { + None + }; + // Yes we do! Poll with remaining time as poll deadline + self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?; + for event in self.events.iter() { + match TokenTarget::from(event.token()) { + TokenTarget::Waker => { + // Can ignore. Residual event from endpoint manager setup procedure + } + TokenTarget::NetEndpoint { index } => { + self.net_endpoint_store.polled_undrained.insert(index); + endptlog!( + logger, + "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}", + &event, + index, + self.net_endpoint_store.polled_undrained.iter() + ); + } + TokenTarget::UdpEndpoint { index } => { + self.udp_endpoint_store.polled_undrained.insert(index); + endptlog!( + logger, + "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}", + &event, + index, + self.udp_endpoint_store.polled_undrained.iter() + ); + } + } + } + self.events.clear(); + Ok(()) } pub(super) fn undelay_all(&mut self) { if self.undelayed_messages.is_empty() { @@ -175,7 +339,79 @@ impl EndpointManager { // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } + pub(super) fn udp_endpoints_round_start( + &mut self, + logger: &mut dyn Logger, + spec_var_stream: &mut SpecVarStream, + ) { + log!( + logger, + "Starting round for {} udp endpoints", + self.udp_endpoint_store.endpoint_exts.len() + ); + for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { + let spec_var = spec_var_stream.next(); + log!(logger, "Udp endpoint given {} spec var {:?} for this round", index, spec_var); + ee.incoming_round_spec_var = Some(spec_var); + } + } + pub(super) fn udp_endpoints_round_end( + &mut self, + logger: &mut dyn Logger, + decision: &Decision, + ) -> Result<(), UnrecoverableSyncError> { + log!( + logger, + "Ending round for {} udp endpoints", + self.udp_endpoint_store.endpoint_exts.len() + ); + use UnrecoverableSyncError as Use; + if let Decision::Success(solution_predicate) = decision { + 'endpoint_loop: for (index, ee) in + self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() + { + ee.incoming_round_spec_var = None; // shouldn't be accessed before its overwritten next round; still adding for clarity. + for (payload_predicate, payload) in ee.outgoing_payloads.drain() { + if payload_predicate.assigns_subset(solution_predicate) { + ee.sock + .send(payload.as_slice()) + .map_err(|_| Use::BrokenNetEndpoint { index })?; + log!( + logger, + "Sent payload {:?} with pred {:?} through Udp endpoint {}", + &payload, + &payload_predicate, + index + ); + continue 'endpoint_loop; // send at most one payload per endpoint per round + } + } + log!(logger, "Sent no message through Udp endpoint {}", index); + } + } + Ok(()) + } } +// impl UdpEndpointExt { +// fn try_recv( +// &mut self, +// port_info: &PortInfo, +// udp_in_buffer: &mut UdpInBuffer, +// ) -> Option { +// let recv_buffer = udp_in_buffer.as_mut_slice(); +// let len = self.sock.recv(recv_buffer).ok()?; +// let payload = Payload::from(&recv_buffer[..len]); +// let branch_spec_var = self +// .incoming_round_spec_var +// .expect("Udp spec var should be Some(..) if recv() is called"); +// let branch_spec_val = SpecVal::nth_domain_element(self.incoming_payloads.len()); +// self.incoming_payloads.push(payload.clone()); +// let predicate = Predicate::default() +// .inserted(branch_spec_var, branch_spec_val) +// .inserted(port_info.spec_var_for(self.getter_for_incoming), SpecVal::FIRING); +// Some(SendPayloadMsg { payload, predicate }) +// } +// } impl Debug for NetEndpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() @@ -203,3 +439,18 @@ impl Into for SetupMsg { Msg::SetupMsg(self) } } +impl From for ConnectError { + fn from(pape: PollAndPopulateError) -> ConnectError { + use {ConnectError as Ce, PollAndPopulateError as Pape}; + match pape { + Pape::PollFailed => Ce::PollFailed, + Pape::Timeout => Ce::Timeout, + } + } +} +impl From for UnrecoverableSyncError { + fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError { + let TryRecvAnyNetError { index, .. } = trane; + UnrecoverableSyncError::BrokenNetEndpoint { index } + } +}