Changeset - aa88e65c3d92
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-02-10 15:15:28
christopheresterhuyse@gmail.com
exchange
4 files changed with 44 insertions and 35 deletions:
0 comments (0 inline, 0 general)
src/runtime/actors.rs
Show inline comments
 
@@ -378,24 +378,33 @@ impl PolyN {
 
        );
 
        std::mem::swap(&mut branches2, &mut self.branches);
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        logger: &mut String,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoN {
 
        if let Some((_, branch)) = self
 
        log!(
 
            logger,
 
            "decision {:?} with branch preds {:?}",
 
            decision,
 
            self.branches.iter().collect::<Vec<_>>()
 
        );
 
        if let Some((branch_pred, branch)) = self
 
            .branches
 
            .drain()
 
            .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p))
 
        {
 
            log!(logger, "decision {:?} mapped to branch {:?}", decision, branch_pred);
 
            let BranchN { gotten, sync_batch_index, .. } = branch;
 
            for (&key, payload) in gotten.iter() {
 
                assert!(table_row.insert(key, payload.clone()).is_none());
 
            }
 
            MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) }
 
        } else {
 
            log!(logger, "decision {:?} HAD NO SOLUTION!!?", decision);
 
            panic!("No such solution!")
 
        }
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -3,17 +3,16 @@ use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        let mut table_row = HashMap::<Key, _>::default();
 
        // 1. become_mono for Poly actors
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut table_row));
 
        self.inner.mono_n =
 
            self.ephemeral.poly_n.take().map(|poly_n| {
 
                poly_n.become_mono(&mut self.inner.logger, &decision, &mut table_row)
 
            });
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 

	
 
        // convert (Key=>Payload) map to (ChannelId=>Payload) map.
 
        let table_row: HashMap<_, _> = table_row
 
@@ -121,13 +120,13 @@ impl Controller {
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch (batch index={} with pred {:?}",
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
src/runtime/mod.rs
Show inline comments
 
@@ -383,13 +383,13 @@ impl Predicate {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((sid, sb));
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                    } else {
 
@@ -405,17 +405,17 @@ impl Predicate {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                    new.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
                New(new)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
src/test/connector.rs
Show inline comments
 
@@ -38,15 +38,20 @@ primitive alternator_2(in i, out a, out b) {
 
}
 
composite sync_2(in i, out o) {
 
    channel x -> y;
 
    new sync(i, x);
 
    new sync(y, o);
 
}
 
composite forward_pair(in ia, out oa, in ib, out ob) {
 
    new forward(ia, oa);
 
    new forward(ib, ob);
 
primitive exchange(in ai, out ao, in bi, out bo) {
 
    // Note the implicit causal relationship
 
    while(true) synchronous {
 
        if(fires(ai)) {
 
            put(bo, get(ai));
 
            put(ao, get(bi));
 
        }
 
    }
 
}
 
primitive forward_nonzero(in i, out o) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        assert(m[0] != 0);
 
        put(o, m);
 
@@ -515,13 +520,12 @@ fn alternator_2() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1536
 
fn composite_chain_a() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
@@ -555,13 +559,12 @@ fn composite_chain_a() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1536
 
fn composite_chain_b() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
@@ -595,59 +598,57 @@ fn composite_chain_b() {
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1605
 
fn exchange() {
 
    /*
 
        /-->forward-->P|A-->forward-->\
 
    Alice                             Bob
 
        \<--forward<--P|A<--forward<--/
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(2, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, b"A->B".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(0));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(2, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(3, Active(addrs[0])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, b"B->A".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(0));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn filter_messages() {
 
    // Make a protocol whose behavior depends on the contents of messages
 
    // Getter prohibits the receipt of messages of the form [0, ...].
 
    // those messages are silent
 
    /*
 
    Sender -->forward-->P|A-->forward_nonzero--> Receiver
0 comments (0 inline, 0 general)