diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 31586d5a65d54f78dc6017ee21bddfd53f0969d1..faa4436a0ab12df295bcdc9d94f7b99aaeba378e 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -36,6 +36,7 @@ struct BranchingProtoComponent { } #[derive(Debug, Clone)] struct ProtoComponentBranch { + did_put: HashSet, inbox: HashMap, state: ComponentState, untaken_choice: Option, @@ -185,6 +186,7 @@ impl Connector { ); // 1. run all proto components to Nonsync blockers + // NOTE: original components are immutable until Decision::Success let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = @@ -708,7 +710,7 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); - Self::fold_into(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate, branch); continue; } use AssignmentUnionResult as Aur; @@ -720,13 +722,13 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); - Self::fold_into(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.logger, "branch pred covers it! Accept the msg"); - Self::fold_into(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched @@ -739,8 +741,8 @@ impl BranchingNative { &predicate2, &predicate ); - Self::fold_into(finished, predicate, branch); - Self::fold_into(finished, predicate2, branch2); + Self::insert_branch_merging(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched @@ -751,13 +753,13 @@ impl BranchingNative { "new subsuming pred created {:?}. forking and feeding", &predicate2 ); - Self::fold_into(finished, predicate, branch); - Self::fold_into(finished, predicate2, branch2); + Self::insert_branch_merging(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate2, branch2); } } } } - fn fold_into( + fn insert_branch_merging( branches: &mut HashMap, predicate: Predicate, mut branch: NativeBranch, @@ -766,18 +768,22 @@ impl BranchingNative { use std::collections::hash_map::Entry; match e { Entry::Vacant(ev) => { + // no existing branch present. We insert it no problem. (The most common case) ev.insert(branch); } Entry::Occupied(mut eo) => { - let b = eo.get_mut(); + // Oh dear, there is already a branch with this predicate. + // Rather than choosing either branch, we MERGE them. + // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get + let old = eo.get_mut(); for (k, v) in branch.gotten.drain() { - if b.gotten.insert(k, v).is_none() { - b.to_get.remove(&k); + if old.gotten.insert(k, v).is_none() { + // added a gotten element in `branch` not already in `old` + old.to_get.remove(&k); } } } } - // if let Some(prev) = branches.insert(predicate, branch) } fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { log!( @@ -811,7 +817,7 @@ impl BranchingProtoComponent { proto_component_id: ProtoComponentId, ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { - cd.cylic_drain(|mut predicate, mut branch, mut drainer| { + cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { untaken_choice: &mut branch.untaken_choice, logger: &mut *cu.logger, @@ -839,14 +845,25 @@ impl BranchingProtoComponent { } } B::Inconsistent => { - // branch is inconsistent. throw it away + // EXPLICIT inconsistency drop((predicate, branch)); } B::SyncBlockEnd => { // make concrete all variables - for &port in ports.iter() { - let var = cu.port_info.spec_var_for(port); - predicate.assigned.entry(var).or_insert(SpecVal::SILENT); + for port in ports.iter() { + let var = cu.port_info.spec_var_for(*port); + let should_have_fired = match cu.port_info.polarities.get(port).unwrap() { + Polarity::Getter => branch.inbox.contains_key(port), + Polarity::Putter => branch.did_put.contains(port), + }; + let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); + let did_fire = val == SpecVal::FIRING; + if did_fire != should_have_fired { + log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); + // IMPLICIT inconsistency + drop((predicate, branch)); + return Ok(()); + } } // submit solution for this component let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); @@ -884,6 +901,7 @@ impl BranchingProtoComponent { drop((predicate, branch)); } else { // keep in "unblocked" + branch.did_put.insert(putter); log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); @@ -894,6 +912,16 @@ impl BranchingProtoComponent { Ok(()) }) } + fn branch_merge_func( + mut a: ProtoComponentBranch, + b: &mut ProtoComponentBranch, + ) -> ProtoComponentBranch { + if b.ended && !a.ended { + a.ended = true; + std::mem::swap(&mut a, b); + } + a + } fn feed_msg( &mut self, cu: &mut ConnectorUnphased, @@ -983,6 +1011,7 @@ impl BranchingProtoComponent { fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { let branch = ProtoComponentBranch { inbox: Default::default(), + did_put: Default::default(), state, ended: false, untaken_choice: None, @@ -1123,6 +1152,19 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } + fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { + use std::collections::hash_map::Entry; + let e = self.swap.entry(k); + match e { + Entry::Vacant(ev) => { + ev.insert(v); + } + Entry::Occupied(mut eo) => { + let old = eo.get_mut(); + *old = func(v, old); + } + } + } fn add_output(&mut self, k: K, v: V) { self.output.insert(k, v); } @@ -1185,7 +1227,7 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { ) -> Self { Self { input, inner: CyclicDrainInner { swap, output } } } - fn cylic_drain( + fn cyclic_drain( self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>, ) -> Result<(), E> {