diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index f7f4086529f90fd9028e68eafe917354778820ef..da9883dfcec23dab896a5988c0cb850ce770d27f 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -31,6 +31,7 @@ pub(crate) struct PolyP { } #[derive(Debug, Clone)] pub(crate) struct BranchP { + pub outbox: HashMap, pub inbox: HashMap, pub state: ProtocolS, } @@ -114,26 +115,31 @@ impl PolyP { ); // OK now check we really received all the messages we expected to - if predicate.iter_true().count() == branch.inbox.keys().map(lookup).count() { - lockprintln!( - "{:?}: {:?} with pred {:?} finished! Storing this solution locally.", + let num_fired = predicate.iter_matching(true).count(); + let num_msgs = + branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count(); + match num_fired.cmp(&num_msgs) { + Ordering::Less => unreachable!(), + Ordering::Greater => lockprintln!( + "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .", cid, m_ctx.my_subtree_id, &predicate, - ); - m_ctx.solution_storage.submit_and_digest_subtree_solution( - m_ctx.my_subtree_id, - predicate.clone(), - ); - // store the solution for recovering later - self.complete.insert(predicate, branch); - } else { - lockprintln!( - "{:?}: {:?} with pred {:?} finished but was missing a GET. Pruning.", - cid, - m_ctx.my_subtree_id, - &predicate, - ); + ), + Ordering::Equal => { + lockprintln!( + "{:?}: {:?} with pred {:?} finished! Storing this solution locally.", + cid, + m_ctx.my_subtree_id, + &predicate, + ); + m_ctx.solution_storage.submit_and_digest_subtree_solution( + m_ctx.my_subtree_id, + predicate.clone(), + ); + // store the solution for recovering later + self.complete.insert(predicate, branch); + } } } Sb::PutMsg(ekey, payload) => { @@ -141,6 +147,7 @@ impl PolyP { let EndpointExt { info, endpoint } = m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap(); if predicate.replace_assignment(info.channel_id, true) != Some(false) { + branch.outbox.insert(ekey, payload.clone()); let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload, @@ -276,12 +283,12 @@ impl PolyP { pub(crate) fn become_mono( mut self, decision: &Predicate, - all_inboxes: &mut HashMap, + table_row: &mut HashMap, ) -> MonoP { if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) { - let BranchP { inbox, state } = branch; - for (key, payload) in inbox { - assert!(all_inboxes.insert(key, payload).is_none()); + let BranchP { inbox, state, outbox } = branch; + for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) { + table_row.insert(key, payload); } self.incomplete.clear(); MonoP { state, ekeys: self.ekeys } @@ -312,12 +319,12 @@ impl PolyN { pub fn become_mono( mut self, decision: &Predicate, - all_inboxes: &mut HashMap, + table_row: &mut HashMap, ) -> MonoN { if let Some((_, branch)) = self.branches.drain().find(|(p, _)| decision.satisfies(p)) { let BranchN { gotten, sync_batch_index, .. } = branch; for (&key, payload) in gotten.iter() { - assert!(all_inboxes.insert(key, payload.clone()).is_none()); + assert!(table_row.insert(key, payload.clone()).is_none()); } MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) } } else { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 17be1d7197410b92734cf79b0c99fccc786dfc75..5bfd49a94c9133183f4f18545931172ec193777a 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -6,30 +6,21 @@ impl Controller { let cid = self.inner.channel_id_stream.controller_id; lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision); - let mut all_inboxes = HashMap::default(); + let mut table_row = HashMap::default(); self.inner.mono_n = self .ephemeral .poly_n .take() - .map(|poly_n| poly_n.become_mono(&decision, &mut all_inboxes)); + .map(|poly_n| poly_n.become_mono(&decision, &mut table_row)); self.inner.mono_ps.extend( - self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut all_inboxes)), + self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)), ); - let valuations: HashMap<_, _> = all_inboxes - .drain() - .map(|(ekey, payload)| { - let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - (channel_id, Some(payload)) - }) - .collect(); - for (channel_id, value) in decision.assigned.iter() { - if !value { - lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id); - } else if let Some(payload) = valuations.get(channel_id) { - lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload); - } else { - lockprintln!("{:?}: VALUE {:?} => Message(?)", cid, channel_id); - } + for (ekey, payload) in table_row { + let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload); + } + for channel_id in decision.iter_matching(false) { + lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id); } let announcement = CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); @@ -446,6 +437,7 @@ impl Into for MonoP { BranchP { state: self.state, inbox: Default::default(), + outbox: Default::default(), } }, ekeys: self.ekeys, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index ff8ccbaaaf177724edd69095a15e74cd9a240cc2..05cc55a8a82015c43ca9384734e9a1b0a54be8ff 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -406,11 +406,10 @@ impl Predicate { } } - pub fn iter_true(&self) -> impl Iterator + '_ { - self.assigned.iter().filter_map(|(&channel_id, b)| match b { - true => Some(channel_id), - false => None, - }) + pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { + self.assigned + .iter() + .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) } pub fn batch_assign_nones(