Changeset - 43f31542f6db
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-02-04 16:08:29
christopheresterhuyse@gmail.com
more debug prints on branch predicates
3 files changed with 44 insertions and 46 deletions:
0 comments (0 inline, 0 general)
src/runtime/actors.rs
Show inline comments
 
@@ -22,24 +22,25 @@ pub(crate) struct BranchN {
 
pub struct MonoP {
 
    pub state: ProtocolS,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyP {
 
    pub incomplete: HashMap<Predicate, BranchP>,
 
    pub complete: HashMap<Predicate, BranchP>,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchP {
 
    pub outbox: HashMap<Key, Payload>,
 
    pub inbox: HashMap<Key, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
//////////////////////////////////////////////////////////////////
 

	
 
impl PolyP {
 
    pub(crate) fn poly_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
@@ -105,51 +106,57 @@ impl PolyP {
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 

	
 
                    lockprintln!(
 
                        "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    if predicate.iter_true().count() == branch.inbox.keys().map(lookup).count() {
 
                        lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                    let num_fired = predicate.iter_matching(true).count();
 
                    let num_msgs =
 
                        branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count();
 
                    match num_fired.cmp(&num_msgs) {
 
                        Ordering::Less => unreachable!(),
 
                        Ordering::Greater => lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        );
 
                        m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                            m_ctx.my_subtree_id,
 
                            predicate.clone(),
 
                        );
 
                        // store the solution for recovering later
 
                        self.complete.insert(predicate, branch);
 
                    } else {
 
                        lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but was missing a GET. Pruning.",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        );
 
                        ),
 
                        Ordering::Equal => {
 
                            lockprintln!(
 
                                "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                                cid,
 
                                m_ctx.my_subtree_id,
 
                                &predicate,
 
                            );
 
                            m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                                m_ctx.my_subtree_id,
 
                                predicate.clone(),
 
                            );
 
                            // store the solution for recovering later
 
                            self.complete.insert(predicate, branch);
 
                        }
 
                    }
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
                        branch.outbox.insert(ekey, payload.clone());
 
                        let msg = CommMsgContents::SendPayload {
 
                            payload_predicate: predicate.clone(),
 
                            payload,
 
                        }
 
                        .into_msg(m_ctx.inner.round_index);
 
                        endpoint.send(msg)?;
 
                        to_run.push((predicate, branch));
 
                    }
 
                    // ELSE DROP
 
                }
 
            }
 
        }
 
@@ -267,30 +274,30 @@ impl PolyP {
 
                })
 
                .collect();
 
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
 
            to_run
 
        };
 
        lockprintln!("{:?}: ... DONE FEEDING BRANCHES. {} branches to run!", cid, to_run.len(),);
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        all_inboxes: &mut HashMap<Key, Payload>,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoP {
 
        if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) {
 
            let BranchP { inbox, state } = branch;
 
            for (key, payload) in inbox {
 
                assert!(all_inboxes.insert(key, payload).is_none());
 
            let BranchP { inbox, state, outbox } = branch;
 
            for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) {
 
                table_row.insert(key, payload);
 
            }
 
            self.incomplete.clear();
 
            MonoP { state, ekeys: self.ekeys }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    }
 
}
 

	
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
@@ -303,25 +310,25 @@ impl PolyN {
 
                branch.gotten.insert(ekey, payload.clone());
 
                if branch.to_get.is_empty() {
 
                    solution_storage
 
                        .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone());
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        all_inboxes: &mut HashMap<Key, Payload>,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoN {
 
        if let Some((_, branch)) = self.branches.drain().find(|(p, _)| decision.satisfies(p)) {
 
            let BranchN { gotten, sync_batch_index, .. } = branch;
 
            for (&key, payload) in gotten.iter() {
 
                assert!(all_inboxes.insert(key, payload.clone()).is_none());
 
                assert!(table_row.insert(key, payload.clone()).is_none());
 
            }
 
            MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision);
 

	
 
        let mut all_inboxes = HashMap::default();
 
        let mut table_row = HashMap::default();
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut all_inboxes));
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut table_row));
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut all_inboxes)),
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 
        let valuations: HashMap<_, _> = all_inboxes
 
            .drain()
 
            .map(|(ekey, payload)| {
 
                let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                (channel_id, Some(payload))
 
            })
 
            .collect();
 
        for (channel_id, value) in decision.assigned.iter() {
 
            if !value {
 
                lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id);
 
            } else if let Some(payload) = valuations.get(channel_id) {
 
                lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload);
 
            } else {
 
                lockprintln!("{:?}: VALUE {:?} => Message(?)", cid, channel_id);
 
            }
 
        for (ekey, payload) in table_row {
 
            let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload);
 
        }
 
        for channel_id in decision.iter_matching(false) {
 
            lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id);
 
        }
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            lockprintln!(
 
                "{:?}: Forwarding {:?} to child with ekey {:?}",
 
                cid,
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
@@ -437,24 +428,25 @@ impl ControllerEphemeral {
 
        self.ekey_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
 
                    state: self.state,
 
                    inbox: Default::default(),
 
                    outbox: Default::default(),
 
                }
 
            },
 
            ekeys: self.ekeys,
 
        }
 
    }
 
}
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -397,29 +397,28 @@ impl Predicate {
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_true(&self) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned.iter().filter_map(|(&channel_id, b)| match b {
 
            true => Some(channel_id),
 
            false => None,
 
        })
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
0 comments (0 inline, 0 general)