Changeset - 43f31542f6db
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-02-04 16:08:29
christopheresterhuyse@gmail.com
more debug prints on branch predicates
3 files changed with 44 insertions and 46 deletions:
0 comments (0 inline, 0 general)
src/runtime/actors.rs
Show inline comments
 
@@ -10,48 +10,49 @@ pub(crate) struct MonoN {
 
pub(crate) struct PolyN {
 
    pub ekeys: HashSet<Key>,
 
    pub branches: HashMap<Predicate, BranchN>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchN {
 
    pub to_get: HashSet<Key>,
 
    pub gotten: HashMap<Key, Payload>,
 
    pub sync_batch_index: usize,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MonoP {
 
    pub state: ProtocolS,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyP {
 
    pub incomplete: HashMap<Predicate, BranchP>,
 
    pub complete: HashMap<Predicate, BranchP>,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchP {
 
    pub outbox: HashMap<Key, Payload>,
 
    pub inbox: HashMap<Key, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
//////////////////////////////////////////////////////////////////
 

	
 
impl PolyP {
 
    pub(crate) fn poly_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        let to_run: Vec<_> = self.incomplete.drain().collect();
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn poly_run_these_branches(
 
        &mut self,
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 
@@ -93,75 +94,81 @@ impl PolyP {
 
                    let mut predicate_f = predicate.clone();
 
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
 
                        panic!("OI HANS QUERY FIRST!");
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    // come up with the predicate for this local solution
 
                    let lookup =
 
                        |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 

	
 
                    lockprintln!(
 
                        "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    if predicate.iter_true().count() == branch.inbox.keys().map(lookup).count() {
 
                        lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                    let num_fired = predicate.iter_matching(true).count();
 
                    let num_msgs =
 
                        branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count();
 
                    match num_fired.cmp(&num_msgs) {
 
                        Ordering::Less => unreachable!(),
 
                        Ordering::Greater => lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        );
 
                        m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                            m_ctx.my_subtree_id,
 
                            predicate.clone(),
 
                        );
 
                        // store the solution for recovering later
 
                        self.complete.insert(predicate, branch);
 
                    } else {
 
                        lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but was missing a GET. Pruning.",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        );
 
                        ),
 
                        Ordering::Equal => {
 
                            lockprintln!(
 
                                "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                                cid,
 
                                m_ctx.my_subtree_id,
 
                                &predicate,
 
                            );
 
                            m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                                m_ctx.my_subtree_id,
 
                                predicate.clone(),
 
                            );
 
                            // store the solution for recovering later
 
                            self.complete.insert(predicate, branch);
 
                        }
 
                    }
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
                        branch.outbox.insert(ekey, payload.clone());
 
                        let msg = CommMsgContents::SendPayload {
 
                            payload_predicate: predicate.clone(),
 
                            payload,
 
                        }
 
                        .into_msg(m_ctx.inner.round_index);
 
                        endpoint.send(msg)?;
 
                        to_run.push((predicate, branch));
 
                    }
 
                    // ELSE DROP
 
                }
 
            }
 
        }
 
        // all in self.incomplete most recently returned Blocker::CouldntReadMsg
 
        Ok(if self.incomplete.is_empty() {
 
            if self.complete.is_empty() {
 
                Srr::NoBranches
 
            } else {
 
                Srr::AllBranchesComplete
 
            }
 
        } else {
 
            Srr::BlockingForRecv
 
        })
 
    }
 

	
 
@@ -255,73 +262,73 @@ impl PolyP {
 
                        Csr::Nonexistant => {
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                                cid,
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // predicates contradict
 
                            incomplete2.insert(old_predicate, branch);
 
                            None
 
                        }
 
                    }
 
                })
 
                .collect();
 
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
 
            to_run
 
        };
 
        lockprintln!("{:?}: ... DONE FEEDING BRANCHES. {} branches to run!", cid, to_run.len(),);
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        all_inboxes: &mut HashMap<Key, Payload>,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoP {
 
        if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) {
 
            let BranchP { inbox, state } = branch;
 
            for (key, payload) in inbox {
 
                assert!(all_inboxes.insert(key, payload).is_none());
 
            let BranchP { inbox, state, outbox } = branch;
 
            for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) {
 
                table_row.insert(key, payload);
 
            }
 
            self.incomplete.clear();
 
            MonoP { state, ekeys: self.ekeys }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    }
 
}
 

	
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        ekey: Key,
 
        payload: Payload,
 
        solution_storage: &mut SolutionStorage,
 
    ) {
 
        for (predicate, branch) in self.branches.iter_mut() {
 
            if branch.to_get.remove(&ekey) {
 
                branch.gotten.insert(ekey, payload.clone());
 
                if branch.to_get.is_empty() {
 
                    solution_storage
 
                        .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone());
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        all_inboxes: &mut HashMap<Key, Payload>,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoN {
 
        if let Some((_, branch)) = self.branches.drain().find(|(p, _)| decision.satisfies(p)) {
 
            let BranchN { gotten, sync_batch_index, .. } = branch;
 
            for (&key, payload) in gotten.iter() {
 
                assert!(all_inboxes.insert(key, payload.clone()).is_none());
 
                assert!(table_row.insert(key, payload.clone()).is_none());
 
            }
 
            MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision);
 

	
 
        let mut all_inboxes = HashMap::default();
 
        let mut table_row = HashMap::default();
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut all_inboxes));
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut table_row));
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut all_inboxes)),
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 
        let valuations: HashMap<_, _> = all_inboxes
 
            .drain()
 
            .map(|(ekey, payload)| {
 
                let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                (channel_id, Some(payload))
 
            })
 
            .collect();
 
        for (channel_id, value) in decision.assigned.iter() {
 
            if !value {
 
                lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id);
 
            } else if let Some(payload) = valuations.get(channel_id) {
 
                lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload);
 
            } else {
 
                lockprintln!("{:?}: VALUE {:?} => Message(?)", cid, channel_id);
 
            }
 
        for (ekey, payload) in table_row {
 
            let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload);
 
        }
 
        for channel_id in decision.iter_matching(false) {
 
            lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id);
 
        }
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            lockprintln!(
 
                "{:?}: Forwarding {:?} to child with ekey {:?}",
 
                cid,
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        Ok(())
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
@@ -425,48 +416,49 @@ impl Controller {
 
}
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.ekey_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        self.ekey_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
 
                    state: self.state,
 
                    inbox: Default::default(),
 
                    outbox: Default::default(),
 
                }
 
            },
 
            ekeys: self.ekeys,
 
        }
 
    }
 
}
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
        lockprintln!(
 
            "{:?}: !! MonoContext callback to new_component with ekeys {:?}!",
 
            self.inner.channel_id_stream.controller_id,
 
            &moved_ekeys,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
src/runtime/mod.rs
Show inline comments
 
@@ -385,53 +385,52 @@ impl Predicate {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_true(&self) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned.iter().filter_map(|(&channel_id, b)| match b {
 
            true => Some(channel_id),
 
            false => None,
 
        })
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
0 comments (0 inline, 0 general)