diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index 113915b302e6dab02f840e54c246bd302dc52625..f7f4086529f90fd9028e68eafe917354778820ef 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -100,17 +100,41 @@ impl PolyP { } Sb::SyncBlockEnd => { // come up with the predicate for this local solution - let ekeys_channel_id_iter = self - .ekeys - .iter() - .map(|&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id); + let lookup = + |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + let ekeys_channel_id_iter = self.ekeys.iter().map(lookup); predicate.batch_assign_nones(ekeys_channel_id_iter, false); - // report the local solution - m_ctx - .solution_storage - .submit_and_digest_subtree_solution(m_ctx.my_subtree_id, predicate.clone()); - // store the solution for recovering later - self.complete.insert(predicate, branch); + + lockprintln!( + "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", + cid, + m_ctx.my_subtree_id, + &predicate, + &blocker + ); + + // OK now check we really received all the messages we expected to + if predicate.iter_true().count() == branch.inbox.keys().map(lookup).count() { + lockprintln!( + "{:?}: {:?} with pred {:?} finished! Storing this solution locally.", + cid, + m_ctx.my_subtree_id, + &predicate, + ); + m_ctx.solution_storage.submit_and_digest_subtree_solution( + m_ctx.my_subtree_id, + predicate.clone(), + ); + // store the solution for recovering later + self.complete.insert(predicate, branch); + } else { + lockprintln!( + "{:?}: {:?} with pred {:?} finished but was missing a GET. Pruning.", + cid, + m_ctx.my_subtree_id, + &predicate, + ); + } } Sb::PutMsg(ekey, payload) => { assert!(self.ekeys.contains(&ekey)); @@ -198,8 +222,8 @@ impl PolyP { lockprintln!( "{:?}: ... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", cid, - &old_predicate, &payload_predicate, + &old_predicate, &new, ); // payload_predicate has new assumptions. FORK! diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index c104fce7d64c39e494384136d76fda59f9862586..17be1d7197410b92734cf79b0c99fccc786dfc75 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -3,6 +3,9 @@ use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { + let cid = self.inner.channel_id_stream.controller_id; + lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision); + let mut all_inboxes = HashMap::default(); self.inner.mono_n = self .ephemeral @@ -21,16 +24,22 @@ impl Controller { .collect(); for (channel_id, value) in decision.assigned.iter() { if !value { - println!("VALUE {:?} => *", channel_id); + lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id); } else if let Some(payload) = valuations.get(channel_id) { - println!("VALUE {:?} => Message({:?})", channel_id, payload); + lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload); } else { - println!("VALUE {:?} => Message(?)", channel_id); + lockprintln!("{:?}: VALUE {:?} => Message(?)", cid, channel_id); } } let announcement = CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { + lockprintln!( + "{:?}: Forwarding {:?} to child with ekey {:?}", + cid, + &announcement, + child_ekey + ); self.inner .endpoint_exts .get_mut(child_ekey) @@ -45,6 +54,7 @@ impl Controller { // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found fn handle_locals_maybe_decide(&mut self) -> Result { + let cid = self.inner.channel_id_stream.controller_id; if let Some(parent_ekey) = self.inner.family.parent_ekey { // I have a parent -> I'm not the leader let parent_endpoint = @@ -52,6 +62,7 @@ impl Controller { for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); + lockprintln!("{:?}: Sending {:?} to parent {:?}", cid, &msg, parent_ekey); parent_endpoint.send(msg)?; } Ok(false) @@ -60,6 +71,7 @@ impl Controller { assert!(self.inner.family.parent_ekey.is_none()); let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next(); Ok(if let Some(decision) = maybe_decision { + lockprintln!("{:?}: DECIDE ON {:?} AS LEADER!", cid, &decision); self.end_round_with_decision(decision)?; true } else { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 50d90e104bbd3c55fa4892c62290a386147f65fa..ff8ccbaaaf177724edd69095a15e74cd9a240cc2 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -406,6 +406,13 @@ impl Predicate { } } + pub fn iter_true(&self) -> impl Iterator + '_ { + self.assigned.iter().filter_map(|(&channel_id, b)| match b { + true => Some(channel_id), + false => None, + }) + } + pub fn batch_assign_nones( &mut self, channel_ids: impl Iterator,