diff --git a/src/runtime_old/tests.rs b/src/runtime_old/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..b9cf20cd3d9a847e5d489335d4b310605476d26c --- /dev/null +++ b/src/runtime_old/tests.rs @@ -0,0 +1,1491 @@ +use crate as reowolf; +use crossbeam_utils::thread::scope; +use reowolf::{ + error::*, + EndpointPolarity::{Active, Passive}, + Polarity::{Getter, Putter}, + *, +}; +use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration}; +////////////////////////////////////////// +const MS100: Option = Some(Duration::from_millis(100)); +const MS300: Option = Some(Duration::from_millis(300)); +const SEC1: Option = Some(Duration::from_secs(1)); +const SEC5: Option = Some(Duration::from_secs(5)); +const SEC15: Option = Some(Duration::from_secs(15)); +fn next_test_addr() -> SocketAddr { + use std::{ + net::{Ipv4Addr, SocketAddrV4}, + sync::atomic::{AtomicU16, Ordering::SeqCst}, + }; + static TEST_PORT: AtomicU16 = AtomicU16::new(5_000); + let port = TEST_PORT.fetch_add(1, SeqCst); + SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into() +} +fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector { + file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone()) +} +fn file_logged_configured_connector( + connector_id: ConnectorId, + dir_path: &Path, + pd: Arc, +) -> Connector { + let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir"); + let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); + let file = File::create(path).expect("Failed to create log output file!"); + let file_logger = Box::new(FileLogger::new(connector_id, file)); + Connector::new(file_logger, pd, connector_id) +} +static MINIMAL_PDL: &'static [u8] = b" +primitive sync_component(in a, out b) { + while (true) { + sync { + if (fires(a) && fires(b)) { + msg x = get(a); + put(b, x); + } else { + assert(!fires(a) && !fires(b)); + } + } + } +} + +primitive together(in ia, in ib, out oa, out ob){ + while(true) sync { + if(fires(ia)) { + put(oa, get(ia)); + put(ob, get(ib)); + } + } +} +"; +lazy_static::lazy_static! { + static ref MINIMAL_PROTO: Arc = { + Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap()) + }; +} +static TEST_MSG_BYTES: &'static [u8] = b"hello"; +lazy_static::lazy_static! { + static ref TEST_MSG: Payload = { + Payload::from(TEST_MSG_BYTES) + }; +} +fn new_u8_buffer(cap: usize) -> Vec { + let mut v = Vec::with_capacity(cap); + // Safe! len will cover owned bytes in valid state + unsafe { v.set_len(cap) } + v +} +////////////////////////////////////////// + +#[test] +fn basic_connector() { + Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0); +} + +#[test] +fn basic_logged_connector() { + let test_log_path = Path::new("./logs/basic_logged_connector"); + file_logged_connector(0, test_log_path); +} + +#[test] +fn new_port_pair() { + let test_log_path = Path::new("./logs/new_port_pair"); + let mut c = file_logged_connector(0, test_log_path); + let [_, _] = c.new_port_pair(); + let [_, _] = c.new_port_pair(); +} + +#[test] +fn new_sync() { + let test_log_path = Path::new("./logs/new_sync"); + let mut c = file_logged_connector(0, test_log_path); + let [o, i] = c.new_port_pair(); + c.add_component(b"", b"sync_component", &[i, o]).unwrap(); +} + +#[test] +fn new_net_port() { + let test_log_path = Path::new("./logs/new_net_port"); + let mut c = file_logged_connector(0, test_log_path); + let sock_addrs = [next_test_addr()]; + let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); +} + +#[test] +fn trivial_connect() { + let test_log_path = Path::new("./logs/trivial_connect"); + let mut c = file_logged_connector(0, test_log_path); + c.connect(SEC1).unwrap(); +} + +#[test] +fn single_node_connect() { + let test_log_path = Path::new("./logs/single_node_connect"); + let sock_addrs = [next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); +} + +#[test] +fn minimal_net_connect() { + let test_log_path = Path::new("./logs/minimal_net_connect"); + let sock_addrs = [next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn put_no_sync() { + let test_log_path = Path::new("./logs/put_no_sync"); + let mut c = file_logged_connector(0, test_log_path); + let [o, _] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.put(o, TEST_MSG.clone()).unwrap(); +} + +#[test] +fn wrong_polarity_bad() { + let test_log_path = Path::new("./logs/wrong_polarity_bad"); + let mut c = file_logged_connector(0, test_log_path); + let [_, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.put(i, TEST_MSG.clone()).unwrap_err(); +} + +#[test] +fn dup_put_bad() { + let test_log_path = Path::new("./logs/dup_put_bad"); + let mut c = file_logged_connector(0, test_log_path); + let [o, _] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.put(o, TEST_MSG.clone()).unwrap(); + c.put(o, TEST_MSG.clone()).unwrap_err(); +} + +#[test] +fn trivial_sync() { + let test_log_path = Path::new("./logs/trivial_sync"); + let mut c = file_logged_connector(0, test_log_path); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); +} + +#[test] +fn unconnected_gotten_err() { + let test_log_path = Path::new("./logs/unconnected_gotten_err"); + let mut c = file_logged_connector(0, test_log_path); + let [_, i] = c.new_port_pair(); + assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); +} + +#[test] +fn connected_gotten_err_no_round() { + let test_log_path = Path::new("./logs/connected_gotten_err_no_round"); + let mut c = file_logged_connector(0, test_log_path); + let [_, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); +} + +#[test] +fn connected_gotten_err_ungotten() { + let test_log_path = Path::new("./logs/connected_gotten_err_ungotten"); + let mut c = file_logged_connector(0, test_log_path); + let [_, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); + assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); +} + +#[test] +fn native_polarity_checks() { + let test_log_path = Path::new("./logs/native_polarity_checks"); + let mut c = file_logged_connector(0, test_log_path); + let [o, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + // fail... + c.get(o).unwrap_err(); + c.put(i, TEST_MSG.clone()).unwrap_err(); + // succeed.. + c.get(i).unwrap(); + c.put(o, TEST_MSG.clone()).unwrap(); +} + +#[test] +fn native_multiple_gets() { + let test_log_path = Path::new("./logs/native_multiple_gets"); + let mut c = file_logged_connector(0, test_log_path); + let [_, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.get(i).unwrap(); + c.get(i).unwrap_err(); +} + +#[test] +fn next_batch() { + let test_log_path = Path::new("./logs/next_batch"); + let mut c = file_logged_connector(0, test_log_path); + c.next_batch().unwrap_err(); + c.connect(SEC1).unwrap(); + c.next_batch().unwrap(); + c.next_batch().unwrap(); + c.next_batch().unwrap(); +} + +#[test] +fn native_self_msg() { + let test_log_path = Path::new("./logs/native_self_msg"); + let mut c = file_logged_connector(0, test_log_path); + let [o, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.get(i).unwrap(); + c.put(o, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); +} + +#[test] +fn two_natives_msg() { + let test_log_path = Path::new("./logs/two_natives_msg"); + let sock_addrs = [next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); + c.get(g).unwrap(); + c.sync(SEC1).unwrap(); + c.gotten(g).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn trivial_nondet() { + let test_log_path = Path::new("./logs/trivial_nondet"); + let mut c = file_logged_connector(0, test_log_path); + let [_, i] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.get(i).unwrap(); + // getting 0 batch + c.next_batch().unwrap(); + // silent 1 batch + assert_eq!(1, c.sync(SEC1).unwrap()); + c.gotten(i).unwrap_err(); +} + +#[test] +fn connector_pair_nondet() { + let test_log_path = Path::new("./logs/connector_pair_nondet"); + let sock_addrs = [next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); + c.next_batch().unwrap(); + c.get(g).unwrap(); + assert_eq!(1, c.sync(SEC1).unwrap()); + c.gotten(g).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn native_immediately_inconsistent() { + let test_log_path = Path::new("./logs/native_immediately_inconsistent"); + let mut c = file_logged_connector(0, test_log_path); + let [_, g] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.get(g).unwrap(); + c.sync(SEC15).unwrap_err(); +} + +#[test] +fn native_recovers() { + let test_log_path = Path::new("./logs/native_recovers"); + let mut c = file_logged_connector(0, test_log_path); + let [p, g] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.get(g).unwrap(); + c.sync(SEC15).unwrap_err(); + c.put(p, TEST_MSG.clone()).unwrap(); + c.get(g).unwrap(); + c.sync(SEC15).unwrap(); +} + +#[test] +fn cannot_use_moved_ports() { + /* + native p|-->|g sync + */ + let test_log_path = Path::new("./logs/cannot_use_moved_ports"); + let mut c = file_logged_connector(0, test_log_path); + let [p, g] = c.new_port_pair(); + c.add_component(b"", b"sync_component", &[g, p]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p, TEST_MSG.clone()).unwrap_err(); + c.get(g).unwrap_err(); +} + +#[test] +fn sync_sync() { + /* + native p0|-->|g0 sync + g1|<--|p1 + */ + let test_log_path = Path::new("./logs/sync_sync"); + let mut c = file_logged_connector(0, test_log_path); + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"", b"sync_component", &[g0, p1]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g1).unwrap(); + c.sync(SEC1).unwrap(); + c.gotten(g1).unwrap(); +} + +#[test] +fn double_net_connect() { + let test_log_path = Path::new("./logs/double_net_connect"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let [_p, _g] = [ + c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), + c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), + ]; + c.connect(SEC1).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let [_g, _p] = [ + c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), + c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), + ]; + c.connect(SEC1).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn distributed_msg_bounce() { + /* + native[0] | sync 0.p|-->|1.p native[1] + 0.g|<--|1.g + */ + let test_log_path = Path::new("./logs/distributed_msg_bounce"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + /* + native | sync p|--> + | g|<-- + */ + let mut c = file_logged_connector(0, test_log_path); + let [p, g] = [ + c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), + c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), + ]; + c.add_component(b"", b"sync_component", &[g, p]).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); + }); + s.spawn(|_| { + /* + native p|--> + g|<-- + */ + let mut c = file_logged_connector(1, test_log_path); + let [g, p] = [ + c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), + c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), + ]; + c.connect(SEC1).unwrap(); + c.put(p, TEST_MSG.clone()).unwrap(); + c.get(g).unwrap(); + c.sync(SEC1).unwrap(); + c.gotten(g).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn local_timeout() { + let test_log_path = Path::new("./logs/local_timeout"); + let mut c = file_logged_connector(0, test_log_path); + let [_, g] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + c.get(g).unwrap(); + match c.sync(MS300) { + Err(SyncError::RoundFailure) => {} + res => panic!("expeted timeout. but got {:?}", res), + } +} + +#[test] +fn parent_timeout() { + let test_log_path = Path::new("./logs/parent_timeout"); + let sock_addrs = [next_test_addr()]; + scope(|s| { + s.spawn(|_| { + // parent; times out + let mut c = file_logged_connector(999, test_log_path); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(MS300).unwrap_err(); // timeout + }); + s.spawn(|_| { + // child + let mut c = file_logged_connector(000, test_log_path); + let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + c.get(g).unwrap(); // not matched by put + c.sync(None).unwrap_err(); // no timeout + }); + }) + .unwrap(); +} + +#[test] +fn child_timeout() { + let test_log_path = Path::new("./logs/child_timeout"); + let sock_addrs = [next_test_addr()]; + scope(|s| { + s.spawn(|_| { + // child; times out + let mut c = file_logged_connector(000, test_log_path); + let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(MS300).unwrap_err(); // timeout + }); + s.spawn(|_| { + // parent + let mut c = file_logged_connector(999, test_log_path); + let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + c.get(g).unwrap(); // not matched by put + c.sync(None).unwrap_err(); // no timeout + }); + }) + .unwrap(); +} + +#[test] +fn chain_connect() { + let test_log_path = Path::new("./logs/chain_connect"); + let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC5).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(10, test_log_path); + c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); + c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC5).unwrap(); + }); + s.spawn(|_| { + // LEADER + let mut c = file_logged_connector(7, test_log_path); + c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); + c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); + c.connect(SEC5).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(4, test_log_path); + c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); + c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); + c.connect(SEC5).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); + c.connect(SEC5).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn net_self_loop() { + let test_log_path = Path::new("./logs/net_self_loop"); + let sock_addrs = [next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p, TEST_MSG.clone()).unwrap(); + c.get(g).unwrap(); + c.sync(MS300).unwrap(); +} + +#[test] +fn nobody_connects_active() { + let test_log_path = Path::new("./logs/nobody_connects_active"); + let sock_addrs = [next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); + c.connect(Some(Duration::from_secs(5))).unwrap_err(); +} +#[test] +fn nobody_connects_passive() { + let test_log_path = Path::new("./logs/nobody_connects_passive"); + let sock_addrs = [next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + c.connect(Some(Duration::from_secs(5))).unwrap_err(); +} + +#[test] +fn together() { + let test_log_path = Path::new("./logs/together"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let [p0, p1] = c.new_port_pair(); + let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + let [p4, p5] = c.new_port_pair(); + c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p5).unwrap(); + c.sync(MS300).unwrap(); + c.gotten(p5).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let [p0, p1] = c.new_port_pair(); + let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let [p4, p5] = c.new_port_pair(); + c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p5).unwrap(); + c.sync(MS300).unwrap(); + c.gotten(p5).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn native_batch_distinguish() { + let test_log_path = Path::new("./logs/native_batch_distinguish"); + let mut c = file_logged_connector(0, test_log_path); + c.connect(SEC1).unwrap(); + c.next_batch().unwrap(); + c.sync(SEC1).unwrap(); +} + +#[test] +fn multirounds() { + let test_log_path = Path::new("./logs/multirounds"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..10 { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..10 { + c.get(p0).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn multi_recover() { + let test_log_path = Path::new("./logs/multi_recover"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let success_iter = [true, false].iter().copied().cycle().take(10); + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for succeeds in success_iter.clone() { + c.put(p0, TEST_MSG.clone()).unwrap(); + if succeeds { + c.get(p1).unwrap(); + } + let res = c.sync(MS300); + assert_eq!(res.is_ok(), succeeds); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + for succeeds in success_iter.clone() { + c.get(p0).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + let res = c.sync(MS300); + assert_eq!(res.is_ok(), succeeds); + } + }); + }) + .unwrap(); +} + +#[test] +fn udp_self_connect() { + let test_log_path = Path::new("./logs/udp_self_connect"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap(); + c.connect(SEC1).unwrap(); +} + +#[test] +fn solo_udp_put_success() { + let test_log_path = Path::new("./logs/solo_udp_put_success"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(MS300).unwrap(); +} + +#[test] +fn solo_udp_get_fail() { + let test_log_path = Path::new("./logs/solo_udp_get_fail"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let mut c = file_logged_connector(0, test_log_path); + let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.get(p0).unwrap(); + c.sync(MS300).unwrap_err(); +} + +#[ignore] +#[test] +fn reowolf_to_udp() { + let test_log_path = Path::new("./logs/reowolf_to_udp"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let barrier = std::sync::Barrier::new(2); + scope(|s| { + s.spawn(|_| { + barrier.wait(); + // reowolf thread + let mut c = file_logged_connector(0, test_log_path); + let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(MS300).unwrap(); + barrier.wait(); + }); + s.spawn(|_| { + barrier.wait(); + // udp thread + let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); + udp.connect(sock_addrs[0]).unwrap(); + let mut buf = new_u8_buffer(256); + let len = udp.recv(&mut buf).unwrap(); + assert_eq!(TEST_MSG_BYTES, &buf[0..len]); + barrier.wait(); + }); + }) + .unwrap(); +} + +#[ignore] +#[test] +fn udp_to_reowolf() { + let test_log_path = Path::new("./logs/udp_to_reowolf"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let barrier = std::sync::Barrier::new(2); + scope(|s| { + s.spawn(|_| { + barrier.wait(); + // reowolf thread + let mut c = file_logged_connector(0, test_log_path); + let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.get(p0).unwrap(); + c.sync(SEC5).unwrap(); + assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES); + barrier.wait(); + }); + s.spawn(|_| { + barrier.wait(); + // udp thread + let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); + udp.connect(sock_addrs[0]).unwrap(); + for _ in 0..15 { + udp.send(TEST_MSG_BYTES).unwrap(); + std::thread::sleep(MS100.unwrap()); + } + barrier.wait(); + }); + }) + .unwrap(); +} + +#[test] +fn udp_reowolf_swap() { + let test_log_path = Path::new("./logs/udp_reowolf_swap"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let barrier = std::sync::Barrier::new(2); + scope(|s| { + s.spawn(|_| { + barrier.wait(); + // reowolf thread + let mut c = file_logged_connector(0, test_log_path); + let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); + c.connect(SEC1).unwrap(); + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC5).unwrap(); + assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES); + barrier.wait(); + }); + s.spawn(|_| { + barrier.wait(); + // udp thread + let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); + udp.connect(sock_addrs[0]).unwrap(); + let mut buf = new_u8_buffer(256); + for _ in 0..5 { + std::thread::sleep(Duration::from_millis(60)); + udp.send(TEST_MSG_BYTES).unwrap(); + } + let len = udp.recv(&mut buf).unwrap(); + assert_eq!(TEST_MSG_BYTES, &buf[0..len]); + barrier.wait(); + }); + }) + .unwrap(); +} + +#[test] +fn example_pres_3() { + let test_log_path = Path::new("./logs/example_pres_3"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + // "amy" + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + // put {A} and FAIL + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap_err(); + // put {B} and FAIL + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap_err(); + // put {A, B} and SUCCEED + c.put(p0, TEST_MSG.clone()).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + }); + s.spawn(|_| { + // "bob" + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..2 { + // get {A, B} and FAIL + c.get(p0).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap_err(); + } + // get {A, B} and SUCCEED + c.get(p0).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn ac_not_b() { + let test_log_path = Path::new("./logs/ac_not_b"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + // "amy" + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC5).unwrap(); + + // put both A and B + c.put(p0, TEST_MSG.clone()).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap_err(); + }); + s.spawn(|_| { + // "bob" + let pdl = b" + primitive ac_not_b(in a, in b, out c){ + // forward A to C but keep B silent + sync { put(c, get(a)); } + }"; + let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap()); + let mut c = file_logged_configured_connector(1, test_log_path, pd); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + let [a, b] = c.new_port_pair(); + + c.add_component(b"", b"ac_not_b", &[p0, p1, a]).unwrap(); + + c.connect(SEC1).unwrap(); + + c.get(b).unwrap(); + c.sync(SEC1).unwrap_err(); + }); + }) + .unwrap(); +} + +#[test] +fn many_rounds_net() { + let test_log_path = Path::new("./logs/many_rounds_net"); + let sock_addrs = [next_test_addr()]; + const NUM_ROUNDS: usize = 1_000; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..NUM_ROUNDS { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..NUM_ROUNDS { + c.get(p0).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + }) + .unwrap(); +} +#[test] +fn many_rounds_mem() { + let test_log_path = Path::new("./logs/many_rounds_mem"); + const NUM_ROUNDS: usize = 1_000; + let mut c = file_logged_connector(0, test_log_path); + let [p0, p1] = c.new_port_pair(); + c.connect(SEC1).unwrap(); + for _ in 0..NUM_ROUNDS { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap(); + } +} + +#[test] +fn pdl_reo_lossy() { + let pdl = b" + primitive lossy(in a, out b) { + while(true) sync { + msg m = null; + if(fires(a)) { + m = get(a); + if(fires(b)) { + put(b, m); + } + } + } + } + "; + reowolf::ProtocolDescription::parse(pdl).unwrap(); +} + +#[test] +fn pdl_reo_fifo1() { + let pdl = b" + primitive fifo1(in a, out b) { + msg m = null; + while(true) sync { + if(m == null) { + if(fires(a)) m=get(a); + } else { + if(fires(b)) put(b, m); + m = null; + } + } + } + "; + reowolf::ProtocolDescription::parse(pdl).unwrap(); +} + +#[test] +fn pdl_reo_fifo1full() { + let test_log_path = Path::new("./logs/pdl_reo_fifo1full"); + let pdl = b" + primitive fifo1full(in a, out b) { + bool is_set = true; + msg m = create(0); + while(true) sync { + if(!is_set) { + if(fires(a)) m=get(a); + is_set = false; + } else { + if(fires(b)) put(b, m); + is_set = true; + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + let [_p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"", b"fifo1full", &[g0, p1]).unwrap(); + c.connect(None).unwrap(); + c.get(g1).unwrap(); + c.sync(None).unwrap(); + assert_eq!(0, c.gotten(g1).unwrap().len()); +} + +#[test] +fn pdl_msg_consensus() { + let test_log_path = Path::new("./logs/pdl_msg_consensus"); + let pdl = b" + primitive msgconsensus(in a, in b) { + while(true) sync { + msg x = get(a); + msg y = get(b); + assert(x == y); + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"", b"msgconsensus", &[g0, g1]).unwrap(); + c.connect(None).unwrap(); + c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap(); + c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); + c.sync(SEC1).unwrap(); + + c.put(p0, Payload::from(b"HEY" as &[_])).unwrap(); + c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); + c.sync(SEC1).unwrap_err(); +} + +#[test] +fn sequencer3_prim() { + let test_log_path = Path::new("./logs/sequencer3_prim"); + let pdl = b" + primitive sequencer3(out a, out b, out c) { + u32 i = 0; + while(true) sync { + out to = a; + if (i==1) to = b; + else if(i==2) to = c; + if(fires(to)) { + put(to, create(0)); + i = (i + 1)%3; + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let which_of_three = move |c: &mut Connector| { + // setup three sync batches. sync. return which succeeded + c.get(g0).unwrap(); + c.next_batch().unwrap(); + c.get(g1).unwrap(); + c.next_batch().unwrap(); + c.get(g2).unwrap(); + c.sync(None).unwrap() + }; + + const TEST_ROUNDS: usize = 50; + // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] + for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { + // silent round + assert_eq!(0, c.sync(None).unwrap()); + // non silent round + assert_eq!(expected_batch_idx, which_of_three(&mut c)); + } +} + +#[test] +fn sequencer3_comp() { + let test_log_path = Path::new("./logs/sequencer3_comp"); + let pdl = b" + primitive replicator(in a, out b, out c) { + while (true) { + sync { + if (fires(a) && fires(b) && fires(c)) { + msg x = get(a); + put(b, x); + put(c, x); + } else { + assert(!fires(a) && !fires(b) && !fires(c)); + } + } + } + } + primitive fifo1_init(bool has_value, T m, in a, out b) { + while(true) sync { + if(has_value && fires(b)) { + put(b, m); + has_value = false; + } else if (!has_value && fires(a)) { + m = get(a); + has_value = true; + } + } + } + composite fifo1_full(in a, out b) { + new fifo1_init(true, create(0), a, b); + } + composite fifo1(in a, out b) { + new fifo1_init(false, create(0), a, b); + } + composite sequencer3(out a, out b, out c) { + channel d -> e; + channel f -> g; + channel h -> i; + channel j -> k; + channel l -> m; + channel n -> o; + + new fifo1_full(o, d); + new replicator(e, f, a); + new fifo1(g, h); + new replicator(i, j, b); + new fifo1(k, l); + new replicator(m, n, c); + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let which_of_three = move |c: &mut Connector| { + // setup three sync batches. sync. return which succeeded + c.get(g0).unwrap(); + c.next_batch().unwrap(); + c.get(g1).unwrap(); + c.next_batch().unwrap(); + c.get(g2).unwrap(); + c.sync(SEC1).unwrap() + }; + + const TEST_ROUNDS: usize = 50; + // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] + for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { + // silent round + assert_eq!(0, c.sync(SEC1).unwrap()); + // non silent round + assert_eq!(expected_batch_idx, which_of_three(&mut c)); + } +} + +enum XRouterItem { + Silent, + GetA, + GetB, +} +// Hardcoded pseudo-random sequence of round behaviors for the native component +const XROUTER_ITEMS: &[XRouterItem] = { + use XRouterItem::{GetA as A, GetB as B, Silent as S}; + &[ + B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B, + S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A, + A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S, + ] +}; + +#[test] +fn xrouter_prim() { + let test_log_path = Path::new("./logs/xrouter_prim"); + let pdl = b" + primitive xrouter(in a, out b, out c) { + while(true) sync { + if(fires(a)) { + if(fires(b)) put(b, get(a)); + else put(c, get(a)); + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let now = std::time::Instant::now(); + for item in XROUTER_ITEMS.iter() { + match item { + XRouterItem::Silent => {} + XRouterItem::GetA => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g1).unwrap(); + } + XRouterItem::GetB => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g2).unwrap(); + } + } + assert_eq!(0, c.sync(SEC1).unwrap()); + } + println!("PRIM {:?}", now.elapsed()); +} +#[test] +fn xrouter_comp() { + let test_log_path = Path::new("./logs/xrouter_comp"); + let pdl = b" + primitive replicator(in a, out b, out c) { + while (true) { + sync { + if (fires(a) && fires(b) && fires(c)) { + msg x = get(a); + put(b, x); + put(c, x); + } else { + assert(!fires(a) && !fires(b) && !fires(c)); + } + } + } + } + + primitive merger(in a, in b, out c) { + while (true) { + sync { + if (fires(a) && !fires(b) && fires(c)) { + put(c, get(a)); + } else if (!fires(a) && fires(b) && fires(c)) { + put(c, get(b)); + } else { + assert(!fires(a) && !fires(b) && !fires(c)); + } + } + } + } + + primitive lossy(in a, out b) { + while(true) sync { + if(fires(a)) { + auto m = get(a); + if(fires(b)) put(b, m); + } + } + } + primitive sync_drain(in a, in b) { + while(true) sync { + if(fires(a)) { + msg drop_it = get(a); + msg on_the_floor = get(b); + } + } + } + composite xrouter(in a, out b, out c) { + channel d -> e; + channel f -> g; + channel h -> i; + channel j -> k; + channel l -> m; + channel n -> o; + channel p -> q; + channel r -> s; + channel t -> u; + + new replicator(a, d, f); + new replicator(g, t, h); + new lossy(e, l); + new lossy(i, j); + new replicator(m, b, p); + new replicator(k, n, c); + new merger(q, o, r); + new sync_drain(u, s); + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let now = std::time::Instant::now(); + for item in XROUTER_ITEMS.iter() { + match item { + XRouterItem::Silent => {} + XRouterItem::GetA => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g1).unwrap(); + } + XRouterItem::GetB => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g2).unwrap(); + } + } + assert_eq!(0, c.sync(SEC1).unwrap()); + } + println!("COMP {:?}", now.elapsed()); +} + +#[test] +fn count_stream() { + let test_log_path = Path::new("./logs/count_stream"); + let pdl = b" + primitive count_stream(out o) { + msg m = create(1); + m[0] = 0; + while(true) sync { + put(o, m); + m[0] += 1; + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + c.add_component(b"", b"count_stream", &[p0]).unwrap(); + c.connect(None).unwrap(); + + for expecting in 0u8..16 { + c.get(g0).unwrap(); + c.sync(None).unwrap(); + assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); + } +} + +#[test] +fn for_msg_byte() { + let test_log_path = Path::new("./logs/for_msg_byte"); + let pdl = b" + primitive for_msg_byte(out o) { + u8 i = 0; + u32 idx = 0; + while(i<8) { + msg m = create(1); + m[idx] = i; + sync put(o, m); + i += 1; + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + c.add_component(b"", b"for_msg_byte", &[p0]).unwrap(); + c.connect(None).unwrap(); + + for expecting in 0u8..8 { + c.get(g0).unwrap(); + c.sync(None).unwrap(); + assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); + } + c.sync(None).unwrap(); +} + +#[test] +fn eq_causality() { + let test_log_path = Path::new("./logs/eq_causality"); + let pdl = b" + primitive eq(in a, in b, out c) { + msg ma = create(0); + msg mb = create(0); + while(true) sync { + if(fires(a)) { + // b and c also fire! + // left first! + ma = get(a); + put(c, ma); + mb = get(b); + assert(ma == mb); + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + /* + [native]p0-->g0[eq]p1--. + g1 | + ^---------` + */ + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap(); + + /* + V--------. + g2 | + [native]p2-->g3[eq]p3--` + */ + let [p2, g2] = c.new_port_pair(); + let [p3, g3] = c.new_port_pair(); + c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap(); + c.connect(None).unwrap(); + + for _ in 0..4 { + // everything is fine with LEFT FIRST + c.put(p0, TEST_MSG.clone()).unwrap(); + c.sync(MS100).unwrap(); + + // no solution when left is NOT FIRST + c.put(p2, TEST_MSG.clone()).unwrap(); + c.sync(MS100).unwrap_err(); + } +} + +#[test] +fn eq_no_causality() { + let test_log_path = Path::new("./logs/eq_no_causality"); + let pdl = b" + composite eq(in a, in b, out c) { + channel leftfirsto -> leftfirsti; + new eqinner(a, b, c, leftfirsto, leftfirsti); + } + primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) { + msg ma = create(0); + msg mb = create(0); + while(true) sync { + if(fires(a)) { + // b and c also fire! + if(fires(leftfirsti)) { + // left first! DO USE DUMMY + ma = get(a); + put(c, ma); + mb = get(b); + + // using dummy! + put(leftfirsto, ma); + auto drop_it = get(leftfirsti); + } else { + // right first! DON'T USE DUMMY + mb = get(b); + put(c, mb); + ma = get(a); + } + assert(ma == mb); + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + /* + [native]p0-->g0[eq]p1--. + g1 | + ^---------` + */ + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap(); + + /* + V--------. + g2 | + [native]p2-->g3[eq]p3--` + */ + let [p2, g2] = c.new_port_pair(); + let [p3, g3] = c.new_port_pair(); + c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap(); + c.connect(None).unwrap(); + + for _ in 0..32 { + // ok when they send + c.put(p0, TEST_MSG.clone()).unwrap(); + c.put(p2, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + // ok when they don't + c.sync(SEC1).unwrap(); + } +}