diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index 81bd02c3c84d296c3230280c5a87ed5444909d60..2302702f7bdb5602ab30406b9d2845b8ed1e06d2 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -55,9 +55,8 @@ impl PolyP { mut to_run: Vec<(Predicate, BranchP)>, ) -> Result { use SyncRunResult as Srr; - let cid = m_ctx.inner.channel_id_stream.controller_id; log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,); - while let Some((mut predicate, mut branch)) = to_run.pop() { + 'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() { let mut r_ctx = BranchPContext { m_ctx: m_ctx.reborrow(), ekeys: &self.ekeys, @@ -111,46 +110,45 @@ impl PolyP { to_run.push((predicate_f, branch_f)); } Sb::SyncBlockEnd => { + let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner; log!( - &mut m_ctx.inner.logger, - "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", + logger, + "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...", m_ctx.my_subtree_id, &predicate, - &blocker ); // come up with the predicate for this local solution - let lookup = - |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - let ekeys_channel_id_iter = self.ekeys.iter().map(lookup); - predicate.batch_assign_nones(ekeys_channel_id_iter, false); - // OK now check we really received all the messages we expected to - let num_fired = predicate.iter_matching(true).count(); - let num_msgs = - branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count(); - match num_fired.cmp(&num_msgs) { - Ordering::Less => unreachable!(), - Ordering::Greater => log!( - &mut m_ctx.inner.logger, - "{:?} with pred {:?} finished but |inbox|+|outbox| < .", - m_ctx.my_subtree_id, - &predicate, - ), - Ordering::Equal => { - log!( - &mut m_ctx.inner.logger, - "{:?} with pred {:?} finished! Storing this solution locally.", - m_ctx.my_subtree_id, - &predicate, - ); - m_ctx.solution_storage.submit_and_digest_subtree_solution( - m_ctx.my_subtree_id, - predicate.clone(), - ); - // store the solution for recovering later - self.complete.insert(predicate, branch); + for ekey in self.ekeys.iter() { + let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id; + let fired = + branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey); + match predicate.query(channel_id) { + Some(true) => { + if !fired { + // This branch should have fired but didn't! + log!( + logger, + "~ ... ... should have fired {:?} and didn't! pruning!", + channel_id, + ); + continue 'to_run_loop; + } + } + Some(false) => assert!(!fired), + None => { + predicate.replace_assignment(channel_id, false); + assert!(!fired) + } } } + log!(logger, "~ ... ... and finished just fine!",); + m_ctx.solution_storage.submit_and_digest_subtree_solution( + &mut m_ctx.inner.logger, + m_ctx.my_subtree_id, + predicate.clone(), + ); + self.complete.insert(predicate, branch); } Sb::PutMsg(ekey, payload) => { assert!(self.ekeys.contains(&ekey)); @@ -225,9 +223,8 @@ impl PolyP { match old_predicate.common_satisfier(&payload_predicate) { Csr::FormerNotLatter | Csr::Equivalent => { log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}", - &old_predicate ); // old_predicate COVERS the assumptions of payload_predicate @@ -236,11 +233,9 @@ impl PolyP { Some((old_predicate, branch)) } Csr::New(new) => { - log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", - &payload_predicate, &old_predicate, &new, @@ -255,11 +250,9 @@ impl PolyP { Some((new, payload_branch)) } Csr::LatterNotFormer => { - log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING", - &old_predicate, &payload_predicate, ); @@ -274,9 +267,8 @@ impl PolyP { } Csr::Nonexistant => { log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}", - &old_predicate, &payload_predicate, ); @@ -320,18 +312,71 @@ impl PolyN { pub fn sync_recv( &mut self, ekey: Key, + logger: &mut String, payload: Payload, + payload_predicate: Predicate, solution_storage: &mut SolutionStorage, ) { - for (predicate, branch) in self.branches.iter_mut() { - if branch.to_get.remove(&ekey) { - branch.gotten.insert(ekey, payload.clone()); - if branch.to_get.is_empty() { - solution_storage - .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone()); + let mut branches2: HashMap<_, _> = Default::default(); + for (old_predicate, mut branch) in self.branches.drain() { + use CommonSatResult as Csr; + let case = old_predicate.common_satisfier(&payload_predicate); + let mut report_if_solution = + |branch: &BranchN, pred: &Predicate, logger: &mut String| { + if branch.to_get.is_empty() { + solution_storage.submit_and_digest_subtree_solution( + logger, + SubtreeId::PolyN, + pred.clone(), + ); + } + }; + log!( + logger, + "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}", + &payload_predicate, + &payload, + &old_predicate, + &case + ); + match case { + Csr::Nonexistant => { /* skip branch */ } + Csr::FormerNotLatter | Csr::Equivalent => { + // Feed the message to this branch in-place. no need to modify pred. + if branch.to_get.remove(&ekey) { + branch.gotten.insert(ekey, payload.clone()); + report_if_solution(&branch, &old_predicate, logger); + } + } + Csr::LatterNotFormer => { + // create a new branch with the payload_predicate. + let mut forked = branch.clone(); + if forked.to_get.remove(&ekey) { + forked.gotten.insert(ekey, payload.clone()); + report_if_solution(&forked, &payload_predicate, logger); + branches2.insert(payload_predicate.clone(), forked); + } + } + Csr::New(new) => { + // create a new branch with the newly-created predicate + let mut forked = branch.clone(); + if forked.to_get.remove(&ekey) { + forked.gotten.insert(ekey, payload.clone()); + report_if_solution(&forked, &new, logger); + branches2.insert(new.clone(), forked); + } } } + // unlike PolyP machines, Native branches do not become inconsistent + branches2.insert(old_predicate, branch); } + log!( + logger, + "Native now has {} branches with predicates: {:?}", + branches2.len(), + branches2.keys().collect::>() + ); + std::mem::swap(&mut branches2, &mut self.branches); } pub fn become_mono(