Changeset - 7f75e1f23836
[Not reviewed]
0 5 2
Christopher Esterhuyse - 5 years ago 2020-07-14 13:28:54
christopher.esterhuyse@gmail.com
bugfix: regression from previous vers: proto components correctly enforce that each of their ports' firing variables are assigned FIRING iff that port put or get during the round
7 files changed with 220 insertions and 27 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -37,7 +37,7 @@ lazy_static = "1.4.0"
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization", "ffi_socket_api"]
 
default = ["ffi", "ffi_socket_api"] # // "session_optimization", 
 
ffi = [] # see src/ffi.rs
 
ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"]
 
endpoint_logging = [] # see src/macros.rs
examples/eg_protocols.pdl
Show inline comments
 
primitive pres_2(in i, out o) {
 
  synchronous {
 
    //put(o, get(i));
 
    put(o, get(i));
 
  }
 
}
 
primitive together(in ia, in ib, out oa, out ob){
 
@@ -11,3 +11,10 @@ primitive together(in ia, in ib, out oa, out ob){
 
    }
 
  }	
 
}
 

	
 
primitive alt_round_merger(in a, in b, out c){
 
  while(true) {
 
    synchronous{ put(c, get(a)); }
 
    synchronous{ put(c, get(b)); }
 
  }	
 
}
examples/pres_5/amy.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_3_amy.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	// ... with 2 outgoing network connections
 
	PortId ports[2];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	printf("Round 0. Putting {ports[0]=\"r0p0\", ports[1]=\"r0p1\"}\n");
 
	connector_put_bytes(c, ports[0], "r0p0", 4);
 
	connector_put_bytes(c, ports[1], "r0p1", 4);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("Round 1. Putting {ports[1]=\"r1p1\"}\n");
 
	connector_put_bytes(c, ports[1], "r1p1", 4);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("Round 2. Putting {ports[0]=\"r2p0\"}\n");
 
	connector_put_bytes(c, ports[0], "r2p0", 4);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("\nExiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/pres_5/bob.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with a protocol defined in a file
 
	char * pdl = buffer_pdl("./eg_protocols.pdl");
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl));
 
	char logpath[] = "./pres_3_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 

	
 
	// ... with 2 outgoing network connections
 
	PortId ports[4];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	connector_add_port_pair(c, &ports[2], &ports[3]);
 
	connector_add_component(c, "alt_round_merger", 16, ports, 3);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	for(int round=0; round<3; round++) {
 
		printf("\nRound %d\n", round);
 
		connector_get(c, ports[3]);
 
		rw_err_peek(c);
 
		connector_sync(c, 1000);
 
		rw_err_peek(c);
 

	
 
		size_t msg_len = 0;
 
		const char * msg_ptr = connector_gotten_bytes(c, ports[3], &msg_len);
 
		printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
src/protocol/inputsource.rs
Show inline comments
 
@@ -23,8 +23,8 @@ primitive sync(in i, out o) {
 
}
 
primitive alternator_2(in i, out l, out r) {
 
    while(true) {
 
        synchronous() put(l, get(i));
 
        synchronous() put(r, get(i));
 
        synchronous() if(fires(i)) put(l, get(i));
 
        synchronous() if(fires(i)) put(r, get(i));
 
    }
 
}
 
primitive replicator_2(in i, out l, out r) {
 
@@ -36,8 +36,8 @@ primitive replicator_2(in i, out l, out r) {
 
}
 
primitive merger_2(in l, in r, out o) {
 
    while(true) synchronous {
 
        if(fires(l)) put(o, get(l));
 
        else         put(o, get(r));
 
        if(fires(l))      put(o, get(l));
 
        else if(fires(r)) put(o, get(r));
 
    }
 
}";
 

	
src/runtime/communication.rs
Show inline comments
 
@@ -36,6 +36,7 @@ struct BranchingProtoComponent {
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    did_put: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
@@ -185,6 +186,7 @@ impl Connector {
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
@@ -708,7 +710,7 @@ impl BranchingNative {
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::fold_into(finished, predicate, branch);
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
@@ -720,13 +722,13 @@ impl BranchingNative {
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
@@ -739,8 +741,8 @@ impl BranchingNative {
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::fold_into(finished, predicate2, branch2);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
@@ -751,13 +753,13 @@ impl BranchingNative {
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::fold_into(finished, predicate2, branch2);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn fold_into(
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, NativeBranch>,
 
        predicate: Predicate,
 
        mut branch: NativeBranch,
 
@@ -766,18 +768,22 @@ impl BranchingNative {
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                let b = eo.get_mut();
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get
 
                let old = eo.get_mut();
 
                for (k, v) in branch.gotten.drain() {
 
                    if b.gotten.insert(k, v).is_none() {
 
                        b.to_get.remove(&k);
 
                    if old.gotten.insert(k, v).is_none() {
 
                        // added a gotten element in `branch` not already in `old`
 
                        old.to_get.remove(&k);
 
                    }
 
                }
 
            }
 
        }
 
        // if let Some(prev) = branches.insert(predicate, branch)
 
    }
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
@@ -811,7 +817,7 @@ impl BranchingProtoComponent {
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                untaken_choice: &mut branch.untaken_choice,
 
                logger: &mut *cu.logger,
 
@@ -839,14 +845,25 @@ impl BranchingProtoComponent {
 
                    }
 
                }
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    // EXPLICIT inconsistency
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                    for port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(*port);
 
                        let should_have_fired = match cu.port_info.polarities.get(port).unwrap() {
 
                            Polarity::Getter => branch.inbox.contains_key(port),
 
                            Polarity::Putter => branch.did_put.contains(port),
 
                        };
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
 
@@ -884,6 +901,7 @@ impl BranchingProtoComponent {
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.did_put.insert(putter);
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
@@ -894,6 +912,16 @@ impl BranchingProtoComponent {
 
            Ok(())
 
        })
 
    }
 
    fn branch_merge_func(
 
        mut a: ProtoComponentBranch,
 
        b: &mut ProtoComponentBranch,
 
    ) -> ProtoComponentBranch {
 
        if b.ended && !a.ended {
 
            a.ended = true;
 
            std::mem::swap(&mut a, b);
 
        }
 
        a
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
@@ -983,6 +1011,7 @@ impl BranchingProtoComponent {
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            did_put: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
@@ -1123,6 +1152,19 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn merge_input_with<F: FnMut(V, &mut V) -> V>(&mut self, k: K, v: V, mut func: F) {
 
        use std::collections::hash_map::Entry;
 
        let e = self.swap.entry(k);
 
        match e {
 
            Entry::Vacant(ev) => {
 
                ev.insert(v);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                let old = eo.get_mut();
 
                *old = func(v, old);
 
            }
 
        }
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
@@ -1185,7 +1227,7 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainInner { swap, output } }
 
    }
 
    fn cylic_drain<E>(
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
src/runtime/tests.rs
Show inline comments
 
@@ -23,11 +23,18 @@ fn next_test_addr() -> SocketAddr {
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone())
 
}
 
fn file_logged_configured_connector(
 
    connector_id: ConnectorId,
 
    dir_path: &Path,
 
    pd: Arc<ProtocolDescription>,
 
) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
    Connector::new(file_logger, pd, connector_id, 8)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
@@ -793,8 +800,8 @@ fn udp_reowolf_swap() {
 
}
 

	
 
#[test]
 
fn pres_3() {
 
    let test_log_path = Path::new("./logs/pres_3");
 
fn example_pres_3() {
 
    let test_log_path = Path::new("./logs/example_pres_3");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
@@ -834,3 +841,42 @@ fn pres_3() {
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn ac_not_b() {
 
    let test_log_path = Path::new("./logs/ac_not_b");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 

	
 
            // put both A and B
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let pdl = b"
 
            primitive ac_not_b(in a, in b, out c){
 
                // forward A to C but keep B silent
 
                synchronous{ put(c, get(a)); }
 
            }";
 
            let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap());
 
            let mut c = file_logged_configured_connector(1, test_log_path, pd);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let [a, b] = c.new_port_pair();
 
            c.add_component(b"ac_not_b", &[p0, p1, a]).unwrap();
 
            c.connect(SEC1).unwrap();
 

	
 
            c.get(b).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)