diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index ef47ff4bc174db53c47c3e8adf49418dd8174aac..31586d5a65d54f78dc6017ee21bddfd53f0969d1 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -19,7 +19,7 @@ struct BranchingNative { struct NativeBranch { index: usize, gotten: HashMap, - to_get: HashSet, // native branch is ended iff to_get.is_empty() + to_get: HashSet, } #[derive(Debug)] struct SolutionStorage { @@ -248,6 +248,11 @@ impl Connector { .iter() .map(|&index| SubtreeId::NetEndpoint { index }); let subtree_id_iter = n.chain(c).chain(e); + log!( + cu.logger, + "Children in subtree are: {:?}", + subtree_id_iter.clone().collect::>() + ); SolutionStorage::new(subtree_id_iter) }, spec_var_stream: cu.id_manager.new_spec_var_stream(), @@ -296,8 +301,8 @@ impl Connector { log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); rctx.getter_buffer.putter_add(cu, putter, msg); } - if to_get.is_empty() { - // this branch is immediately ready to be part of a solution + let branch = NativeBranch { index, gotten: Default::default(), to_get }; + if branch.is_ended() { log!( cu.logger, "Native submitting solution for batch {} with {:?}", @@ -310,7 +315,6 @@ impl Connector { predicate.clone(), ); } - let branch = NativeBranch { index, gotten: Default::default(), to_get }; if let Some(_) = branching_native.branches.insert(predicate, branch) { // thanks to the native_branch_spec_var, each batch has a distinct predicate unreachable!() @@ -649,6 +653,11 @@ impl Connector { comm.endpoint_manager.send_to_comms(parent, &msg) } } +impl NativeBranch { + fn is_ended(&self) -> bool { + self.to_get.is_empty() + } +} impl BranchingNative { fn feed_msg( &mut self, @@ -667,13 +676,13 @@ impl BranchingNative { // check if this branch expects to receive it let var = cu.port_info.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { + branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); - branch.to_get.remove(&getter); - if branch.to_get.is_empty() { + if branch.is_ended() { log!( cu.logger, - "new native solution with {:?} (to_get.is_empty()) with gotten {:?}", + "new native solution with {:?} is_ended() with gotten {:?}", &predicate, &branch.gotten ); @@ -683,6 +692,13 @@ impl BranchingNative { subtree_id, predicate.clone(), ); + } else { + log!( + cu.logger, + "Fed native {:?} still has to_get {:?}", + &predicate, + &branch.to_get + ); } }; if predicate.query(var) != Some(SpecVal::FIRING) { @@ -692,7 +708,7 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); - finished.insert(predicate, branch); + Self::fold_into(finished, predicate, branch); continue; } use AssignmentUnionResult as Aur; @@ -704,13 +720,13 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); - finished.insert(predicate, branch); + Self::fold_into(finished, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.logger, "branch pred covers it! Accept the msg"); - finished.insert(predicate, branch); + Self::fold_into(finished, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched @@ -723,8 +739,8 @@ impl BranchingNative { &predicate2, &predicate ); - finished.insert(predicate, branch); - finished.insert(predicate2, branch2); + Self::fold_into(finished, predicate, branch); + Self::fold_into(finished, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched @@ -735,11 +751,33 @@ impl BranchingNative { "new subsuming pred created {:?}. forking and feeding", &predicate2 ); - finished.insert(predicate, branch); - finished.insert(predicate2, branch2); + Self::fold_into(finished, predicate, branch); + Self::fold_into(finished, predicate2, branch2); + } + } + } + } + fn fold_into( + branches: &mut HashMap, + predicate: Predicate, + mut branch: NativeBranch, + ) { + let e = branches.entry(predicate); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + ev.insert(branch); + } + Entry::Occupied(mut eo) => { + let b = eo.get_mut(); + for (k, v) in branch.gotten.drain() { + if b.gotten.insert(k, v).is_none() { + b.to_get.remove(&k); + } } } } + // if let Some(prev) = branches.insert(predicate, branch) } fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { log!( @@ -749,7 +787,14 @@ impl BranchingNative { self.branches.keys() ); for (branch_predicate, branch) in self.branches { - if branch.to_get.is_empty() && branch_predicate.assigns_subset(solution_predicate) { + log!( + logger, + "Considering native branch {:?} with to_get {:?} gotten {:?}", + &branch_predicate, + &branch.to_get, + &branch.gotten + ); + if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten };