From a3c92705eeeee95c4f0a59d31cf2b6b07035070e 2020-07-14 11:39:53 From: Christopher Esterhuyse Date: 2020-07-14 11:39:53 Subject: [PATCH] bugfix: native component branch forks that clash are MERGED rather than overwritten. Avoids race condition where (1) branch x ends and submits a solution, (2) branch y is created, has same predicate as x and overwrites it, but has a subset of its messages, (3) round ends in success but branch x is gone, so no suitable native branch is found --- diff --git a/examples/eg_protocols.pdl b/examples/eg_protocols.pdl index 91123c5824cbe9055386b2fb4793a8b70ef16105..a545ea70afb988c8600f4cdf12f68d4e5f8c5749 100644 --- a/examples/eg_protocols.pdl +++ b/examples/eg_protocols.pdl @@ -1,8 +1,13 @@ +primitive pres_2(in i, out o) { + synchronous { + //put(o, get(i)); + } +} primitive together(in ia, in ib, out oa, out ob){ - while(true) synchronous() { + while(true) synchronous { if(fires(ia)) { put(oa, get(ia)); put(ob, get(ib)); } } -} \ No newline at end of file +} diff --git a/examples/pres_1/amy.c b/examples/pres_1/amy.c index 094d3f0765a40b15029345cfc54c03cd70db8a62..c49a14f216856ddbcaa991388b02c88cf466cb45 100644 --- a/examples/pres_1/amy.c +++ b/examples/pres_1/amy.c @@ -1,35 +1,37 @@ + #include "../../reowolf.h" #include "../utility.c" + + int main(int argc, char** argv) { - // Create a connector... + char msgbuf[64]; + // ask user what message to send + size_t msglen = get_user_msg(msgbuf, sizeof(msgbuf)); + + // Create a connector, configured with our (trivial) protocol. Arc_ProtocolDescription * pd = protocol_description_parse("", 0); char logpath[] = "./pres_1_amy.txt"; Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + rw_err_peek(c); // ... with 1 outgoing network connection PortId p0; char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port( - c, &p0, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Active); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_net_port(c, &p0, addr_str, sizeof(addr_str)-1, + Polarity_Putter, EndpointPolarity_Passive); + rw_err_peek(c); - // Connect! Begin communication. 5000ms timeout + // Connect with peers (5000ms timeout). connector_connect(c, 5000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + rw_err_peek(c); - // Ask to receive a message... - connector_get(c, p0); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + // Prepare a message to send + connector_put_bytes(c, p0, msgbuf, msglen); + rw_err_peek(c); - // ... or timeout within 1000ms. + // ... reach new consistent state within 1000ms deadline. connector_sync(c, 1000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); - - // Print the message we received - size_t msg_len; - const char * msg_ptr = connector_gotten_bytes(c, p0, &msg_len); - printf("Got msg `%.*s`\n", msg_len, msg_ptr); + rw_err_peek(c); printf("Exiting\n"); protocol_description_destroy(pd); diff --git a/examples/pres_1/bob.c b/examples/pres_1/bob.c index fcc81fb8a3f2971c1e6604c77bf7217721119bb4..dbaee3b8c850fa1d6ca9f96561091ba36a09280d 100644 --- a/examples/pres_1/bob.c +++ b/examples/pres_1/bob.c @@ -2,31 +2,37 @@ #include "../../reowolf.h" #include "../utility.c" + int main(int argc, char** argv) { - // Create a connector... + // Create a connector, configured with our (trivial) protocol. Arc_ProtocolDescription * pd = protocol_description_parse("", 0); char logpath[] = "./pres_1_bob.txt"; Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + rw_err_peek(c); // ... with 1 outgoing network connection PortId p0; char addr_str[] = "127.0.0.1:8000"; - connector_add_net_port( - c, &p0, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Passive); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_add_net_port(c, &p0, addr_str, sizeof(addr_str)-1, + Polarity_Getter, EndpointPolarity_Active); + rw_err_peek(c); - // Connect (5000ms timeout). Begin communication. + // Connect with peers (5000ms timeout). connector_connect(c, 5000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + rw_err_peek(c); - // Send a single 2-byte message... - connector_put_bytes(c, p0, "hi", 2); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + // Prepare to receive a message. + connector_get(c, p0); + rw_err_peek(c); - // ... and acknowledge receipt within 1000ms. + // ... reach new consistent state within 1000ms deadline. connector_sync(c, 1000); - printf("Error str `%s`\n", reowolf_error_peek(NULL)); + rw_err_peek(c); + + // Read our received message + size_t msg_len; + const char * msg_ptr = connector_gotten_bytes(c, p0, &msg_len); + printf("Got msg `%.*s`\n", msg_len, msg_ptr); printf("Exiting\n"); protocol_description_destroy(pd); diff --git a/examples/pres_2/bob.c b/examples/pres_2/bob.c new file mode 100644 index 0000000000000000000000000000000000000000..686c3fa016698fdb9622eae856799bf0890a84d3 --- /dev/null +++ b/examples/pres_2/bob.c @@ -0,0 +1,46 @@ + +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // Create a connector, configured with a protocol defined in a file + char * pdl = buffer_pdl("./eg_protocols.pdl"); + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl)); + char logpath[] = "./pres_2_bob.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + // ... with 1 outgoing network connection + PortId ports[3]; + char addr_str[] = "127.0.0.1:8000"; + connector_add_net_port(c, &ports[0], addr_str, sizeof(addr_str)-1, + Polarity_Getter, EndpointPolarity_Active); + connector_add_port_pair(c, &ports[1], &ports[2]); + connector_add_component(c, "pres_2", 6, ports, 2); + rw_err_peek(c); + + // Connect with peers (5000ms timeout). + connector_connect(c, 5000); + rw_err_peek(c); + + // Prepare to receive a message. + connector_get(c, ports[2]); + rw_err_peek(c); + + // ... reach new consistent state within 1000ms deadline. + connector_sync(c, 1000); + rw_err_peek(c); + + // Read our received message + size_t msg_len; + const char * msg_ptr = connector_gotten_bytes(c, ports[2], &msg_len); + printf("Got msg `%.*s`\n", msg_len, msg_ptr); + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(pdl); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/pres_3/amy.c b/examples/pres_3/amy.c new file mode 100644 index 0000000000000000000000000000000000000000..2585e1c9a36f14f75eba56ba722c5e1a4a82a84a --- /dev/null +++ b/examples/pres_3/amy.c @@ -0,0 +1,49 @@ + +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // Create a connector, configured with our (trivial) protocol. + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./pres_3_amy.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + // ... with 2 outgoing network connections + PortId ports[2]; + char * addr = "127.0.0.1:8000"; + connector_add_net_port(c, &ports[0], addr, strlen(addr), + Polarity_Putter, EndpointPolarity_Passive); + rw_err_peek(c); + addr = "127.0.0.1:8001"; + connector_add_net_port(c, &ports[1], addr, strlen(addr), + Polarity_Putter, EndpointPolarity_Passive); + rw_err_peek(c); + + // Connect with peers (5000ms timeout). + connector_connect(c, 5000); + rw_err_peek(c); + + printf("\nputting {A}...\n"); + connector_put_bytes(c, ports[0], "A", 1); + connector_sync(c, 1000); + rw_err_peek(c); + + printf("\nputting {B}...\n"); + connector_put_bytes(c, ports[1], "B", 1); + connector_sync(c, 1000); + rw_err_peek(c); + + printf("\nputting {A, B}...\n"); + connector_put_bytes(c, ports[0], "A", 1); + connector_put_bytes(c, ports[1], "B", 1); + connector_sync(c, 1000); + rw_err_peek(c); + + printf("\nExiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/pres_3/bob.c b/examples/pres_3/bob.c new file mode 100644 index 0000000000000000000000000000000000000000..4203808777b30fa8269c27b5836815f28a4b9d3b --- /dev/null +++ b/examples/pres_3/bob.c @@ -0,0 +1,43 @@ + +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // Create a connector, configured with our (trivial) protocol. + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./pres_3_bob.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + // ... with 2 outgoing network connections + PortId ports[2]; + char * addr = "127.0.0.1:8000"; + connector_add_net_port(c, &ports[0], addr, strlen(addr), + Polarity_Getter, EndpointPolarity_Active); + rw_err_peek(c); + addr = "127.0.0.1:8001"; + connector_add_net_port(c, &ports[1], addr, strlen(addr), + Polarity_Getter, EndpointPolarity_Active); + rw_err_peek(c); + + // Connect with peers (5000ms timeout). + connector_connect(c, 5000); + rw_err_peek(c); + + for(int i=0; i<3; i++) { + printf("\nGetting from both...\n"); + connector_get(c, ports[0]); + rw_err_peek(c); + connector_get(c, ports[1]); + rw_err_peek(c); + connector_sync(c, 1000); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/pres_4/bob.c b/examples/pres_4/bob.c new file mode 100644 index 0000000000000000000000000000000000000000..3bfad00cabdb4434bcdb5b86fb34465c3c22fe5d --- /dev/null +++ b/examples/pres_4/bob.c @@ -0,0 +1,53 @@ + +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // Create a connector, configured with our (trivial) protocol. + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./pres_3_bob.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + // ... with 2 outgoing network connections + PortId ports[2]; + char * addr = "127.0.0.1:8000"; + connector_add_net_port(c, &ports[0], addr, strlen(addr), + Polarity_Getter, EndpointPolarity_Active); + rw_err_peek(c); + addr = "127.0.0.1:8001"; + connector_add_net_port(c, &ports[1], addr, strlen(addr), + Polarity_Getter, EndpointPolarity_Active); + rw_err_peek(c); + + // Connect with peers (5000ms timeout). + connector_connect(c, 5000); + rw_err_peek(c); + + for(int i=0; i<3; i++) { + printf("\nNext round...\n"); + printf("\nOption 0: Get {A}\n"); + connector_get(c, ports[0]); + connector_next_batch(c); + rw_err_peek(c); + + printf("\nOption 1: Get {B}\n"); + connector_get(c, ports[1]); + connector_next_batch(c); + rw_err_peek(c); + + printf("\nOption 2: Get {A, B}\n"); + connector_get(c, ports[0]); + connector_get(c, ports[1]); + int code = connector_sync(c, 1000); + printf("Outcome: %d\n", code); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/utility.c b/examples/utility.c index a43f2b02b6b0acd6a91eb4d8fb60219658e46230..a16e765433806b8f8d150e9d6792d5436523d3f5 100644 --- a/examples/utility.c +++ b/examples/utility.c @@ -3,6 +3,20 @@ #include #include #include +#include "../reowolf.h" + +size_t get_user_msg(char * buf, size_t cap) { + memset(buf, 0, cap); + printf("Insert a msg of max len %d: ", cap); + fgets(buf, cap, stdin); + for(size_t len = 0; len (ConnectorId, U32Suffix); +} pub type ConnectorId = u32; -pub type PortSuffix = u32; +pub type U32Suffix = u32; #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] @@ -39,7 +42,7 @@ pub struct ProtoComponentId(Id); #[repr(C)] pub struct Id { pub(crate) connector_id: ConnectorId, - pub(crate) u32_suffix: PortSuffix, + pub(crate) u32_suffix: U32Suffix, } #[derive(Clone, Debug, Default)] pub struct U32Stream { @@ -86,6 +89,21 @@ pub(crate) enum SyncBlocker { pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); ///////////////////// IMPL ///////////////////// +impl IdParts for Id { + fn id_parts(self) -> (ConnectorId, U32Suffix) { + (self.connector_id, self.u32_suffix) + } +} +impl IdParts for PortId { + fn id_parts(self) -> (ConnectorId, U32Suffix) { + self.0.id_parts() + } +} +impl IdParts for ProtoComponentId { + fn id_parts(self) -> (ConnectorId, U32Suffix) { + self.0.id_parts() + } +} impl U32Stream { pub(crate) fn next(&mut self) -> u32 { if self.next == u32::MAX { @@ -158,12 +176,14 @@ impl From> for Payload { } impl Debug for PortId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix) + let (a, b) = self.id_parts(); + write!(f, "pid{}_{}", a, b) } } impl Debug for ProtoComponentId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix) + let (a, b) = self.id_parts(); + write!(f, "cid{}_{}", a, b) } } impl Debug for Payload { diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 5da3932760a828cd85ded7b00e1b226c124bfadc..c1c11833fbae0265f25ea9ac3732636034a890d4 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -377,6 +377,7 @@ pub unsafe extern "C" fn connector_put_bytes( #[no_mangle] pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> c_int { + StoredError::tl_clear(); match connector.get(port) { Ok(()) => ERR_OK, Err(err) => { @@ -388,6 +389,7 @@ pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) #[no_mangle] pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize { + StoredError::tl_clear(); match connector.next_batch() { Ok(n) => n as isize, Err(err) => { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index ef47ff4bc174db53c47c3e8adf49418dd8174aac..31586d5a65d54f78dc6017ee21bddfd53f0969d1 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -19,7 +19,7 @@ struct BranchingNative { struct NativeBranch { index: usize, gotten: HashMap, - to_get: HashSet, // native branch is ended iff to_get.is_empty() + to_get: HashSet, } #[derive(Debug)] struct SolutionStorage { @@ -248,6 +248,11 @@ impl Connector { .iter() .map(|&index| SubtreeId::NetEndpoint { index }); let subtree_id_iter = n.chain(c).chain(e); + log!( + cu.logger, + "Children in subtree are: {:?}", + subtree_id_iter.clone().collect::>() + ); SolutionStorage::new(subtree_id_iter) }, spec_var_stream: cu.id_manager.new_spec_var_stream(), @@ -296,8 +301,8 @@ impl Connector { log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); rctx.getter_buffer.putter_add(cu, putter, msg); } - if to_get.is_empty() { - // this branch is immediately ready to be part of a solution + let branch = NativeBranch { index, gotten: Default::default(), to_get }; + if branch.is_ended() { log!( cu.logger, "Native submitting solution for batch {} with {:?}", @@ -310,7 +315,6 @@ impl Connector { predicate.clone(), ); } - let branch = NativeBranch { index, gotten: Default::default(), to_get }; if let Some(_) = branching_native.branches.insert(predicate, branch) { // thanks to the native_branch_spec_var, each batch has a distinct predicate unreachable!() @@ -649,6 +653,11 @@ impl Connector { comm.endpoint_manager.send_to_comms(parent, &msg) } } +impl NativeBranch { + fn is_ended(&self) -> bool { + self.to_get.is_empty() + } +} impl BranchingNative { fn feed_msg( &mut self, @@ -667,13 +676,13 @@ impl BranchingNative { // check if this branch expects to receive it let var = cu.port_info.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { + branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); - branch.to_get.remove(&getter); - if branch.to_get.is_empty() { + if branch.is_ended() { log!( cu.logger, - "new native solution with {:?} (to_get.is_empty()) with gotten {:?}", + "new native solution with {:?} is_ended() with gotten {:?}", &predicate, &branch.gotten ); @@ -683,6 +692,13 @@ impl BranchingNative { subtree_id, predicate.clone(), ); + } else { + log!( + cu.logger, + "Fed native {:?} still has to_get {:?}", + &predicate, + &branch.to_get + ); } }; if predicate.query(var) != Some(SpecVal::FIRING) { @@ -692,7 +708,7 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); - finished.insert(predicate, branch); + Self::fold_into(finished, predicate, branch); continue; } use AssignmentUnionResult as Aur; @@ -704,13 +720,13 @@ impl BranchingNative { "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); - finished.insert(predicate, branch); + Self::fold_into(finished, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.logger, "branch pred covers it! Accept the msg"); - finished.insert(predicate, branch); + Self::fold_into(finished, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched @@ -723,8 +739,8 @@ impl BranchingNative { &predicate2, &predicate ); - finished.insert(predicate, branch); - finished.insert(predicate2, branch2); + Self::fold_into(finished, predicate, branch); + Self::fold_into(finished, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched @@ -735,11 +751,33 @@ impl BranchingNative { "new subsuming pred created {:?}. forking and feeding", &predicate2 ); - finished.insert(predicate, branch); - finished.insert(predicate2, branch2); + Self::fold_into(finished, predicate, branch); + Self::fold_into(finished, predicate2, branch2); + } + } + } + } + fn fold_into( + branches: &mut HashMap, + predicate: Predicate, + mut branch: NativeBranch, + ) { + let e = branches.entry(predicate); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + ev.insert(branch); + } + Entry::Occupied(mut eo) => { + let b = eo.get_mut(); + for (k, v) in branch.gotten.drain() { + if b.gotten.insert(k, v).is_none() { + b.to_get.remove(&k); + } } } } + // if let Some(prev) = branches.insert(predicate, branch) } fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { log!( @@ -749,7 +787,14 @@ impl BranchingNative { self.branches.keys() ); for (branch_predicate, branch) in self.branches { - if branch.to_get.is_empty() && branch_predicate.assigns_subset(solution_predicate) { + log!( + logger, + "Considering native branch {:?} with to_get {:?} gotten {:?}", + &branch_predicate, + &branch.to_get, + &branch.gotten + ); + if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten }; diff --git a/src/runtime/logging.rs b/src/runtime/logging.rs index 40fee3e2aa9fc159f3b4e27ce10d9afc8643a3b9..cdbe2b3611125add0c82d5ce7da8990742b8f0a3 100644 --- a/src/runtime/logging.rs +++ b/src/runtime/logging.rs @@ -1,5 +1,11 @@ use super::*; +fn secs_since_unix_epoch() -> f64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|dur| dur.as_secs_f64()) + .unwrap_or(0.) +} impl FileLogger { pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self { Self(connector_id, file) @@ -16,15 +22,16 @@ impl Logger for DummyLogger { None } } + impl Logger for VecLogger { fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> { - let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now()); + let _ = write!(&mut self.1, "CID({}) at {:.6} ", self.0, secs_since_unix_epoch()); Some(self) } } impl Logger for FileLogger { fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> { - let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now()); + let _ = write!(&mut self.1, "CID({}) at {:.6} ", self.0, secs_since_unix_epoch()); Some(&mut self.1) } } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index f31b929d6d20bdb1ea4e4076d88feb613848c9b4..cbcce7fdc9a26443da268cd3de30c6022c456516 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -351,10 +351,12 @@ impl IdManager { } } fn new_spec_var_stream(&self) -> SpecVarStream { - SpecVarStream { - connector_id: self.connector_id, - port_suffix_stream: self.port_suffix_stream.clone(), + let mut port_suffix_stream = self.port_suffix_stream.clone(); + const JUMP_OVER: usize = 100; // Jumping is entirely unnecessary. It's only used to make spec vars easier to spot in logs + for _ in 0..JUMP_OVER { + port_suffix_stream.next(); // throw away an ID } + SpecVarStream { connector_id: self.connector_id, port_suffix_stream } } fn new_port_id(&mut self) -> PortId { Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() @@ -557,7 +559,13 @@ impl Debug for VecSet { } impl Debug for Predicate { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_tuple("Predicate").field(&self.assigned).finish() + struct Assignment<'a>((&'a SpecVar, &'a SpecVal)); + impl Debug for Assignment<'_> { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{:?}={:?}", (self.0).0, (self.0).1) + } + } + f.debug_set().entries(self.assigned.iter().map(Assignment)).finish() } } impl serde::Serialize for SerdeProtocolDescription { @@ -578,9 +586,15 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription { Ok(Self(Arc::new(inner))) } } +impl IdParts for SpecVar { + fn id_parts(self) -> (ConnectorId, U32Suffix) { + self.0.id_parts() + } +} impl Debug for SpecVar { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_tuple("vrID").field(&self.0).finish() + let (a, b) = self.id_parts(); + write!(f, "v{}_{}", a, b) } } impl SpecVal { diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 835cfb7fcd3befba5b3fa9881657b9fcf44a9765..e9ecf89454f5b6ca3920adb8ca5dd9fe2b840de2 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -791,3 +791,46 @@ fn udp_reowolf_swap() { }) .unwrap(); } + +#[test] +fn pres_3() { + let test_log_path = Path::new("./logs/pres_3"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + // "amy" + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + // put {A} and FAIL + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap_err(); + // put {B} and FAIL + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap_err(); + // put {A, B} and SUCCEED + c.put(p0, TEST_MSG.clone()).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + }); + s.spawn(|_| { + // "bob" + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..2 { + // get {A, B} and FAIL + c.get(p0).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap_err(); + } + // get {A, B} and SUCCEED + c.get(p0).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap(); + }); + }) + .unwrap(); +}