Changeset - ef64fd0cdb07
[Not reviewed]
0 4 1
Christopher Esterhuyse - 5 years ago 2020-08-28 16:38:01
christopher.esterhuyse@gmail.com
correctly merging protocol component branches: forks that create existing predicates MERGE the results, combining their inboxes s.t. there is no race condition on the order of received messages
5 files changed with 117 insertions and 11 deletions:
0 comments (0 inline, 0 general)
examples/README.md
Show inline comments
 
@@ -25,7 +25,9 @@ Examples include interactions whose distributed sessions span multiple source fi
 
2. {pres_3/amy, pres_4/bob}: *Nondeterminism*. Programs/components can express flexibility by providing mutually-exclusive firing patterns on their ports, as a nondeterministic choice. Which (if any) choice occurs can be determined after synchronization by inspecting the return value of `connector_sync`. Atomicity + Nondeterminism = Specialization of behavior.
 
2. {pres_5/amy, pres_5/bob}: When no synchronous interaction is found before some consituent program times out, the system RECOVERS to the synchronous state at the start of the round, allowing components to try again.
 

	
 
### Interoperability examples
 
The examples contained within directories with names matching `interop_` demonstrate the use of different APIs for communication over UDP channels. The three given programs are intended to be run together, each as its own process.
 

	
 
Each example source file is prefixed by a multi-line comment, explaining what a reader is intended to take away from the example.
 
\ No newline at end of file
 
Each example source file is prefixed by a multi-line comment, explaining what a reader is intended to take away from the example.
 

	
 
NOTE: These examples are designed to compile on Linux!
 
\ No newline at end of file
examples/bench_1/main.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./bench_1.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	FfiSocketAddr local_addr = {{0, 0, 0, 0}, 8000};
 
	FfiSocketAddr peer_addr =  {{8, 8, 8, 1}, 8001};
 
	rw_err_peek(c);
 
	connector_add_udp_mediator_component(c, &putter, &getter, local_addr, peer_addr);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		
 
		// ... reach new consistent state within 1000ms deadline.
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
src/common.rs
Show inline comments
 
@@ -50,13 +50,13 @@ pub struct U32Stream {
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
#[derive(Default, Clone, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum Polarity {
 
@@ -86,12 +86,21 @@ pub(crate) enum SyncBlocker {
 
    PutMsg(PortId, Payload),
 
    NondetChoice { n: u16 },
 
}
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl Eq for Payload {}
 
impl PartialEq for Payload {
 
    fn eq(&self, other: &Self) -> bool {
 
        // self.as_slice() == other.as_slice()
 
        let res = self.as_slice() == other.as_slice();
 
        println!("CMP RESULT IS.... {}", res);
 
        res
 
    }
 
}
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
 
    }
 
}
 
impl IdParts for PortId {
src/runtime/communication.rs
Show inline comments
 
@@ -327,13 +327,19 @@ impl Connector {
 
                index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                log!(
 
                    cu.inner.logger,
 
                    "Native branch {} sending msg {:?} with putter {:?}",
 
                    index,
 
                    &msg,
 
                    putter
 
                );
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
@@ -981,45 +987,45 @@ impl BranchingProtoComponent {
 
        let (mut blocked, pcb_temps) = pcb_temps.split_first_mut();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut();
 
@@ -1033,12 +1039,35 @@ impl BranchingProtoComponent {
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, branches);
 
        log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
        mut branch: ProtoComponentBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means keeping the existing one in-place, and giving it the UNION of the inboxes
 
                let old = eo.get_mut();
 
                for (k, v) in branch.inner.inbox.drain() {
 
                    old.inner.inbox.insert(k, v);
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
src/runtime/tests.rs
Show inline comments
 
@@ -960,12 +960,13 @@ fn pdl_reo_fifo1() {
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1full() {
 
    let test_log_path = Path::new("./logs/pdl_reo_fifo1full");
 
    let pdl = b"
 
    primitive fifo1full(in a, out b) {
 
        msg m = create(0);
 
        while(true) synchronous {
 
            if(m == null) {
 
                if(fires(a)) m=get(a);
 
@@ -974,15 +975,42 @@ fn pdl_reo_fifo1full() {
 
                m = null;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = Connector::new(Box::new(DummyLogger), Arc::new(pd), 0);
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [_p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"fifo1full", &[g0, p1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(0, c.gotten(g1).unwrap().len());
 
}
 

	
 
#[test]
 
fn pdl_msg_consensus() {
 
    let test_log_path = Path::new("./logs/pdl_msg_consensus");
 
    let pdl = b"
 
    primitive msgconsensus(in a, in b) {
 
        while(true) synchronous {
 
            msg x = get(a);
 
            msg y = get(b);
 
            assert(x == y);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"msgconsensus", &[g0, g1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap();
 

	
 
    c.put(p0, Payload::from(b"HEY" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap_err();
 
}
0 comments (0 inline, 0 general)