Changeset - b788ae5c2251
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-02-04 16:35:45
christopheresterhuyse@gmail.com
smaller testg
2 files changed with 50 insertions and 0 deletions:
0 comments (0 inline, 0 general)
src/runtime/actors.rs
Show inline comments
 
@@ -34,99 +34,110 @@ pub(crate) struct BranchP {
 
    pub outbox: HashMap<Key, Payload>,
 
    pub inbox: HashMap<Key, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
//////////////////////////////////////////////////////////////////
 

	
 
impl PolyP {
 
    pub(crate) fn poly_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        let to_run: Vec<_> = self.incomplete.drain().collect();
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn poly_run_these_branches(
 
        &mut self,
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 
        lockprintln!("{:?}: ~ Running branches for PolyP {:?}!", cid, m_ctx.my_subtree_id,);
 
        while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
            use PolyBlocker as Sb;
 
            let blocker = branch.state.sync_run(&mut r_ctx, protocol_description);
 
            lockprintln!(
 
                "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                cid,
 
                r_ctx.m_ctx.my_subtree_id,
 
                &predicate,
 
                &blocker
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    lockprintln!(
 
                        "{:?}: ~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        lockprintln!("{:?}: ~ ... Delay {:?}", cid, m_ctx.my_subtree_id,);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        lockprintln!("{:?}: ~ ... Drop {:?}", cid, m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    // split the branch!
 
                    let branch_f = branch.clone();
 
                    let mut predicate_f = predicate.clone();
 
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
 
                        panic!("OI HANS QUERY FIRST!");
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    // come up with the predicate for this local solution
 
                    let lookup =
 
                        |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 

	
 
                    lockprintln!(
 
                        "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    let num_fired = predicate.iter_matching(true).count();
 
                    let num_msgs =
 
                        branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count();
 
                    match num_fired.cmp(&num_msgs) {
 
                        Ordering::Less => unreachable!(),
 
                        Ordering::Greater => lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            cid,
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        ),
 
                        Ordering::Equal => {
 
                            lockprintln!(
 
                                "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                                cid,
src/test/connector.rs
Show inline comments
 
@@ -8,48 +8,87 @@ use std::thread;
 
use test_generator::test_resources;
 

	
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
#[test_resources("testdata/connector/duo/*.apdl")]
 
fn batch1(resource: &str) {
 
    let a = Path::new(resource);
 
    let b = a.with_extension("bpdl");
 
    let a = fs::read_to_string(a).unwrap();
 
    let b = fs::read_to_string(b).unwrap();
 
    duo(a, b);
 
}
 

	
 
fn duo(one: String, two: String) {
 
    let a = thread::spawn(move || {
 
        let timeout = Duration::from_millis(1_500);
 
        let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()];
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
        x.configure(one.as_bytes()).unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let timeout = Duration::from_millis(1_500);
 
        let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()];
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 });
 
        x.configure(two.as_bytes()).unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[1])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[0])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
 

	
 
#[test]
 
fn incremental() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()];
 
    let a = thread::spawn(move || {
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
            synchronous {
 
                msg m = create(0);
 
                put(a, m);
 
            }
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
            synchronous {
 
                get(a);
 
            }
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
0 comments (0 inline, 0 general)