diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index f7f4086529f90fd9028e68eafe917354778820ef..da9883dfcec23dab896a5988c0cb850ce770d27f 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -31,6 +31,7 @@ pub(crate) struct PolyP { } #[derive(Debug, Clone)] pub(crate) struct BranchP { + pub outbox: HashMap, pub inbox: HashMap, pub state: ProtocolS, } @@ -114,26 +115,31 @@ impl PolyP { ); // OK now check we really received all the messages we expected to - if predicate.iter_true().count() == branch.inbox.keys().map(lookup).count() { - lockprintln!( - "{:?}: {:?} with pred {:?} finished! Storing this solution locally.", + let num_fired = predicate.iter_matching(true).count(); + let num_msgs = + branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count(); + match num_fired.cmp(&num_msgs) { + Ordering::Less => unreachable!(), + Ordering::Greater => lockprintln!( + "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .", cid, m_ctx.my_subtree_id, &predicate, - ); - m_ctx.solution_storage.submit_and_digest_subtree_solution( - m_ctx.my_subtree_id, - predicate.clone(), - ); - // store the solution for recovering later - self.complete.insert(predicate, branch); - } else { - lockprintln!( - "{:?}: {:?} with pred {:?} finished but was missing a GET. Pruning.", - cid, - m_ctx.my_subtree_id, - &predicate, - ); + ), + Ordering::Equal => { + lockprintln!( + "{:?}: {:?} with pred {:?} finished! Storing this solution locally.", + cid, + m_ctx.my_subtree_id, + &predicate, + ); + m_ctx.solution_storage.submit_and_digest_subtree_solution( + m_ctx.my_subtree_id, + predicate.clone(), + ); + // store the solution for recovering later + self.complete.insert(predicate, branch); + } } } Sb::PutMsg(ekey, payload) => { @@ -141,6 +147,7 @@ impl PolyP { let EndpointExt { info, endpoint } = m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap(); if predicate.replace_assignment(info.channel_id, true) != Some(false) { + branch.outbox.insert(ekey, payload.clone()); let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload, @@ -276,12 +283,12 @@ impl PolyP { pub(crate) fn become_mono( mut self, decision: &Predicate, - all_inboxes: &mut HashMap, + table_row: &mut HashMap, ) -> MonoP { if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) { - let BranchP { inbox, state } = branch; - for (key, payload) in inbox { - assert!(all_inboxes.insert(key, payload).is_none()); + let BranchP { inbox, state, outbox } = branch; + for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) { + table_row.insert(key, payload); } self.incomplete.clear(); MonoP { state, ekeys: self.ekeys } @@ -312,12 +319,12 @@ impl PolyN { pub fn become_mono( mut self, decision: &Predicate, - all_inboxes: &mut HashMap, + table_row: &mut HashMap, ) -> MonoN { if let Some((_, branch)) = self.branches.drain().find(|(p, _)| decision.satisfies(p)) { let BranchN { gotten, sync_batch_index, .. } = branch; for (&key, payload) in gotten.iter() { - assert!(all_inboxes.insert(key, payload.clone()).is_none()); + assert!(table_row.insert(key, payload.clone()).is_none()); } MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) } } else {