diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index e11cc827faf76186312b82a4e31457fe929bbd03..d7cd77cba7c3d4b5180bcc38bf376c961add6e3f 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -26,6 +26,7 @@ struct BranchingProtoComponent { } #[derive(Debug, Clone)] struct ProtoComponentBranch { + ended: bool, inbox: HashMap, state: ComponentState, } @@ -319,7 +320,7 @@ impl Connector { cu.proto_components.keys() ); // consume native - Ok(Some(branching_native.collapse_with(&predicate))) + Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate))) } }; log!(cu.logger, "Sync round ending! Cleaning up"); @@ -364,6 +365,7 @@ impl Connector { for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { let BranchingProtoComponent { ports, branches } = proto_component; let mut swap = HashMap::default(); + // initially, no components have .ended==true let mut blocked = HashMap::default(); // drain from branches --> blocked let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked); @@ -651,6 +653,12 @@ impl BranchingNative { assert!(was.is_none()); branch.to_get.remove(&getter); if branch.to_get.is_empty() { + log!( + cu.logger, + "new native solution with {:?} (to_get.is_empty()) with gotten {:?}", + &predicate, + &branch.gotten + ); let route = Route::LocalComponent(ComponentId::Native); solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, @@ -715,10 +723,17 @@ impl BranchingNative { } } } - fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk { + fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { + log!( + logger, + "Collapsing native with {} branch preds {:?}", + self.branches.len(), + self.branches.keys() + ); for (branch_predicate, branch) in self.branches { - if solution_predicate.satisfies(&branch_predicate) { + if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; + log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten }; } } @@ -771,6 +786,7 @@ impl BranchingProtoComponent { Route::LocalComponent(ComponentId::Proto(proto_component_id)), predicate.clone(), ); + branch.ended = true; // move to "blocked" drainer.add_output(predicate, branch); } @@ -832,6 +848,11 @@ impl BranchingProtoComponent { // partition drain from branches -> {unblocked, blocked} log!(logger, "visiting {} blocked branches...", branches.len()); for (predicate, mut branch) in branches.drain() { + if branch.ended { + log!(logger, "Skipping ended branch with {:?}", &predicate); + blocked.insert(predicate, branch); + continue; + } use CommonSatResult as Csr; log!(logger, "visiting branch with pred {:?}", &predicate); match predicate.common_satisfier(&send_payload_msg.predicate) { @@ -885,7 +906,7 @@ impl BranchingProtoComponent { fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { - if branch_predicate.satisfies(solution_predicate) { + if branch.ended && branch_predicate.satisfies(solution_predicate) { let ProtoComponentBranch { state, .. } = branch; return ProtoComponent { state, ports }; } @@ -893,7 +914,7 @@ impl BranchingProtoComponent { panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { - let branch = ProtoComponentBranch { inbox: Default::default(), state }; + let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false }; Self { ports, branches: hashmap! { Predicate::default() => branch } } } }