diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 111b78c27c30c090af0255c18085ec52122ab3c5..40558cf2521a8f05239f3857864bc5f116a32709 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -8,6 +8,10 @@ use reowolf::{ }; use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration}; ////////////////////////////////////////// +const SEC1: Option = Some(Duration::from_secs(1)); +const SEC5: Option = Some(Duration::from_secs(5)); +const SEC15: Option = Some(Duration::from_secs(15)); +const MS300: Option = Some(Duration::from_millis(300)); fn next_test_addr() -> SocketAddr { use std::{ net::{Ipv4Addr, SocketAddrV4}, @@ -87,7 +91,7 @@ fn new_net_port() { fn trivial_connect() { let test_log_path = Path::new("./logs/trivial_connect"); let mut c = file_logged_connector(0, test_log_path); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); } #[test] @@ -97,7 +101,7 @@ fn single_node_connect() { let mut c = file_logged_connector(0, test_log_path); let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap(); let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); } #[test] @@ -108,12 +112,12 @@ fn minimal_net_connect() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); let _ = c.new_net_port(Getter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); }) .unwrap(); @@ -124,7 +128,7 @@ fn put_no_sync() { let test_log_path = Path::new("./logs/put_no_sync"); let mut c = file_logged_connector(0, test_log_path); let [o, _] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(o, TEST_MSG.clone()).unwrap(); } @@ -133,7 +137,7 @@ fn wrong_polarity_bad() { let test_log_path = Path::new("./logs/wrong_polarity_bad"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(i, TEST_MSG.clone()).unwrap_err(); } @@ -142,7 +146,7 @@ fn dup_put_bad() { let test_log_path = Path::new("./logs/dup_put_bad"); let mut c = file_logged_connector(0, test_log_path); let [o, _] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(o, TEST_MSG.clone()).unwrap(); c.put(o, TEST_MSG.clone()).unwrap_err(); } @@ -151,8 +155,8 @@ fn dup_put_bad() { fn trivial_sync() { let test_log_path = Path::new("./logs/trivial_sync"); let mut c = file_logged_connector(0, test_log_path); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); } #[test] @@ -168,7 +172,7 @@ fn connected_gotten_err_no_round() { let test_log_path = Path::new("./logs/connected_gotten_err_no_round"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); } @@ -177,8 +181,8 @@ fn connected_gotten_err_ungotten() { let test_log_path = Path::new("./logs/connected_gotten_err_ungotten"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); } @@ -187,7 +191,7 @@ fn native_polarity_checks() { let test_log_path = Path::new("./logs/native_polarity_checks"); let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); // fail... c.get(o).unwrap_err(); c.put(i, TEST_MSG.clone()).unwrap_err(); @@ -201,7 +205,7 @@ fn native_multiple_gets() { let test_log_path = Path::new("./logs/native_multiple_gets"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(i).unwrap(); c.get(i).unwrap_err(); } @@ -211,7 +215,7 @@ fn next_batch() { let test_log_path = Path::new("./logs/next_batch"); let mut c = file_logged_connector(0, test_log_path); c.next_batch().unwrap_err(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.next_batch().unwrap(); c.next_batch().unwrap(); c.next_batch().unwrap(); @@ -222,10 +226,10 @@ fn native_self_msg() { let test_log_path = Path::new("./logs/native_self_msg"); let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(i).unwrap(); c.put(o, TEST_MSG.clone()).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); } #[test] @@ -236,17 +240,17 @@ fn two_natives_msg() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); c.gotten(g).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); }); }) .unwrap(); @@ -257,12 +261,12 @@ fn trivial_nondet() { let test_log_path = Path::new("./logs/trivial_nondet"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(i).unwrap(); // getting 0 batch c.next_batch().unwrap(); // silent 1 batch - assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap()); + assert_eq!(1, c.sync(SEC1).unwrap()); c.gotten(i).unwrap_err(); } @@ -274,18 +278,18 @@ fn connector_pair_nondet() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.next_batch().unwrap(); c.get(g).unwrap(); - assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap()); + assert_eq!(1, c.sync(SEC1).unwrap()); c.gotten(g).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); }); }) .unwrap(); @@ -296,9 +300,9 @@ fn native_immediately_inconsistent() { let test_log_path = Path::new("./logs/native_immediately_inconsistent"); let mut c = file_logged_connector(0, test_log_path); let [_, g] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(30))).unwrap_err(); + c.sync(SEC15).unwrap_err(); } #[test] @@ -306,12 +310,12 @@ fn native_recovers() { let test_log_path = Path::new("./logs/native_recovers"); let mut c = file_logged_connector(0, test_log_path); let [p, g] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(30))).unwrap_err(); + c.sync(SEC15).unwrap_err(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(30))).unwrap(); + c.sync(SEC15).unwrap(); } #[test] @@ -323,7 +327,7 @@ fn cannot_use_moved_ports() { let mut c = file_logged_connector(0, test_log_path); let [p, g] = c.new_port_pair(); c.add_component(b"sync", &[g, p]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap_err(); c.get(g).unwrap_err(); } @@ -339,10 +343,10 @@ fn sync_sync() { let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); c.add_component(b"sync", &[g0, p1]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(g1).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); c.gotten(g1).unwrap(); } @@ -357,7 +361,7 @@ fn double_net_connect() { c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); @@ -365,7 +369,7 @@ fn double_net_connect() { c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), ]; - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); }) .unwrap(); @@ -391,8 +395,8 @@ fn distributed_msg_bounce() { c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; c.add_component(b"sync", &[g, p]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); }); s.spawn(|_| { /* @@ -404,10 +408,10 @@ fn distributed_msg_bounce() { c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), ]; - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); c.gotten(g).unwrap(); }); }) @@ -419,9 +423,9 @@ fn local_timeout() { let test_log_path = Path::new("./logs/local_timeout"); let mut c = file_logged_connector(0, test_log_path); let [_, g] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - match c.sync(Some(Duration::from_millis(200))) { + match c.sync(MS300) { Err(SyncError::RoundFailure) => {} res => panic!("expeted timeout. but got {:?}", res), } @@ -436,14 +440,14 @@ fn parent_timeout() { // parent; times out let mut c = file_logged_connector(999, test_log_path); let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout + c.connect(SEC1).unwrap(); + c.sync(MS300).unwrap_err(); // timeout }); s.spawn(|_| { // child let mut c = file_logged_connector(000, test_log_path); let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout }); @@ -460,14 +464,14 @@ fn child_timeout() { // child; times out let mut c = file_logged_connector(000, test_log_path); let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout + c.connect(SEC1).unwrap(); + c.sync(MS300).unwrap_err(); // timeout }); s.spawn(|_| { // parent let mut c = file_logged_connector(999, test_log_path); let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout }); @@ -483,31 +487,31 @@ fn chain_connect() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(10, test_log_path); c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { // LEADER let mut c = file_logged_connector(7, test_log_path); c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(4, test_log_path); c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); }) .unwrap(); @@ -520,10 +524,10 @@ fn net_self_loop() { let mut c = file_logged_connector(0, test_log_path); let p = c.new_net_port(Putter, sock_addr, Active).unwrap(); let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_millis(500))).unwrap(); + c.sync(MS300).unwrap(); } #[test] @@ -555,10 +559,10 @@ fn together() { let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); let [p4, p5] = c.new_port_pair(); c.add_component(b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(p5).unwrap(); - c.sync(Some(Duration::from_millis(500))).unwrap(); + c.sync(MS300).unwrap(); c.gotten(p5).unwrap(); }); s.spawn(|_| { @@ -568,10 +572,10 @@ fn together() { let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); let [p4, p5] = c.new_port_pair(); c.add_component(b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(p5).unwrap(); - c.sync(Some(Duration::from_millis(500))).unwrap(); + c.sync(MS300).unwrap(); c.gotten(p5).unwrap(); }); }) @@ -582,7 +586,74 @@ fn together() { fn native_batch_distinguish() { let test_log_path = Path::new("./logs/native_batch_distinguish"); let mut c = file_logged_connector(0, test_log_path); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.next_batch().unwrap(); - c.sync(Some(Duration::from_secs(3))).unwrap(); + c.sync(SEC1).unwrap(); +} + +#[test] +fn multirounds() { + let test_log_path = Path::new("./logs/multirounds"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..10 { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..10 { + c.get(p0).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn multi_recover() { + let test_log_path = Path::new("./logs/multi_recover"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let success_iter = [true, false].iter().copied().cycle().take(10); + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for succeeds in success_iter.clone() { + c.put(p0, TEST_MSG.clone()).unwrap(); + if succeeds { + c.get(p1).unwrap(); + } + let res = c.sync(MS300); + assert_eq!(res.is_ok(), succeeds); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + for succeeds in success_iter.clone() { + c.get(p0).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + let res = c.sync(MS300); + assert_eq!(res.is_ok(), succeeds); + } + }); + }) + .unwrap(); }