From ef64fd0cdb07c254428f8bd4ee9acf7a5858efe5 2020-08-28 16:38:01 From: Christopher Esterhuyse Date: 2020-08-28 16:38:01 Subject: [PATCH] correctly merging protocol component branches: forks that create existing predicates MERGE the results, combining their inboxes s.t. there is no race condition on the order of received messages --- diff --git a/examples/README.md b/examples/README.md index e5566219bbd73b2721400e85bf191f919722ba3d..cfb55b833d773498efb7e4a2d802feaea7eba2e0 100644 --- a/examples/README.md +++ b/examples/README.md @@ -28,4 +28,6 @@ Examples include interactions whose distributed sessions span multiple source fi ### Interoperability examples The examples contained within directories with names matching `interop_` demonstrate the use of different APIs for communication over UDP channels. The three given programs are intended to be run together, each as its own process. -Each example source file is prefixed by a multi-line comment, explaining what a reader is intended to take away from the example. \ No newline at end of file +Each example source file is prefixed by a multi-line comment, explaining what a reader is intended to take away from the example. + +NOTE: These examples are designed to compile on Linux! 
\ No newline at end of file diff --git a/examples/bench_1/main.c b/examples/bench_1/main.c new file mode 100644 index 0000000000000000000000000000000000000000..7d1a5d43f11425e47879ac6ae716e5449ad4f875 --- /dev/null +++ b/examples/bench_1/main.c @@ -0,0 +1,38 @@ +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./bench_1.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + PortId putter, getter; + FfiSocketAddr local_addr = {{0, 0, 0, 0}, 8000}; + FfiSocketAddr peer_addr = {{8, 8, 8, 1}, 8001}; + rw_err_peek(c); + connector_add_udp_mediator_component(c, &putter, &getter, local_addr, peer_addr); + connector_connect(c, -1); + rw_err_peek(c); + + // Prepare a message to send + size_t msg_len = 16; + char * msg_ptr = malloc(msg_len); + memset(msg_ptr, 42, msg_len); + + int i; + for(i=0; i<10; i++) { + connector_put_bytes(c, putter, msg_ptr, msg_len); + rw_err_peek(c); + + // ... reach new consistent state (timeout argument is -1 here, not a 1000ms deadline). 
+ connector_sync(c, -1); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(msg_ptr); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/src/common.rs b/src/common.rs index 0acde4a19962847266480d54c46e38731c042a40..be3216cc2148d4453dd477faf1831ff740689ae5 100644 --- a/src/common.rs +++ b/src/common.rs @@ -53,7 +53,7 @@ pub struct U32Stream { )] #[repr(transparent)] pub struct PortId(Id); -#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Default, Clone, Ord, PartialOrd)] pub struct Payload(Arc<Vec<u8>>); #[derive( Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize, @@ -89,6 +89,13 @@ pub(crate) enum SyncBlocker { pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); ///////////////////// IMPL ///////////////////// +impl Eq for Payload {} +impl PartialEq for Payload { + /// Payloads are equal exactly when their `as_slice()` views are equal; no logging here, since this runs on every comparison. + fn eq(&self, other: &Self) -> bool { + self.as_slice() == other.as_slice() + } +}
 impl IdParts for Id { fn id_parts(self) -> (ConnectorId, U32Suffix) { (self.connector_id, self.u32_suffix) diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b7a263cde8f75c4dc4d45b8bd0d958de39988d13..8f334e9e53205752c55b7b5b795ae58f5343db4c 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -330,7 +330,13 @@ impl Connector { // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg); + log!( + cu.inner.logger, + "Native branch {} sending msg {:?} with putter {:?}", + index, + &msg, + putter + ); rctx.getter_buffer.putter_add(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -984,7 +990,7 @@ impl BranchingProtoComponent { for (predicate, mut branch) in branches.drain() { if branch.ended { log!(logger, "Skipping ended branch with {:?}", &predicate); - blocked.insert(predicate, branch); + Self::insert_branch_merging(&mut blocked, predicate, branch); continue; } use AssignmentUnionResult as Aur; @@ -993,13 +999,13 @@ impl BranchingProtoComponent { Aur::Nonexistant => { // this branch does not receive the message log!(logger, "skipping branch"); - blocked.insert(predicate, branch); + Self::insert_branch_merging(&mut blocked, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); - unblocked.insert(predicate, branch); + Self::insert_branch_merging(&mut unblocked, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. 
original branch untouched @@ -1007,16 +1013,16 @@ impl BranchingProtoComponent { let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); - blocked.insert(predicate, branch); - unblocked.insert(predicate2, branch2); + Self::insert_branch_merging(&mut blocked, predicate, branch); + Self::insert_branch_merging(&mut unblocked, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); - blocked.insert(predicate, branch); - unblocked.insert(predicate2, branch2); + Self::insert_branch_merging(&mut blocked, predicate, branch); + Self::insert_branch_merging(&mut unblocked, predicate2, branch2); } } } @@ -1036,6 +1042,29 @@ impl BranchingProtoComponent { log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); Ok(()) } + fn insert_branch_merging( + branches: &mut HashMap<Predicate, ProtoComponentBranch>, + predicate: Predicate, + mut branch: ProtoComponentBranch, + ) { + let e = branches.entry(predicate); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + // no existing branch present. We insert it no problem. (The most common case) + ev.insert(branch); + } + Entry::Occupied(mut eo) => { + // Oh dear, there is already a branch with this predicate. + // Rather than choosing either branch, we MERGE them. 
+ // This means keeping the existing one in-place, and giving it the UNION of the inboxes + let old = eo.get_mut(); + for (k, v) in branch.inner.inbox.drain() { + old.inner.inbox.insert(k, v); + } + } + } + } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 6bd4f6d0f8996a6434db3dac426e7b577885a762..66e56fc5deaf489582c0f3a77b2b0bb8400a45d2 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -963,6 +963,7 @@ fn pdl_reo_fifo1() { #[test] fn pdl_reo_fifo1full() { + let test_log_path = Path::new("./logs/pdl_reo_fifo1full"); let pdl = b" primitive fifo1full(in a, out b) { msg m = create(0); @@ -977,7 +978,7 @@ fn pdl_reo_fifo1full() { } "; let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = Connector::new(Box::new(DummyLogger), Arc::new(pd), 0); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); let [_p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); c.add_component(b"fifo1full", &[g0, p1]).unwrap(); @@ -986,3 +987,30 @@ fn pdl_reo_fifo1full() { c.sync(None).unwrap(); assert_eq!(0, c.gotten(g1).unwrap().len()); } + +#[test] +fn pdl_msg_consensus() { + let test_log_path = Path::new("./logs/pdl_msg_consensus"); + let pdl = b" + primitive msgconsensus(in a, in b) { + while(true) synchronous { + msg x = get(a); + msg y = get(b); + assert(x == y); + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"msgconsensus", &[g0, g1]).unwrap(); + c.connect(None).unwrap(); + c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap(); + c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); + c.sync(SEC1).unwrap(); + + c.put(p0, 
Payload::from(b"HEY" as &[_])).unwrap(); + c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); + c.sync(SEC1).unwrap_err(); +}