diff --git a/src/runtime_old/tests.rs b/src/runtime_old/tests.rs deleted file mode 100644 index b9cf20cd3d9a847e5d489335d4b310605476d26c..0000000000000000000000000000000000000000 --- a/src/runtime_old/tests.rs +++ /dev/null @@ -1,1491 +0,0 @@ -use crate as reowolf; -use crossbeam_utils::thread::scope; -use reowolf::{ - error::*, - EndpointPolarity::{Active, Passive}, - Polarity::{Getter, Putter}, - *, -}; -use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration}; -////////////////////////////////////////// -const MS100: Option = Some(Duration::from_millis(100)); -const MS300: Option = Some(Duration::from_millis(300)); -const SEC1: Option = Some(Duration::from_secs(1)); -const SEC5: Option = Some(Duration::from_secs(5)); -const SEC15: Option = Some(Duration::from_secs(15)); -fn next_test_addr() -> SocketAddr { - use std::{ - net::{Ipv4Addr, SocketAddrV4}, - sync::atomic::{AtomicU16, Ordering::SeqCst}, - }; - static TEST_PORT: AtomicU16 = AtomicU16::new(5_000); - let port = TEST_PORT.fetch_add(1, SeqCst); - SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into() -} -fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector { - file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone()) -} -fn file_logged_configured_connector( - connector_id: ConnectorId, - dir_path: &Path, - pd: Arc, -) -> Connector { - let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir"); - let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); - let file = File::create(path).expect("Failed to create log output file!"); - let file_logger = Box::new(FileLogger::new(connector_id, file)); - Connector::new(file_logger, pd, connector_id) -} -static MINIMAL_PDL: &'static [u8] = b" -primitive sync_component(in a, out b) { - while (true) { - sync { - if (fires(a) && fires(b)) { - msg x = get(a); - put(b, x); - } else { - assert(!fires(a) && !fires(b)); - } - } - } -} - -primitive together(in ia, in ib, out oa, out ob){ - while(true) sync { - if(fires(ia)) { - put(oa, get(ia)); - put(ob, get(ib)); - } - } -} -"; -lazy_static::lazy_static! { - static ref MINIMAL_PROTO: Arc = { - Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap()) - }; -} -static TEST_MSG_BYTES: &'static [u8] = b"hello"; -lazy_static::lazy_static! { - static ref TEST_MSG: Payload = { - Payload::from(TEST_MSG_BYTES) - }; -} -fn new_u8_buffer(cap: usize) -> Vec { - let mut v = Vec::with_capacity(cap); - // Safe! len will cover owned bytes in valid state - unsafe { v.set_len(cap) } - v -} -////////////////////////////////////////// - -#[test] -fn basic_connector() { - Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0); -} - -#[test] -fn basic_logged_connector() { - let test_log_path = Path::new("./logs/basic_logged_connector"); - file_logged_connector(0, test_log_path); -} - -#[test] -fn new_port_pair() { - let test_log_path = Path::new("./logs/new_port_pair"); - let mut c = file_logged_connector(0, test_log_path); - let [_, _] = c.new_port_pair(); - let [_, _] = c.new_port_pair(); -} - -#[test] -fn new_sync() { - let test_log_path = Path::new("./logs/new_sync"); - let mut c = file_logged_connector(0, test_log_path); - let [o, i] = c.new_port_pair(); - c.add_component(b"", b"sync_component", &[i, o]).unwrap(); -} - -#[test] -fn new_net_port() { - let test_log_path = Path::new("./logs/new_net_port"); - let mut c = file_logged_connector(0, test_log_path); - let sock_addrs = [next_test_addr()]; - let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); -} - -#[test] -fn trivial_connect() { - let test_log_path = Path::new("./logs/trivial_connect"); - let mut c = file_logged_connector(0, test_log_path); - c.connect(SEC1).unwrap(); -} - -#[test] -fn single_node_connect() { - let test_log_path = Path::new("./logs/single_node_connect"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); -} - -#[test] -fn minimal_net_connect() { - let test_log_path = Path::new("./logs/minimal_net_connect"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn put_no_sync() { - let test_log_path = Path::new("./logs/put_no_sync"); - let mut c = file_logged_connector(0, test_log_path); - let [o, _] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); -} - -#[test] -fn wrong_polarity_bad() { - let test_log_path = Path::new("./logs/wrong_polarity_bad"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.put(i, TEST_MSG.clone()).unwrap_err(); -} - -#[test] -fn dup_put_bad() { - let test_log_path = Path::new("./logs/dup_put_bad"); - let mut c = file_logged_connector(0, test_log_path); - let [o, _] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap_err(); -} - -#[test] -fn trivial_sync() { - let test_log_path = Path::new("./logs/trivial_sync"); - let mut c = file_logged_connector(0, test_log_path); - c.connect(SEC1).unwrap(); - c.sync(SEC1).unwrap(); -} - -#[test] -fn unconnected_gotten_err() { - let test_log_path = Path::new("./logs/unconnected_gotten_err"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); -} - -#[test] -fn connected_gotten_err_no_round() { - let test_log_path = Path::new("./logs/connected_gotten_err_no_round"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); -} - -#[test] -fn connected_gotten_err_ungotten() { - let test_log_path = Path::new("./logs/connected_gotten_err_ungotten"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.sync(SEC1).unwrap(); - assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); -} - -#[test] -fn native_polarity_checks() { - let test_log_path = Path::new("./logs/native_polarity_checks"); - let mut c = file_logged_connector(0, test_log_path); - let [o, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - // fail... - c.get(o).unwrap_err(); - c.put(i, TEST_MSG.clone()).unwrap_err(); - // succeed.. - c.get(i).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); -} - -#[test] -fn native_multiple_gets() { - let test_log_path = Path::new("./logs/native_multiple_gets"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(i).unwrap(); - c.get(i).unwrap_err(); -} - -#[test] -fn next_batch() { - let test_log_path = Path::new("./logs/next_batch"); - let mut c = file_logged_connector(0, test_log_path); - c.next_batch().unwrap_err(); - c.connect(SEC1).unwrap(); - c.next_batch().unwrap(); - c.next_batch().unwrap(); - c.next_batch().unwrap(); -} - -#[test] -fn native_self_msg() { - let test_log_path = Path::new("./logs/native_self_msg"); - let mut c = file_logged_connector(0, test_log_path); - let [o, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(i).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); -} - -#[test] -fn two_natives_msg() { - let test_log_path = Path::new("./logs/two_natives_msg"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - c.sync(SEC1).unwrap(); - c.gotten(g).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn trivial_nondet() { - let test_log_path = Path::new("./logs/trivial_nondet"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(i).unwrap(); - // getting 0 batch - c.next_batch().unwrap(); - // silent 1 batch - assert_eq!(1, c.sync(SEC1).unwrap()); - c.gotten(i).unwrap_err(); -} - -#[test] -fn connector_pair_nondet() { - let test_log_path = Path::new("./logs/connector_pair_nondet"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.next_batch().unwrap(); - c.get(g).unwrap(); - assert_eq!(1, c.sync(SEC1).unwrap()); - c.gotten(g).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn native_immediately_inconsistent() { - let test_log_path = Path::new("./logs/native_immediately_inconsistent"); - let mut c = file_logged_connector(0, test_log_path); - let [_, g] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - c.sync(SEC15).unwrap_err(); -} - -#[test] -fn native_recovers() { - let test_log_path = Path::new("./logs/native_recovers"); - let mut c = file_logged_connector(0, test_log_path); - let [p, g] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - c.sync(SEC15).unwrap_err(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.get(g).unwrap(); - c.sync(SEC15).unwrap(); -} - -#[test] -fn cannot_use_moved_ports() { - /* - native p|-->|g sync - */ - let test_log_path = Path::new("./logs/cannot_use_moved_ports"); - let mut c = file_logged_connector(0, test_log_path); - let [p, g] = c.new_port_pair(); - c.add_component(b"", b"sync_component", &[g, p]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap_err(); - c.get(g).unwrap_err(); -} - -#[test] -fn sync_sync() { - /* - native p0|-->|g0 sync - g1|<--|p1 - */ - let test_log_path = Path::new("./logs/sync_sync"); - let mut c = file_logged_connector(0, test_log_path); - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"sync_component", &[g0, p1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g1).unwrap(); - c.sync(SEC1).unwrap(); - c.gotten(g1).unwrap(); -} - -#[test] -fn double_net_connect() { - let test_log_path = Path::new("./logs/double_net_connect"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let [_p, _g] = [ - c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), - c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), - ]; - c.connect(SEC1).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let [_g, _p] = [ - c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), - c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), - ]; - c.connect(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn distributed_msg_bounce() { - /* - native[0] | sync 0.p|-->|1.p native[1] - 0.g|<--|1.g - */ - let test_log_path = Path::new("./logs/distributed_msg_bounce"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - /* - native | sync p|--> - | g|<-- - */ - let mut c = file_logged_connector(0, test_log_path); - let [p, g] = [ - c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), - c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), - ]; - c.add_component(b"", b"sync_component", &[g, p]).unwrap(); - c.connect(SEC1).unwrap(); - c.sync(SEC1).unwrap(); - }); - s.spawn(|_| { - /* - native p|--> - g|<-- - */ - let mut c = file_logged_connector(1, test_log_path); - let [g, p] = [ - c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), - c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), - ]; - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.get(g).unwrap(); - c.sync(SEC1).unwrap(); - c.gotten(g).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn local_timeout() { - let test_log_path = Path::new("./logs/local_timeout"); - let mut c = file_logged_connector(0, test_log_path); - let [_, g] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - match c.sync(MS300) { - Err(SyncError::RoundFailure) => {} - res => panic!("expeted timeout. but got {:?}", res), - } -} - -#[test] -fn parent_timeout() { - let test_log_path = Path::new("./logs/parent_timeout"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // parent; times out - let mut c = file_logged_connector(999, test_log_path); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.sync(MS300).unwrap_err(); // timeout - }); - s.spawn(|_| { - // child - let mut c = file_logged_connector(000, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); // not matched by put - c.sync(None).unwrap_err(); // no timeout - }); - }) - .unwrap(); -} - -#[test] -fn child_timeout() { - let test_log_path = Path::new("./logs/child_timeout"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // child; times out - let mut c = file_logged_connector(000, test_log_path); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.sync(MS300).unwrap_err(); // timeout - }); - s.spawn(|_| { - // parent - let mut c = file_logged_connector(999, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); // not matched by put - c.sync(None).unwrap_err(); // no timeout - }); - }) - .unwrap(); -} - -#[test] -fn chain_connect() { - let test_log_path = Path::new("./logs/chain_connect"); - let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(10, test_log_path); - c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - // LEADER - let mut c = file_logged_connector(7, test_log_path); - c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); - c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(4, test_log_path); - c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); - c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); - c.connect(SEC5).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn net_self_loop() { - let test_log_path = Path::new("./logs/net_self_loop"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.get(g).unwrap(); - c.sync(MS300).unwrap(); -} - -#[test] -fn nobody_connects_active() { - let test_log_path = Path::new("./logs/nobody_connects_active"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.connect(Some(Duration::from_secs(5))).unwrap_err(); -} -#[test] -fn nobody_connects_passive() { - let test_log_path = Path::new("./logs/nobody_connects_passive"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(Some(Duration::from_secs(5))).unwrap_err(); -} - -#[test] -fn together() { - let test_log_path = Path::new("./logs/together"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_port_pair(); - let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - let [p4, p5] = c.new_port_pair(); - c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p5).unwrap(); - c.sync(MS300).unwrap(); - c.gotten(p5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let [p0, p1] = c.new_port_pair(); - let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let [p4, p5] = c.new_port_pair(); - c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p5).unwrap(); - c.sync(MS300).unwrap(); - c.gotten(p5).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn native_batch_distinguish() { - let test_log_path = Path::new("./logs/native_batch_distinguish"); - let mut c = file_logged_connector(0, test_log_path); - c.connect(SEC1).unwrap(); - c.next_batch().unwrap(); - c.sync(SEC1).unwrap(); -} - -#[test] -fn multirounds() { - let test_log_path = Path::new("./logs/multirounds"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..10 { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..10 { - c.get(p0).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - }) - .unwrap(); -} - -#[test] -fn multi_recover() { - let test_log_path = Path::new("./logs/multi_recover"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let success_iter = [true, false].iter().copied().cycle().take(10); - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for succeeds in success_iter.clone() { - c.put(p0, TEST_MSG.clone()).unwrap(); - if succeeds { - c.get(p1).unwrap(); - } - let res = c.sync(MS300); - assert_eq!(res.is_ok(), succeeds); - } - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC1).unwrap(); - for succeeds in success_iter.clone() { - c.get(p0).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - let res = c.sync(MS300); - assert_eq!(res.is_ok(), succeeds); - } - }); - }) - .unwrap(); -} - -#[test] -fn udp_self_connect() { - let test_log_path = Path::new("./logs/udp_self_connect"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap(); - c.connect(SEC1).unwrap(); -} - -#[test] -fn solo_udp_put_success() { - let test_log_path = Path::new("./logs/solo_udp_put_success"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(MS300).unwrap(); -} - -#[test] -fn solo_udp_get_fail() { - let test_log_path = Path::new("./logs/solo_udp_get_fail"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.get(p0).unwrap(); - c.sync(MS300).unwrap_err(); -} - -#[ignore] -#[test] -fn reowolf_to_udp() { - let test_log_path = Path::new("./logs/reowolf_to_udp"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let barrier = std::sync::Barrier::new(2); - scope(|s| { - s.spawn(|_| { - barrier.wait(); - // reowolf thread - let mut c = file_logged_connector(0, test_log_path); - let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(MS300).unwrap(); - barrier.wait(); - }); - s.spawn(|_| { - barrier.wait(); - // udp thread - let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); - udp.connect(sock_addrs[0]).unwrap(); - let mut buf = new_u8_buffer(256); - let len = udp.recv(&mut buf).unwrap(); - assert_eq!(TEST_MSG_BYTES, &buf[0..len]); - barrier.wait(); - }); - }) - .unwrap(); -} - -#[ignore] -#[test] -fn udp_to_reowolf() { - let test_log_path = Path::new("./logs/udp_to_reowolf"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let barrier = std::sync::Barrier::new(2); - scope(|s| { - s.spawn(|_| { - barrier.wait(); - // reowolf thread - let mut c = file_logged_connector(0, test_log_path); - let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.get(p0).unwrap(); - c.sync(SEC5).unwrap(); - assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES); - barrier.wait(); - }); - s.spawn(|_| { - barrier.wait(); - // udp thread - let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); - udp.connect(sock_addrs[0]).unwrap(); - for _ in 0..15 { - udp.send(TEST_MSG_BYTES).unwrap(); - std::thread::sleep(MS100.unwrap()); - } - barrier.wait(); - }); - }) - .unwrap(); -} - -#[test] -fn udp_reowolf_swap() { - let test_log_path = Path::new("./logs/udp_reowolf_swap"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let barrier = std::sync::Barrier::new(2); - scope(|s| { - s.spawn(|_| { - barrier.wait(); - // reowolf thread - let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC5).unwrap(); - assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES); - barrier.wait(); - }); - s.spawn(|_| { - barrier.wait(); - // udp thread - let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); - udp.connect(sock_addrs[0]).unwrap(); - let mut buf = new_u8_buffer(256); - for _ in 0..5 { - std::thread::sleep(Duration::from_millis(60)); - udp.send(TEST_MSG_BYTES).unwrap(); - } - let len = udp.recv(&mut buf).unwrap(); - assert_eq!(TEST_MSG_BYTES, &buf[0..len]); - barrier.wait(); - }); - }) - .unwrap(); -} - -#[test] -fn example_pres_3() { - let test_log_path = Path::new("./logs/example_pres_3"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // "amy" - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC1).unwrap(); - // put {A} and FAIL - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap_err(); - // put {B} and FAIL - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap_err(); - // put {A, B} and SUCCEED - c.put(p0, TEST_MSG.clone()).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - }); - s.spawn(|_| { - // "bob" - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..2 { - // get {A, B} and FAIL - c.get(p0).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap_err(); - } - // get {A, B} and SUCCEED - c.get(p0).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn ac_not_b() { - let test_log_path = Path::new("./logs/ac_not_b"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // "amy" - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC5).unwrap(); - - // put both A and B - c.put(p0, TEST_MSG.clone()).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap_err(); - }); - s.spawn(|_| { - // "bob" - let pdl = b" - primitive ac_not_b(in a, in b, out c){ - // forward A to C but keep B silent - sync { put(c, get(a)); } - }"; - let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap()); - let mut c = file_logged_configured_connector(1, test_log_path, pd); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - let [a, b] = c.new_port_pair(); - - c.add_component(b"", b"ac_not_b", &[p0, p1, a]).unwrap(); - - c.connect(SEC1).unwrap(); - - c.get(b).unwrap(); - c.sync(SEC1).unwrap_err(); - }); - }) - .unwrap(); -} - -#[test] -fn many_rounds_net() { - let test_log_path = Path::new("./logs/many_rounds_net"); - let sock_addrs = [next_test_addr()]; - const NUM_ROUNDS: usize = 1_000; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..NUM_ROUNDS { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..NUM_ROUNDS { - c.get(p0).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - }) - .unwrap(); -} -#[test] -fn many_rounds_mem() { - let test_log_path = Path::new("./logs/many_rounds_mem"); - const NUM_ROUNDS: usize = 1_000; - let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - for _ in 0..NUM_ROUNDS { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap(); - } -} - -#[test] -fn pdl_reo_lossy() { - let pdl = b" - primitive lossy(in a, out b) { - while(true) sync { - msg m = null; - if(fires(a)) { - m = get(a); - if(fires(b)) { - put(b, m); - } - } - } - } - "; - reowolf::ProtocolDescription::parse(pdl).unwrap(); -} - -#[test] -fn pdl_reo_fifo1() { - let pdl = b" - primitive fifo1(in a, out b) { - msg m = null; - while(true) sync { - if(m == null) { - if(fires(a)) m=get(a); - } else { - if(fires(b)) put(b, m); - m = null; - } - } - } - "; - reowolf::ProtocolDescription::parse(pdl).unwrap(); -} - -#[test] -fn pdl_reo_fifo1full() { - let test_log_path = Path::new("./logs/pdl_reo_fifo1full"); - let pdl = b" - primitive fifo1full(in a, out b) { - bool is_set = true; - msg m = create(0); - while(true) sync { - if(!is_set) { - if(fires(a)) m=get(a); - is_set = false; - } else { - if(fires(b)) put(b, m); - is_set = true; - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - let [_p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"fifo1full", &[g0, p1]).unwrap(); - c.connect(None).unwrap(); - c.get(g1).unwrap(); - c.sync(None).unwrap(); - assert_eq!(0, c.gotten(g1).unwrap().len()); -} - -#[test] -fn pdl_msg_consensus() { - let test_log_path = Path::new("./logs/pdl_msg_consensus"); - let pdl = b" - primitive msgconsensus(in a, in b) { - while(true) sync { - msg x = get(a); - msg y = get(b); - assert(x == y); - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"msgconsensus", &[g0, g1]).unwrap(); - c.connect(None).unwrap(); - c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap(); - c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); - c.sync(SEC1).unwrap(); - - c.put(p0, Payload::from(b"HEY" as &[_])).unwrap(); - c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); - c.sync(SEC1).unwrap_err(); -} - -#[test] -fn sequencer3_prim() { - let test_log_path = Path::new("./logs/sequencer3_prim"); - let pdl = b" - primitive sequencer3(out a, out b, out c) { - u32 i = 0; - while(true) sync { - out to = a; - if (i==1) to = b; - else if(i==2) to = c; - if(fires(to)) { - put(to, create(0)); - i = (i + 1)%3; - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let which_of_three = move |c: &mut Connector| { - // setup three sync batches. sync. return which succeeded - c.get(g0).unwrap(); - c.next_batch().unwrap(); - c.get(g1).unwrap(); - c.next_batch().unwrap(); - c.get(g2).unwrap(); - c.sync(None).unwrap() - }; - - const TEST_ROUNDS: usize = 50; - // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] - for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { - // silent round - assert_eq!(0, c.sync(None).unwrap()); - // non silent round - assert_eq!(expected_batch_idx, which_of_three(&mut c)); - } -} - -#[test] -fn sequencer3_comp() { - let test_log_path = Path::new("./logs/sequencer3_comp"); - let pdl = b" - primitive replicator(in a, out b, out c) { - while (true) { - sync { - if (fires(a) && fires(b) && fires(c)) { - msg x = get(a); - put(b, x); - put(c, x); - } else { - assert(!fires(a) && !fires(b) && !fires(c)); - } - } - } - } - primitive fifo1_init(bool has_value, T m, in a, out b) { - while(true) sync { - if(has_value && fires(b)) { - put(b, m); - has_value = false; - } else if (!has_value && fires(a)) { - m = get(a); - has_value = true; - } - } - } - composite fifo1_full(in a, out b) { - new fifo1_init(true, create(0), a, b); - } - composite fifo1(in a, out b) { - new fifo1_init(false, create(0), a, b); - } - composite sequencer3(out a, out b, out c) { - channel d -> e; - channel f -> g; - channel h -> i; - channel j -> k; - channel l -> m; - channel n -> o; - - new fifo1_full(o, d); - new replicator(e, f, a); - new fifo1(g, h); - new replicator(i, j, b); - new fifo1(k, l); - new replicator(m, n, c); - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let which_of_three = move |c: &mut Connector| { - // setup three sync batches. sync. return which succeeded - c.get(g0).unwrap(); - c.next_batch().unwrap(); - c.get(g1).unwrap(); - c.next_batch().unwrap(); - c.get(g2).unwrap(); - c.sync(SEC1).unwrap() - }; - - const TEST_ROUNDS: usize = 50; - // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] - for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { - // silent round - assert_eq!(0, c.sync(SEC1).unwrap()); - // non silent round - assert_eq!(expected_batch_idx, which_of_three(&mut c)); - } -} - -enum XRouterItem { - Silent, - GetA, - GetB, -} -// Hardcoded pseudo-random sequence of round behaviors for the native component -const XROUTER_ITEMS: &[XRouterItem] = { - use XRouterItem::{GetA as A, GetB as B, Silent as S}; - &[ - B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B, - S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A, - A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S, - ] -}; - -#[test] -fn xrouter_prim() { - let test_log_path = Path::new("./logs/xrouter_prim"); - let pdl = b" - primitive xrouter(in a, out b, out c) { - while(true) sync { - if(fires(a)) { - if(fires(b)) put(b, get(a)); - else put(c, get(a)); - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let now = std::time::Instant::now(); - for item in XROUTER_ITEMS.iter() { - match item { - XRouterItem::Silent => {} - XRouterItem::GetA => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g1).unwrap(); - } - XRouterItem::GetB => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g2).unwrap(); - } - } - assert_eq!(0, c.sync(SEC1).unwrap()); - } - println!("PRIM {:?}", now.elapsed()); -} -#[test] -fn xrouter_comp() { - let test_log_path = Path::new("./logs/xrouter_comp"); - let pdl = b" - primitive replicator(in a, out b, out c) { - while (true) { - sync { - if (fires(a) && fires(b) && fires(c)) { - msg x = get(a); - put(b, x); - put(c, x); - } else { - assert(!fires(a) && !fires(b) && !fires(c)); - } - } - } - } - - primitive merger(in a, in b, out c) { - while (true) { - sync { - if (fires(a) && !fires(b) && fires(c)) { - put(c, get(a)); - } else if (!fires(a) && fires(b) && fires(c)) { - put(c, get(b)); - } else { - assert(!fires(a) && !fires(b) && !fires(c)); - } - } - } - } - - primitive lossy(in a, out b) { - while(true) sync { - if(fires(a)) { - auto m = get(a); - if(fires(b)) put(b, m); - } - } - } - primitive sync_drain(in a, in b) { - while(true) sync { - if(fires(a)) { - msg drop_it = get(a); - msg on_the_floor = get(b); - } - } - } - composite xrouter(in a, out b, out c) { - channel d -> e; - channel f -> g; - channel h -> i; - channel j -> k; - channel l -> m; - channel n -> o; - channel p -> q; - channel r -> s; - channel t -> u; - - new replicator(a, d, f); - new replicator(g, t, h); - new lossy(e, l); - new lossy(i, j); - new replicator(m, b, p); - new replicator(k, n, c); - new merger(q, o, r); - new sync_drain(u, s); - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let now = std::time::Instant::now(); - for item in XROUTER_ITEMS.iter() { - match item { - XRouterItem::Silent => {} - XRouterItem::GetA => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g1).unwrap(); - } - XRouterItem::GetB => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g2).unwrap(); - } - } - assert_eq!(0, c.sync(SEC1).unwrap()); - } - println!("COMP {:?}", now.elapsed()); -} - -#[test] -fn count_stream() { - let test_log_path = Path::new("./logs/count_stream"); - let pdl = b" - primitive count_stream(out o) { - msg m = create(1); - m[0] = 0; - while(true) sync { - put(o, m); - m[0] += 1; - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - c.add_component(b"", b"count_stream", &[p0]).unwrap(); - c.connect(None).unwrap(); - - for expecting in 0u8..16 { - c.get(g0).unwrap(); - c.sync(None).unwrap(); - assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); - } -} - -#[test] -fn for_msg_byte() { - let test_log_path = Path::new("./logs/for_msg_byte"); - let pdl = b" - primitive for_msg_byte(out o) { - u8 i = 0; - u32 idx = 0; - while(i<8) { - msg m = create(1); - m[idx] = i; - sync put(o, m); - i += 1; - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - c.add_component(b"", b"for_msg_byte", &[p0]).unwrap(); - c.connect(None).unwrap(); - - for expecting in 0u8..8 { - c.get(g0).unwrap(); - c.sync(None).unwrap(); - assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); - } - c.sync(None).unwrap(); -} - -#[test] -fn eq_causality() { - let test_log_path = Path::new("./logs/eq_causality"); - let pdl = b" - primitive eq(in a, in b, out c) { - msg ma = create(0); - msg mb = create(0); - while(true) sync { - if(fires(a)) { - // b and c also fire! - // left first! - ma = get(a); - put(c, ma); - mb = get(b); - assert(ma == mb); - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - /* - [native]p0-->g0[eq]p1--. - g1 | - ^---------` - */ - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap(); - - /* - V--------. - g2 | - [native]p2-->g3[eq]p3--` - */ - let [p2, g2] = c.new_port_pair(); - let [p3, g3] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap(); - c.connect(None).unwrap(); - - for _ in 0..4 { - // everything is fine with LEFT FIRST - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(MS100).unwrap(); - - // no solution when left is NOT FIRST - c.put(p2, TEST_MSG.clone()).unwrap(); - c.sync(MS100).unwrap_err(); - } -} - -#[test] -fn eq_no_causality() { - let test_log_path = Path::new("./logs/eq_no_causality"); - let pdl = b" - composite eq(in a, in b, out c) { - channel leftfirsto -> leftfirsti; - new eqinner(a, b, c, leftfirsto, leftfirsti); - } - primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) { - msg ma = create(0); - msg mb = create(0); - while(true) sync { - if(fires(a)) { - // b and c also fire! - if(fires(leftfirsti)) { - // left first! DO USE DUMMY - ma = get(a); - put(c, ma); - mb = get(b); - - // using dummy! - put(leftfirsto, ma); - auto drop_it = get(leftfirsti); - } else { - // right first! DON'T USE DUMMY - mb = get(b); - put(c, mb); - ma = get(a); - } - assert(ma == mb); - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - /* - [native]p0-->g0[eq]p1--. - g1 | - ^---------` - */ - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap(); - - /* - V--------. - g2 | - [native]p2-->g3[eq]p3--` - */ - let [p2, g2] = c.new_port_pair(); - let [p3, g3] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap(); - c.connect(None).unwrap(); - - for _ in 0..32 { - // ok when they send - c.put(p0, TEST_MSG.clone()).unwrap(); - c.put(p2, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - // ok when they don't - c.sync(SEC1).unwrap(); - } -}