diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index da44b68e899e8ced9bd4c9888d6bb8cb2411a12a..779169ca54a1a4786de5439773d8066b78457ba1 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -156,6 +156,8 @@ impl Connector { comm.round_index ); + let mut spec_var_stream = cu.id_manager.new_spec_var_stream(); + // 1. run all proto components to Nonsync blockers let mut branching_proto_components = HashMap::::default(); @@ -224,23 +226,27 @@ impl Connector { "Translating {} native batches into branches...", comm.native_batches.len() ); + let native_branch_spec_var = spec_var_stream.next(); + log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; - 'native_branches: for (index, NativeBatch { to_get, to_put }) in - comm.native_batches.drain(..).enumerate() + 'native_branches: for ((native_branch, index), branch_spec_val) in + comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) { + let NativeBatch { to_get, to_put } = native_branch; let predicate = { - let mut predicate = Predicate::default(); + let mut predicate = + Predicate::default().inserted(native_branch_spec_var, branch_spec_val); // assign trues for ports that fire let firing_ports: HashSet = to_get.iter().chain(to_put.keys()).copied().collect(); for &port in to_get.iter().chain(to_put.keys()) { - let var = cu.port_info.firing_var_for(port); - predicate.assigned.insert(var, true); + let var = cu.port_info.spec_var_for(port); + predicate.assigned.insert(var, SpecVal::FIRING); } // assign falses for silent ports for &port in cu.native_ports.difference(&firing_ports) { - let var = cu.port_info.firing_var_for(port); - if let Some(true) = predicate.assigned.insert(var, false) { + let var = cu.port_info.spec_var_for(port); + if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; } @@ -269,8 +275,9 @@ impl Connector { ); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if let Some(existing) = branching_native.branches.insert(predicate, branch) { - return Err(Se::IndistinguishableBatches([index, existing.index])); + if let Some(_existing) = branching_native.branches.insert(predicate, branch) { + unreachable!() + // return Err(Se::IndistinguishableBatches([index, existing.index])); } } // restore the invariant @@ -645,7 +652,7 @@ impl BranchingNative { for (predicate, mut branch) in draining.drain() { log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = cu.port_info.firing_var_for(getter); + let var = cu.port_info.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); @@ -665,7 +672,7 @@ impl BranchingNative { ); } }; - if predicate.query(var) != Some(true) { + if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch log!( cu.logger, @@ -675,9 +682,9 @@ impl BranchingNative { finished.insert(predicate, branch); continue; } - use CommonSatResult as Csr; - match predicate.common_satisfier(&send_payload_msg.predicate) { - Csr::Nonexistant => { + use AllMapperResult as Amr; + match predicate.all_mapper(&send_payload_msg.predicate) { + Amr::Nonexistant => { // this branch does not receive the message log!( cu.logger, @@ -686,13 +693,13 @@ impl BranchingNative { ); finished.insert(predicate, branch); } - Csr::Equivalent | Csr::FormerNotLatter => { + Amr::Equivalent | Amr::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.logger, "branch pred covers it! Accept the msg"); finished.insert(predicate, branch); } - Csr::LatterNotFormer => { + Amr::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); @@ -706,7 +713,7 @@ impl BranchingNative { finished.insert(predicate, branch); finished.insert(predicate2, branch2); } - Csr::New(predicate2) => { + Amr::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); @@ -729,7 +736,7 @@ impl BranchingNative { self.branches.keys() ); for (branch_predicate, branch) in self.branches { - if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) { + if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten }; @@ -775,8 +782,8 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // make concrete all variables for &port in ports.iter() { - let var = cu.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); + let var = cu.port_info.spec_var_for(port); + predicate.assigned.entry(var).or_insert(SpecVal::SILENT); } // submit solution for this component solution_storage.submit_and_digest_subtree_solution( @@ -795,19 +802,19 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = cu.port_info.firing_var_for(port); + let var = cu.port_info.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" - drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); - drainer.add_input(predicate.inserted(var, true), branch); + drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); + drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch); } B::PutMsg(putter, payload) => { // sanity check assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); // overwrite assignment - let var = cu.port_info.firing_var_for(putter); - let was = predicate.assigned.insert(var, true); - if was == Some(false) { + let var = cu.port_info.spec_var_for(putter); + let was = predicate.assigned.insert(var, SpecVal::FIRING); + if was == Some(SpecVal::SILENT) { log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); @@ -851,21 +858,21 @@ impl BranchingProtoComponent { blocked.insert(predicate, branch); continue; } - use CommonSatResult as Csr; + use AllMapperResult as Amr; log!(logger, "visiting branch with pred {:?}", &predicate); - match predicate.common_satisfier(&send_payload_msg.predicate) { - Csr::Nonexistant => { + match predicate.all_mapper(&send_payload_msg.predicate) { + Amr::Nonexistant => { // this branch does not receive the message log!(logger, "skipping branch"); blocked.insert(predicate, branch); } - Csr::Equivalent | Csr::FormerNotLatter => { + Amr::Equivalent | Amr::FormerNotLatter => { // retain the existing predicate, but add this payload log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); unblocked.insert(predicate, branch); } - Csr::LatterNotFormer => { + Amr::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched log!(logger, "Forking this branch, giving it the predicate of the msg"); let mut branch2 = branch.clone(); @@ -874,7 +881,7 @@ impl BranchingProtoComponent { blocked.insert(predicate, branch); unblocked.insert(predicate2, branch2); } - Csr::New(predicate2) => { + Amr::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); @@ -904,7 +911,7 @@ impl BranchingProtoComponent { fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { - if branch.ended && branch_predicate.satisfies(solution_predicate) { + if branch.ended && solution_predicate.consistent_with(&branch_predicate) { let ProtoComponentBranch { state, .. } = branch; return ProtoComponent { state, ports }; } @@ -1034,8 +1041,8 @@ impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> { } impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.port_info.firing_var_for(port); - self.predicate.query(var) + let var = self.port_info.spec_var_for(port); + self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port)