Changeset - aa88e65c3d92
[Not reviewed]
0 4 0
Christopher Esterhuyse - 5 years ago 2020-02-10 15:15:28
christopheresterhuyse@gmail.com
exchange
4 files changed with 44 insertions and 35 deletions:
0 comments (0 inline, 0 general)
src/runtime/actors.rs
Show inline comments
 
@@ -360,42 +360,51 @@ impl PolyN {
 
                Csr::New(new) => {
 
                    // create a new branch with the newly-created predicate
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &new, logger);
 
                        branches2.insert(new.clone(), forked);
 
                    }
 
                }
 
            }
 
            // unlike PolyP machines, Native branches do not become inconsistent
 
            branches2.insert(old_predicate, branch);
 
        }
 
        log!(
 
            logger,
 
            "Native now has {} branches with predicates: {:?}",
 
            branches2.len(),
 
            branches2.keys().collect::<Vec<_>>()
 
        );
 
        std::mem::swap(&mut branches2, &mut self.branches);
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        logger: &mut String,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoN {
 
        if let Some((_, branch)) = self
 
        log!(
 
            logger,
 
            "decision {:?} with branch preds {:?}",
 
            decision,
 
            self.branches.iter().collect::<Vec<_>>()
 
        );
 
        if let Some((branch_pred, branch)) = self
 
            .branches
 
            .drain()
 
            .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p))
 
        {
 
            log!(logger, "decision {:?} mapped to branch {:?}", decision, branch_pred);
 
            let BranchN { gotten, sync_batch_index, .. } = branch;
 
            for (&key, payload) in gotten.iter() {
 
                assert!(table_row.insert(key, payload.clone()).is_none());
 
            }
 
            MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) }
 
        } else {
 
            log!(logger, "decision {:?} HAD NO SOLUTION!!?", decision);
 
            panic!("No such solution!")
 
        }
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        let mut table_row = HashMap::<Key, _>::default();
 
        // 1. become_mono for Poly actors
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut table_row));
 
        self.inner.mono_n =
 
            self.ephemeral.poly_n.take().map(|poly_n| {
 
                poly_n.become_mono(&mut self.inner.logger, &decision, &mut table_row)
 
            });
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 

	
 
        // convert (Key=>Payload) map to (ChannelId=>Payload) map.
 
        let table_row: HashMap<_, _> = table_row
 
            .into_iter()
 
            .map(|(ekey, msg)| {
 
                let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                (channel_id, msg)
 
            })
 
            .collect();
 
        // log all firing ports
 
        for (channel_id, payload) in table_row {
 
            log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload);
 
        }
 
        // log all silent ports
 
        for channel_id in decision.iter_matching(false) {
 
            log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id);
 
        }
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            log!(
 
@@ -103,49 +102,49 @@ impl Controller {
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch (batch index={} with pred {:?}",
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        // TODO! fuse handle_locals_return_decision and end_round_return_decision
 

	
src/runtime/mod.rs
Show inline comments
 
@@ -365,75 +365,75 @@ impl Predicate {
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((sid, sb));
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                    new.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
                New(new)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
src/test/connector.rs
Show inline comments
 
@@ -20,51 +20,56 @@ primitive sync(in i, out o) {
 
    }
 
}
 
primitive fifo_1(in i, out o) {
 
    msg holding = null;
 
    while(true) synchronous {
 
        if (holding == null && fires(i)) {
 
            holding = get(i);
 
        } else if (holding != null && fires(o)) {
 
            put(o, holding);
 
            holding = null;
 
        }
 
    }
 
}
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}
 
composite sync_2(in i, out o) {
 
    channel x -> y;
 
    new sync(i, x);
 
    new sync(y, o);
 
}
 
composite forward_pair(in ia, out oa, in ib, out ob) {
 
    new forward(ia, oa);
 
    new forward(ib, ob);
 
primitive exchange(in ai, out ao, in bi, out bo) {
 
    // Note the implicit causal relationship
 
    while(true) synchronous {
 
        if(fires(ai)) {
 
            put(bo, get(ai));
 
            put(ao, get(bi));
 
        }
 
    }
 
}
 
primitive forward_nonzero(in i, out o) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        assert(m[0] != 0);
 
        put(o, m);
 
    }
 
}
 
primitive token_spout(out o) {
 
    while(true) synchronous {
 
        put(o, create(0));
 
    }
 
}
 
primitive wait_n(int to_wait, out o) {
 
    while(to_wait > 0) synchronous() to_wait -= 1;
 
    synchronous { put(o, create(0)); }
 
}
 
composite wait_10(out o) {
 
    new wait_n(10, o);
 
}
 
";
 

	
 
#[test]
 
fn connects_ok() {
 
@@ -497,175 +502,171 @@ fn alternator_2() {
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(PDL, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1536
 
fn composite_chain_a() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"SSS";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1536
 
fn composite_chain_b() {
 
    // Check if composition works. Forward messages through long chains
 
    /*
 
    Alice -->sync-->sync-->A|P-->sync-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    static MSG: &[u8] = b"SSS";
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, MSG.to_vec()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"sync_2").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(MSG), x.read_gotten(0));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// PANIC TODO: eval::1605
 
fn exchange() {
 
    /*
 
        /-->forward-->P|A-->forward-->\
 
    Alice                             Bob
 
        \<--forward<--P|A<--forward<--/
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(2, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(3, Passive(addrs[1])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, b"A->B".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(0));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"forward_pair").unwrap();
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Native).unwrap(); // native in
 
            x.bind_port(1, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(2, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.bind_port(1, Native).unwrap(); // native out
 
            x.bind_port(2, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(3, Active(addrs[0])).unwrap(); // peer in
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                x.put(0, b"B->A".to_vec()).unwrap();
 
                x.get(1).unwrap();
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(0));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn filter_messages() {
 
    // Make a protocol whose behavior depends on the contents of messages
 
    // Getter prohibits the receipt of messages of the form [0, ...].
 
    // those messages are silent
 
    /*
 
    Sender -->forward-->P|A-->forward_nonzero--> Receiver
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PDL, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for i in (0..3).cycle().take(N) {
 
                let msg = vec![i as u8]; // messages [0], [1], [2], [0], [1] ...
 
                                         // batches: [{0=>*}, {0=>?}]
 
                x.next_batch().unwrap();
 
                x.put(0, msg.clone()).unwrap();
0 comments (0 inline, 0 general)