diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 801b102b746ad94f712dfb044fdd569bdeece28d..dcbff96cf8c2fe45ed3090bd16d785ea96886f89 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -61,6 +61,14 @@ impl ReplaceBoolTrue for bool { } //////////////// +impl RoundCtxTrait for RoundCtx { + fn get_deadline(&self) -> &Option { + &self.deadline + } + fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { + self.getter_buffer.getter_add(getter, msg) + } +} impl Connector { pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; @@ -301,6 +309,8 @@ impl Connector { } // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); + + comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.logger, &mut rctx.spec_var_stream); // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, @@ -310,11 +320,14 @@ impl Connector { &mut rctx, )?; log!(cu.logger, "Committing to decision {:?}!", &decision); + comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger, &decision)?; // propagate the decision to children let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, - contents: CommMsgContents::Announce { decision: decision.clone() }, + contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce { + decision: decision.clone(), + }), }); log!( cu.logger, @@ -504,7 +517,9 @@ impl Connector { let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, - contents: CommMsgContents::Suggest { suggestion }, + contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { + suggestion, + }), }); comm.endpoint_manager.send_to_comms(parent, &msg)?; } @@ -519,73 +534,38 @@ impl Connector { // try recv messages arriving through endpoints log!(cu.logger, "No decision yet. Let's recv an endpoint msg..."); { - let (endpoint_index, msg) = loop { - match comm - .endpoint_manager - .try_recv_any_comms(&mut *cu.logger, rctx.deadline)? - { - None => { - log!(cu.logger, "Reached user-defined deadling without decision..."); - if let Some(parent) = comm.neighborhood.parent { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger, "Already requested failure"); - } + let (endpoint_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm + .endpoint_manager + .try_recv_any_comms(&mut *cu.logger, &cu.port_info, rctx, comm.round_index)? + { + CommRecvOk::NewControlMsg { net_endpoint_index, msg } => { + (net_endpoint_index, msg) + } + CommRecvOk::NewPayloadMsgs => continue 'undecided, + CommRecvOk::TimeoutWithoutNew => { + log!(cu.logger, "Reached user-defined deadling without decision..."); + if let Some(parent) = comm.neighborhood.parent { + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? } else { - log!(cu.logger, "As the leader, deciding on timeout"); - return Ok(Decision::Failure); + log!(cu.logger, "Already requested failure"); } - rctx.deadline = None; + } else { + log!(cu.logger, "As the leader, deciding on timeout"); + return Ok(Decision::Failure); } - Some((endpoint_index, msg)) => break (endpoint_index, msg), - } - }; - log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); - let comm_msg_contents = match msg { - Msg::SetupMsg(..) => { - log!(cu.logger, "Discarding setup message; that phase is over"); + rctx.deadline = None; continue 'undecided; } - Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) { - Ordering::Equal => comm_msg.contents, - Ordering::Less => { - log!( - cu.logger, - "We are in round {}, but msg is for round {}. Discard", - comm_msg.round_index, - comm.round_index, - ); - drop(comm_msg); - continue 'undecided; - } - Ordering::Greater => { - log!( - cu.logger, - "We are in round {}, but msg is for round {}. Buffer", - comm_msg.round_index, - comm.round_index, - ); - comm.endpoint_manager - .delayed_messages - .push((endpoint_index, Msg::CommMsg(comm_msg))); - continue 'undecided; - } - }, }; - match comm_msg_contents { - CommMsgContents::SendPayload(send_payload_msg) => { - let getter = comm.endpoint_manager.net_endpoint_exts[endpoint_index] - .getter_for_incoming; - assert!(cu.port_info.polarities.get(&getter) == Some(&Getter)); - log!( - cu.logger, - "Msg routed to getter port {:?}. Buffer for recv loop", - getter, - ); - rctx.getter_buffer.getter_add(getter, send_payload_msg); - } - CommMsgContents::Suggest { suggestion } => { + log!( + cu.logger, + "Received from endpoint {} ctrl msg {:?}", + endpoint_index, + &comm_ctrl_msg + ); + match comm_ctrl_msg { + CommCtrlMsg::Suggest { suggestion } => { // only accept this control msg through a child endpoint if comm.neighborhood.children.contains(&endpoint_index) { match suggestion { @@ -626,7 +606,7 @@ impl Connector { ); } } - CommMsgContents::Announce { decision } => { + CommCtrlMsg::Announce { decision } => { if Some(endpoint_index) == comm.neighborhood.parent { // adopt this decision return Ok(decision); @@ -653,7 +633,7 @@ impl Connector { let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, - contents: CommMsgContents::Suggest { suggestion }, + contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }), }); comm.endpoint_manager.send_to_comms(parent, &msg) } @@ -758,7 +738,7 @@ impl BranchingNative { self.branches.keys() ); for (branch_predicate, branch) in self.branches { - if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) { + if branch.to_get.is_empty() && branch_predicate.assigns_subset(solution_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten }; @@ -937,7 +917,7 @@ impl BranchingProtoComponent { fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { - if branch.ended && solution_predicate.consistent_with(&branch_predicate) { + if branch.ended && branch_predicate.assigns_subset(solution_predicate) { let ProtoComponentBranch { state, .. } = branch; return ProtoComponent { state, ports }; }