From 7f75e1f238365d4294213ecbf402766240dfe8d5 2020-07-14 13:28:54 From: Christopher Esterhuyse Date: 2020-07-14 13:28:54 Subject: [PATCH] bugfix: regression from previous vers: proto components correctly enforce that each of their ports' firing variables are assigned FIRING iff that port put or get during the round --- diff --git a/Cargo.toml b/Cargo.toml index 15f6693b5223cb5daec4c73ef7a4ac8c5ebeea8a..d79e155dc03beb1018665b10cb3c5c9de4b754f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ lazy_static = "1.4.0" crate-type = ["cdylib"] [features] -default = ["ffi", "session_optimization", "ffi_socket_api"] +default = ["ffi", "ffi_socket_api"] # // "session_optimization", ffi = [] # see src/ffi.rs ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"] endpoint_logging = [] # see src/macros.rs diff --git a/examples/eg_protocols.pdl b/examples/eg_protocols.pdl index a545ea70afb988c8600f4cdf12f68d4e5f8c5749..4de6b59a35c1d45e788bee6a58ec0f1a5e9f8c8b 100644 --- a/examples/eg_protocols.pdl +++ b/examples/eg_protocols.pdl @@ -1,6 +1,6 @@ primitive pres_2(in i, out o) { synchronous { - //put(o, get(i)); + put(o, get(i)); } } primitive together(in ia, in ib, out oa, out ob){ @@ -11,3 +11,10 @@ primitive together(in ia, in ib, out oa, out ob){ } } } + +primitive alt_round_merger(in a, in b, out c){ + while(true) { + synchronous{ put(c, get(a)); } + synchronous{ put(c, get(b)); } + } +} diff --git a/examples/pres_5/amy.c b/examples/pres_5/amy.c new file mode 100644 index 0000000000000000000000000000000000000000..36ed98ce49f3dc39280f1450798bab22362f52fc --- /dev/null +++ b/examples/pres_5/amy.c @@ -0,0 +1,49 @@ + +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // Create a connector, configured with our (trivial) protocol. + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./pres_3_amy.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + // ... with 2 outgoing network connections + PortId ports[2]; + char * addr = "127.0.0.1:8000"; + connector_add_net_port(c, &ports[0], addr, strlen(addr), + Polarity_Putter, EndpointPolarity_Passive); + rw_err_peek(c); + addr = "127.0.0.1:8001"; + connector_add_net_port(c, &ports[1], addr, strlen(addr), + Polarity_Putter, EndpointPolarity_Passive); + rw_err_peek(c); + + // Connect with peers (5000ms timeout). + connector_connect(c, 5000); + rw_err_peek(c); + + printf("Round 0. Putting {ports[0]=\"r0p0\", ports[1]=\"r0p1\"}\n"); + connector_put_bytes(c, ports[0], "r0p0", 4); + connector_put_bytes(c, ports[1], "r0p1", 4); + connector_sync(c, 1000); + rw_err_peek(c); + + printf("Round 1. Putting {ports[1]=\"r1p1\"}\n"); + connector_put_bytes(c, ports[1], "r1p1", 4); + connector_sync(c, 1000); + rw_err_peek(c); + + printf("Round 2. Putting {ports[0]=\"r2p0\"}\n"); + connector_put_bytes(c, ports[0], "r2p0", 4); + connector_sync(c, 1000); + rw_err_peek(c); + + printf("\nExiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/pres_5/bob.c b/examples/pres_5/bob.c new file mode 100644 index 0000000000000000000000000000000000000000..26cc9b002756b81d9f7f8b6129ed4f81641fc964 --- /dev/null +++ b/examples/pres_5/bob.c @@ -0,0 +1,49 @@ + +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // Create a connector, configured with a protocol defined in a file + char * pdl = buffer_pdl("./eg_protocols.pdl"); + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl)); + char logpath[] = "./pres_3_bob.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + // ... with 2 outgoing network connections + PortId ports[4]; + char * addr = "127.0.0.1:8000"; + connector_add_net_port(c, &ports[0], addr, strlen(addr), + Polarity_Getter, EndpointPolarity_Active); + rw_err_peek(c); + addr = "127.0.0.1:8001"; + connector_add_net_port(c, &ports[1], addr, strlen(addr), + Polarity_Getter, EndpointPolarity_Active); + connector_add_port_pair(c, &ports[2], &ports[3]); + connector_add_component(c, "alt_round_merger", 16, ports, 3); + rw_err_peek(c); + + // Connect with peers (5000ms timeout). + connector_connect(c, 5000); + rw_err_peek(c); + + for(int round=0; round<3; round++) { + printf("\nRound %d\n", round); + connector_get(c, ports[3]); + rw_err_peek(c); + connector_sync(c, 1000); + rw_err_peek(c); + + size_t msg_len = 0; + const char * msg_ptr = connector_gotten_bytes(c, ports[3], &msg_len); + printf("Got msg `%.*s`\n", msg_len, msg_ptr); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(pdl); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/src/protocol/inputsource.rs b/src/protocol/inputsource.rs index 362a7bb49a7ff89cfa1c6ab7783f5d6716da0f47..a02d58fc97492f36015260b8b8e5332ca3050b24 100644 --- a/src/protocol/inputsource.rs +++ b/src/protocol/inputsource.rs @@ -23,8 +23,8 @@ primitive sync(in i, out o) { } primitive alternator_2(in i, out l, out r) { while(true) { - synchronous() put(l, get(i)); - synchronous() put(r, get(i)); + synchronous() if(fires(i)) put(l, get(i)); + synchronous() if(fires(i)) put(r, get(i)); } } primitive replicator_2(in i, out l, out r) { @@ -36,8 +36,8 @@ primitive replicator_2(in i, out l, out r) { } primitive merger_2(in l, in r, out o) { while(true) synchronous { - if(fires(l)) put(o, get(l)); - else put(o, get(r)); + if(fires(l)) put(o, get(l)); + else if(fires(r)) put(o, get(r)); } }"; diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 31586d5a65d54f78dc6017ee21bddfd53f0969d1..faa4436a0ab12df295bcdc9d94f7b99aaeba378e 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -36,6 +36,7 @@ struct BranchingProtoComponent { } #[derive(Debug, Clone)] struct ProtoComponentBranch { + did_put: HashSet, inbox: HashMap, state: ComponentState, untaken_choice: Option, @@ -185,6 +186,7 @@ impl Connector { ); // 1. run all proto components to Nonsync blockers + // NOTE: original components are immutable until Decision::Success let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = @@ -708,7 +710,7 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); - Self::fold_into(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate, branch); continue; } use AssignmentUnionResult as Aur; @@ -720,13 +722,13 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); - Self::fold_into(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.logger, "branch pred covers it! Accept the msg"); - Self::fold_into(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched @@ -739,8 +741,8 @@ impl BranchingNative { &predicate2, &predicate ); - Self::fold_into(finished, predicate, branch); - Self::fold_into(finished, predicate2, branch2); + Self::insert_branch_merging(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched @@ -751,13 +753,13 @@ impl BranchingNative { "new subsuming pred created {:?}. forking and feeding", &predicate2 ); - Self::fold_into(finished, predicate, branch); - Self::fold_into(finished, predicate2, branch2); + Self::insert_branch_merging(finished, predicate, branch); + Self::insert_branch_merging(finished, predicate2, branch2); } } } } - fn fold_into( + fn insert_branch_merging( branches: &mut HashMap, predicate: Predicate, mut branch: NativeBranch, @@ -766,18 +768,22 @@ impl BranchingNative { use std::collections::hash_map::Entry; match e { Entry::Vacant(ev) => { + // no existing branch present. We insert it no problem. (The most common case) ev.insert(branch); } Entry::Occupied(mut eo) => { - let b = eo.get_mut(); + // Oh dear, there is already a branch with this predicate. + // Rather than choosing either branch, we MERGE them. + // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get + let old = eo.get_mut(); for (k, v) in branch.gotten.drain() { - if b.gotten.insert(k, v).is_none() { - b.to_get.remove(&k); + if old.gotten.insert(k, v).is_none() { + // added a gotten element in `branch` not already in `old` + old.to_get.remove(&k); } } } } - // if let Some(prev) = branches.insert(predicate, branch) } fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { log!( @@ -811,7 +817,7 @@ impl BranchingProtoComponent { proto_component_id: ProtoComponentId, ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { - cd.cylic_drain(|mut predicate, mut branch, mut drainer| { + cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { untaken_choice: &mut branch.untaken_choice, logger: &mut *cu.logger, @@ -839,14 +845,25 @@ impl BranchingProtoComponent { } } B::Inconsistent => { - // branch is inconsistent. throw it away + // EXPLICIT inconsistency drop((predicate, branch)); } B::SyncBlockEnd => { // make concrete all variables - for &port in ports.iter() { - let var = cu.port_info.spec_var_for(port); - predicate.assigned.entry(var).or_insert(SpecVal::SILENT); + for port in ports.iter() { + let var = cu.port_info.spec_var_for(*port); + let should_have_fired = match cu.port_info.polarities.get(port).unwrap() { + Polarity::Getter => branch.inbox.contains_key(port), + Polarity::Putter => branch.did_put.contains(port), + }; + let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); + let did_fire = val == SpecVal::FIRING; + if did_fire != should_have_fired { + log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); + // IMPLICIT inconsistency + drop((predicate, branch)); + return Ok(()); + } } // submit solution for this component let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); @@ -884,6 +901,7 @@ impl BranchingProtoComponent { drop((predicate, branch)); } else { // keep in "unblocked" + branch.did_put.insert(putter); log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); @@ -894,6 +912,16 @@ impl BranchingProtoComponent { Ok(()) }) } + fn branch_merge_func( + mut a: ProtoComponentBranch, + b: &mut ProtoComponentBranch, + ) -> ProtoComponentBranch { + if b.ended && !a.ended { + a.ended = true; + std::mem::swap(&mut a, b); + } + a + } fn feed_msg( &mut self, cu: &mut ConnectorUnphased, @@ -983,6 +1011,7 @@ impl BranchingProtoComponent { fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { let branch = ProtoComponentBranch { inbox: Default::default(), + did_put: Default::default(), state, ended: false, untaken_choice: None, @@ -1123,6 +1152,19 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } + fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { + use std::collections::hash_map::Entry; + let e = self.swap.entry(k); + match e { + Entry::Vacant(ev) => { + ev.insert(v); + } + Entry::Occupied(mut eo) => { + let old = eo.get_mut(); + *old = func(v, old); + } + } + } fn add_output(&mut self, k: K, v: V) { self.output.insert(k, v); } @@ -1185,7 +1227,7 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { ) -> Self { Self { input, inner: CyclicDrainInner { swap, output } } } - fn cylic_drain( + fn cyclic_drain( self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>, ) -> Result<(), E> { diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index e9ecf89454f5b6ca3920adb8ca5dd9fe2b840de2..0ec8e15677a52efb4c8473daec3baf1a21fd5ad2 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -23,11 +23,18 @@ fn next_test_addr() -> SocketAddr { SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into() } fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector { + file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone()) +} +fn file_logged_configured_connector( + connector_id: ConnectorId, + dir_path: &Path, + pd: Arc, +) -> Connector { let _ = std::fs::create_dir(dir_path); // we will check failure soon let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); let file = File::create(path).unwrap(); let file_logger = Box::new(FileLogger::new(connector_id, file)); - Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8) + Connector::new(file_logger, pd, connector_id, 8) } static MINIMAL_PDL: &'static [u8] = b" primitive together(in ia, in ib, out oa, out ob){ @@ -793,8 +800,8 @@ fn udp_reowolf_swap() { } #[test] -fn pres_3() { - let test_log_path = Path::new("./logs/pres_3"); +fn example_pres_3() { + let test_log_path = Path::new("./logs/example_pres_3"); let sock_addrs = [next_test_addr(), next_test_addr()]; scope(|s| { s.spawn(|_| { @@ -834,3 +841,42 @@ fn pres_3() { }) .unwrap(); } + +#[test] +fn ac_not_b() { + let test_log_path = Path::new("./logs/ac_not_b"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + // "amy" + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + + // put both A and B + c.put(p0, TEST_MSG.clone()).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap_err(); + }); + s.spawn(|_| { + // "bob" + let pdl = b" + primitive ac_not_b(in a, in b, out c){ + // forward A to C but keep B silent + synchronous{ put(c, get(a)); } + }"; + let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap()); + let mut c = file_logged_configured_connector(1, test_log_path, pd); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + let [a, b] = c.new_port_pair(); + c.add_component(b"ac_not_b", &[p0, p1, a]).unwrap(); + c.connect(SEC1).unwrap(); + + c.get(b).unwrap(); + c.sync(SEC1).unwrap_err(); + }); + }) + .unwrap(); +}