diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 054123fb03ba031c5ad1b8541323b83139e89c29..62dbd2cc59e0a488b456401011ed0130ca21342a 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -146,22 +146,21 @@ impl EndpointManager { // drops all Setup messages, // buffers all future round messages, // drops all previous round messages, - // enqueues all current round SendPayload messages using round_ctx.getter_add + // enqueues all current round SendPayload messages using rctx.getter_push // returns the first comm_ctrl_msg encountered // only polls until SOME message is enqueued pub(super) fn try_recv_any_comms( &mut self, - logger: &mut dyn Logger, - current_state: &CurrentState, - round_ctx: &mut impl RoundCtxTrait, + cu: &mut impl CuUndecided, + rctx: &mut RoundCtx, round_index: usize, ) -> Result { /////////////////////////////////////////// impl EndpointManager { fn handle_msg( &mut self, - logger: &mut dyn Logger, - round_ctx: &mut impl RoundCtxTrait, + cu: &mut impl CuUndecided, + rctx: &mut RoundCtx, net_index: usize, msg: Msg, round_index: usize, @@ -173,7 +172,7 @@ impl EndpointManager { Ordering::Equal => comm_msg.contents, Ordering::Less => { log!( - logger, + cu.logger(), "We are in round {}, but msg is for round {}. Discard", comm_msg.round_index, round_index, @@ -182,7 +181,7 @@ impl EndpointManager { } Ordering::Greater => { log!( - logger, + cu.logger(), "We are in round {}, but msg is for round {}. Buffer", comm_msg.round_index, round_index, @@ -197,7 +196,7 @@ impl EndpointManager { CommMsgContents::SendPayload(send_payload_msg) => { let getter = self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming; - round_ctx.getter_add(getter, send_payload_msg); + rctx.getter_push(getter, send_payload_msg); *some_message_enqueued = true; None } @@ -209,23 +208,18 @@ impl EndpointManager { let mut some_message_enqueued = false; // try yield undelayed net message while let Some((net_index, msg)) = self.undelayed_messages.pop() { - if let Some((net_index, msg)) = self.handle_msg( - logger, - round_ctx, - net_index, - msg, - round_index, - &mut some_message_enqueued, - ) { + if let Some((net_index, msg)) = + self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued) + { return Ok(CommRecvOk::NewControlMsg { net_index, msg }); } } loop { // try receive a net message - while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? { + while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? { if let Some((net_index, msg)) = self.handle_msg( - logger, - round_ctx, + cu, + rctx, net_index, msg, round_index, @@ -243,9 +237,9 @@ impl EndpointManager { self.udp_endpoint_store.polled_undrained.insert(index); if !ee.received_this_round { let payload = Payload::from(&recv_buffer[..bytes_written]); - let port_spec_var = current_state.spec_var_for(ee.getter_for_incoming); + let port_spec_var = rctx.current_state.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); - round_ctx.getter_add( + rctx.getter_push( ee.getter_for_incoming, SendPayloadMsg { payload, predicate }, ); @@ -260,7 +254,7 @@ impl EndpointManager { return Ok(CommRecvOk::NewPayloadMsgs); } // poll if time remains - match self.poll_and_populate(logger, round_ctx.get_deadline()) { + match self.poll_and_populate(cu.logger(), &rctx.deadline) { Ok(()) => {} // continue looping Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew), Err(Pape::PollFailed) => return Err(Use::PollFailed),