diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b7a263cde8f75c4dc4d45b8bd0d958de39988d13..8f334e9e53205752c55b7b5b795ae58f5343db4c 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -330,7 +330,13 @@ impl Connector { // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg); + log!( + cu.inner.logger, + "Native branch {} sending msg {:?} with putter {:?}", + index, + &msg, + putter + ); rctx.getter_buffer.putter_add(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -984,7 +990,7 @@ impl BranchingProtoComponent { for (predicate, mut branch) in branches.drain() { if branch.ended { log!(logger, "Skipping ended branch with {:?}", &predicate); - blocked.insert(predicate, branch); + Self::insert_branch_merging(&mut blocked, predicate, branch); continue; } use AssignmentUnionResult as Aur; @@ -993,13 +999,13 @@ impl BranchingProtoComponent { Aur::Nonexistant => { // this branch does not receive the message log!(logger, "skipping branch"); - blocked.insert(predicate, branch); + Self::insert_branch_merging(&mut blocked, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); - unblocked.insert(predicate, branch); + Self::insert_branch_merging(&mut unblocked, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched @@ -1007,16 +1013,16 @@ impl BranchingProtoComponent { let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); - blocked.insert(predicate, branch); - unblocked.insert(predicate2, branch2); + Self::insert_branch_merging(&mut blocked, predicate, branch); + Self::insert_branch_merging(&mut unblocked, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); - blocked.insert(predicate, branch); - unblocked.insert(predicate2, branch2); + Self::insert_branch_merging(&mut blocked, predicate, branch); + Self::insert_branch_merging(&mut unblocked, predicate2, branch2); } } } @@ -1036,6 +1042,29 @@ impl BranchingProtoComponent { log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); Ok(()) } + fn insert_branch_merging( + branches: &mut HashMap, + predicate: Predicate, + mut branch: ProtoComponentBranch, + ) { + let e = branches.entry(predicate); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + // no existing branch present. We insert it no problem. (The most common case) + ev.insert(branch); + } + Entry::Occupied(mut eo) => { + // Oh dear, there is already a branch with this predicate. + // Rather than choosing either branch, we MERGE them. + // This means keeping the existing one in-place, and giving it the UNION of the inboxes + let old = eo.get_mut(); + for (k, v) in branch.inner.inbox.drain() { + old.inner.inbox.insert(k, v); + } + } + } + } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches {