Changeset - 73604b87289d
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-02-04 15:47:08
christopheresterhuyse@gmail.com
more debug prints on branch predicates
3 files changed with 57 insertions and 14 deletions:
0 comments (0 inline, 0 general)
src/runtime/actors.rs
Show inline comments
 
@@ -79,59 +79,83 @@ impl PolyP {
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 
                        self.incomplete.insert(predicate, branch);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    // split the branch!
 
                    let branch_f = branch.clone();
 
                    let mut predicate_f = predicate.clone();
 
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
 
                        panic!("OI HANS QUERY FIRST!");
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    // come up with the predicate for this local solution
 
                    let ekeys_channel_id_iter = self
 
                        .ekeys
 
                        .iter()
 
                        .map(|&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id);
 
                    let lookup =
 
                        |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 
                    // report the local solution
 
                    m_ctx
 
                        .solution_storage
 
                        .submit_and_digest_subtree_solution(m_ctx.my_subtree_id, predicate.clone());
 
                    // store the solution for recovering later
 
                    self.complete.insert(predicate, branch);
 

	
 
                    lockprintln!(
 
                        "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    if predicate.iter_true().count() == branch.inbox.keys().map(lookup).count() {
 
                        lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        );
 
                        m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                            m_ctx.my_subtree_id,
 
                            predicate.clone(),
 
                        );
 
                        // store the solution for recovering later
 
                        self.complete.insert(predicate, branch);
 
                    } else {
 
                        lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but was missing a GET. Pruning.",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        );
 
                    }
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
                        let msg = CommMsgContents::SendPayload {
 
                            payload_predicate: predicate.clone(),
 
                            payload,
 
                        }
 
                        .into_msg(m_ctx.inner.round_index);
 
                        endpoint.send(msg)?;
 
                        to_run.push((predicate, branch));
 
                    }
 
                    // ELSE DROP
 
                }
 
            }
 
        }
 
        // all in self.incomplete most recently returned Blocker::CouldntReadMsg
 
        Ok(if self.incomplete.is_empty() {
 
            if self.complete.is_empty() {
 
                Srr::NoBranches
 
            } else {
 
                Srr::AllBranchesComplete
 
@@ -177,50 +201,50 @@ impl PolyP {
 
            );
 
            let mut incomplete2 = HashMap::<_, _>::default();
 
            let to_run = self
 
                .incomplete
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                                cid,
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 
                            let was = branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 
                            Some((old_predicate, branch))
 
                        }
 
                        Csr::New(new) => {
 

	
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                                cid,
 
                                &old_predicate,
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((new, payload_branch))
 
                        }
 
                        Csr::LatterNotFormer => {
 

	
 
                            lockprintln!(
 
                                "{:?}: ... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                                cid,
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 

	
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision);
 

	
 
        let mut all_inboxes = HashMap::default();
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut all_inboxes));
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut all_inboxes)),
 
        );
 
        let valuations: HashMap<_, _> = all_inboxes
 
            .drain()
 
            .map(|(ekey, payload)| {
 
                let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                (channel_id, Some(payload))
 
            })
 
            .collect();
 
        for (channel_id, value) in decision.assigned.iter() {
 
            if !value {
 
                println!("VALUE {:?} => *", channel_id);
 
                lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id);
 
            } else if let Some(payload) = valuations.get(channel_id) {
 
                println!("VALUE {:?} => Message({:?})", channel_id, payload);
 
                lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload);
 
            } else {
 
                println!("VALUE {:?} => Message(?)", channel_id);
 
                lockprintln!("{:?}: VALUE {:?} => Message(?)", cid, channel_id);
 
            }
 
        }
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            lockprintln!(
 
                "{:?}: Forwarding {:?} to child with ekey {:?}",
 
                cid,
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        Ok(())
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                lockprintln!("{:?}: Sending {:?} to parent {:?}", cid, &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(decision) = maybe_decision {
 
                lockprintln!("{:?}: DECIDE ON {:?} AS LEADER!", cid, &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    }
 

	
 
    fn kick_off_native(
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ekeys, .. } = self.inner.mono_n.take().unwrap();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            let all_ekeys = ekeys.iter().copied();
 
            let all_channel_ids = all_ekeys.map(ekey_to_channel_id);
 

	
 
            let mut predicate = Predicate::new_trivial();
 

	
 
            // assign TRUE for puts and gets
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
src/runtime/mod.rs
Show inline comments
 
@@ -385,48 +385,55 @@ impl Predicate {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_true(&self) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned.iter().filter_map(|(&channel_id, b)| match b {
 
            true => Some(channel_id),
 
            false => None,
 
        })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: ChannelId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
0 comments (0 inline, 0 general)