diff --git a/src/common.rs b/src/common.rs index 45ca68a770a7b7f52b58a95913cd7dc2ca76023c..a712288779b10a1995443ee22c188860f279c50f 100644 --- a/src/common.rs +++ b/src/common.rs @@ -68,6 +68,13 @@ pub enum Polarity { Putter, // output port (from the perspective of the component) Getter, // input port (from the perspective of the component) } +#[derive( + Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize, +)] +pub enum EndpointPolarity { + Active, // calls connect() + Passive, // calls bind() listen() accept() +} #[derive(Eq, PartialEq, Copy, Clone, Debug)] pub enum AddComponentError { diff --git a/src/lib.rs b/src/lib.rs index 59acba88f8ee82541258ec55af625e45c6f22931..95563b12e61e50e7648faf0da4662131252fb41a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,9 +8,9 @@ mod runtime; // #[cfg(test)] // mod test; -pub use common::{ConnectorId, Polarity, PortId}; +pub use common::{ConnectorId, EndpointPolarity, Polarity, PortId}; pub use protocol::ProtocolDescription; -pub use runtime::{error, Connector, EndpointSetup, FileLogger, VecLogger}; +pub use runtime::{error, Connector, FileLogger, VecLogger}; // #[cfg(feature = "ffi")] // pub use runtime::ffi; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index aba2a21d157df5e53f4bf63d3782347ddac4d5db..901835c3ad34d5be048affc1bca121407f36a163 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -92,7 +92,7 @@ pub struct FileLogger(ConnectorId, std::fs::File); #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, - pub is_active: bool, + pub endpoint_polarity: EndpointPolarity, } #[derive(Debug)] pub struct EndpointExt { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index e5a6c01f91e6d422233f5ff0e4ba21eb171dce01..d52109fd9a713dbce41509d0bdc965f08804bce5 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -32,10 +32,12 @@ impl Connector { pub fn new_net_port( &mut self, polarity: Polarity, - endpoint_setup: EndpointSetup, + sock_addr: SocketAddr, + endpoint_polarity: EndpointPolarity, ) -> Result { match &mut self.phased { ConnectorPhased::Setup { endpoint_setups, .. } => { + let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity }; let p = self.id_manager.new_port_id(); self.native_ports.insert(p); // {polarity, route} known. {peer} unknown. @@ -125,7 +127,7 @@ fn new_endpoint_manager( endpoint_setup: &EndpointSetup, poll: &mut Poll, ) -> Result { - let todo_endpoint = if endpoint_setup.is_active { + let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity { let mut stream = TcpStream::connect(endpoint_setup.sock_addr) .expect("mio::TcpStream connect should not fail!"); poll.registry().register(&mut stream, token, BOTH).unwrap(); diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index acc97855e5ec84124298a318401dc59104188d30..25e12dd835bc4ffb4b474fffc06a0dc880fae9cb 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -2,12 +2,12 @@ use crate as reowolf; use crossbeam_utils::thread::scope; use reowolf::{ error::*, + EndpointPolarity::{Active, Passive}, Polarity::{Getter, Putter}, *, }; -use std::net::SocketAddr; -use std::{sync::Arc, time::Duration}; - +use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration}; +////////////////////////////////////////// fn next_test_addr() -> SocketAddr { use std::{ net::{Ipv4Addr, SocketAddrV4}, @@ -22,61 +22,82 @@ lazy_static::lazy_static! { static ref MINIMAL_PROTO: Arc = { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) }; } +fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector { + let _ = std::fs::create_dir(dir_path); // we will check failure soon + let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); + let file = File::create(path).unwrap(); + let file_logger = Box::new(FileLogger::new(connector_id, file)); + Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8) +} + +////////////////////////////////////////// #[test] -fn simple_connector() { +fn basic_connector() { Connector::new_simple(MINIMAL_PROTO.clone(), 0); } +#[test] +fn basic_logged_connector() { + let test_log_path = Path::new("./logs/basic_logged_connector"); + file_logged_connector(0, test_log_path); +} + #[test] fn new_port_pair() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/new_port_pair"); + let mut c = file_logged_connector(0, test_log_path); let [_, _] = c.new_port_pair(); let [_, _] = c.new_port_pair(); } #[test] fn new_sync() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/new_sync"); + let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); c.add_component(b"sync", &[i, o]).unwrap(); } #[test] fn new_net_port() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/new_net_port"); + let mut c = file_logged_connector(0, test_log_path); let sock_addr = next_test_addr(); - let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); } #[test] fn trivial_connect() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/trivial_connect"); + let mut c = file_logged_connector(0, test_log_path); c.connect(Some(Duration::from_secs(1))).unwrap(); } #[test] fn single_node_connect() { let sock_addr = next_test_addr(); - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let test_log_path = Path::new("./logs/single_node_connect"); + let mut c = file_logged_connector(0, test_log_path); + let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap(); + let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); } #[test] fn multithreaded_connect() { let sock_addr = next_test_addr(); + let test_log_path = Path::new("./logs/multithreaded_connect"); scope(|s| { s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let mut c = file_logged_connector(0, test_log_path); + let _ = c.new_net_port(Getter, sock_addr, Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let mut c = file_logged_connector(1, test_log_path); + let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); }); }) @@ -85,7 +106,8 @@ fn multithreaded_connect() { #[test] fn put_no_sync() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/put_no_sync"); + let mut c = file_logged_connector(0, test_log_path); let [o, _] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap(); @@ -93,7 +115,8 @@ fn put_no_sync() { #[test] fn wrong_polarity_bad() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/wrong_polarity_bad"); + let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(i, (b"hi" as &[_]).into()).unwrap_err(); @@ -101,7 +124,8 @@ fn wrong_polarity_bad() { #[test] fn dup_put_bad() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/dup_put_bad"); + let mut c = file_logged_connector(0, test_log_path); let [o, _] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap(); @@ -110,21 +134,24 @@ fn dup_put_bad() { #[test] fn trivial_sync() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/trivial_sync"); + let mut c = file_logged_connector(0, test_log_path); c.connect(Some(Duration::from_secs(1))).unwrap(); c.sync(Some(Duration::from_secs(1))).unwrap(); } #[test] fn unconnected_gotten_err() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/unconnected_gotten_err"); + let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); } #[test] fn connected_gotten_err_no_round() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/connected_gotten_err_no_round"); + let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); @@ -132,7 +159,8 @@ fn connected_gotten_err_no_round() { #[test] fn connected_gotten_err_ungotten() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/connected_gotten_err_ungotten"); + let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.sync(Some(Duration::from_secs(1))).unwrap(); @@ -141,7 +169,8 @@ fn connected_gotten_err_ungotten() { #[test] fn native_polarity_checks() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/native_polarity_checks"); + let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); // fail... @@ -154,7 +183,8 @@ fn native_polarity_checks() { #[test] fn native_multiple_gets() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/native_multiple_gets"); + let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(i).unwrap(); @@ -163,7 +193,8 @@ fn native_multiple_gets() { #[test] fn next_batch() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/next_batch"); + let mut c = file_logged_connector(0, test_log_path); c.next_batch().unwrap_err(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.next_batch().unwrap(); @@ -173,7 +204,8 @@ fn next_batch() { #[test] fn native_self_msg() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/native_self_msg"); + let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(i).unwrap(); @@ -183,19 +215,20 @@ fn native_self_msg() { #[test] fn two_natives_msg() { + let test_log_path = Path::new("./logs/two_natives_msg"); let sock_addr = next_test_addr(); scope(|s| { s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let mut c = file_logged_connector(0, test_log_path); + let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(g).unwrap(); c.sync(Some(Duration::from_secs(1))).unwrap(); c.gotten(g).unwrap(); }); s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let mut c = file_logged_connector(1, test_log_path); + let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap(); c.sync(Some(Duration::from_secs(1))).unwrap(); @@ -206,7 +239,8 @@ fn two_natives_msg() { #[test] fn trivial_nondet() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/trivial_nondet"); + let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(i).unwrap(); @@ -219,11 +253,12 @@ fn trivial_nondet() { #[test] fn connector_pair_nondet() { + let test_log_path = Path::new("./logs/connector_pair_nondet"); let sock_addr = next_test_addr(); scope(|s| { s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let mut c = file_logged_connector(0, test_log_path); + let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.next_batch().unwrap(); c.get(g).unwrap(); @@ -231,8 +266,8 @@ fn connector_pair_nondet() { c.gotten(g).unwrap(); }); s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let mut c = file_logged_connector(1, test_log_path); + let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap(); c.sync(Some(Duration::from_secs(1))).unwrap(); @@ -243,10 +278,10 @@ fn connector_pair_nondet() { #[test] fn cannot_use_moved_ports() { - /* - native p|-->|g sync - */ - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let test_log_path = Path::new("./logs/cannot_use_moved_ports"); /* + native p|-->|g sync + */ + let mut c = file_logged_connector(0, test_log_path); let [p, g] = c.new_port_pair(); c.add_component(b"sync", &[g, p]).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); @@ -256,11 +291,11 @@ fn cannot_use_moved_ports() { #[test] fn sync_sync() { - /* - native p0|-->|g0 sync - g1|<--|p1 - */ - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let test_log_path = Path::new("./logs/sync_sync"); /* + native p0|-->|g0 sync + g1|<--|p1 + */ + let mut c = file_logged_connector(0, test_log_path); let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); c.add_component(b"sync", &[g0, p1]).unwrap(); @@ -271,39 +306,24 @@ fn sync_sync() { c.gotten(g1).unwrap(); } -fn file_logged_connector(connector_id: ConnectorId, path: &str) -> Connector { - let file = std::fs::File::create(path).unwrap(); - let file_logger = Box::new(FileLogger::new(connector_id, file)); - Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8) -} - #[test] fn double_net_connect() { + let test_log_path = Path::new("./logs/double_net_connect"); let sock_addrs = [next_test_addr(), next_test_addr()]; scope(|s| { s.spawn(|_| { - let mut c = file_logged_connector(0, "./logs/double_net_a.txt"); + let mut c = file_logged_connector(0, test_log_path); let [_p, _g] = [ - c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) - .unwrap(), - c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) - .unwrap(), + c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), + c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; c.connect(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { - let mut c = file_logged_connector(1, "./logs/double_net_b.txt"); + let mut c = file_logged_connector(1, test_log_path); let [_g, _p] = [ - c.new_net_port( - Getter, - EndpointSetup { sock_addr: sock_addrs[0], is_active: false }, - ) - .unwrap(), - c.new_net_port( - Putter, - EndpointSetup { sock_addr: sock_addrs[1], is_active: false }, - ) - .unwrap(), + c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), + c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), ]; c.connect(Some(Duration::from_secs(1))).unwrap(); }); @@ -313,6 +333,7 @@ fn double_net_connect() { #[test] fn distributed_msg_bounce() { + let test_log_path = Path::new("./logs/distributed_msg_bounce"); /* native[0] | sync 0.p|-->|1.p native[1] 0.g|<--|1.g @@ -324,12 +345,10 @@ fn distributed_msg_bounce() { native | sync p|--> | g|<-- */ - let mut c = file_logged_connector(0, "./logs/distributed_msg_bounce_a.txt"); + let mut c = file_logged_connector(0, test_log_path); let [p, g] = [ - c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) - .unwrap(), - c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) - .unwrap(), + c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), + c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; c.add_component(b"sync", &[g, p]).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); @@ -340,18 +359,10 @@ fn distributed_msg_bounce() { native p|--> g|<-- */ - let mut c = file_logged_connector(1, "./logs/distributed_msg_bounce_b.txt"); + let mut c = file_logged_connector(1, test_log_path); let [g, p] = [ - c.new_net_port( - Getter, - EndpointSetup { sock_addr: sock_addrs[0], is_active: false }, - ) - .unwrap(), - c.new_net_port( - Putter, - EndpointSetup { sock_addr: sock_addrs[1], is_active: false }, - ) - .unwrap(), + c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), + c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), ]; c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap(); @@ -365,7 +376,8 @@ fn distributed_msg_bounce() { #[test] fn local_timeout() { - let mut c = file_logged_connector(0, "./logs/local_timeout.txt"); + let test_log_path = Path::new("./logs/local_timeout"); + let mut c = file_logged_connector(0, test_log_path); let [_, g] = c.new_port_pair(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(g).unwrap(); @@ -377,19 +389,20 @@ fn local_timeout() { #[test] fn parent_timeout() { + let test_log_path = Path::new("./logs/parent_timeout"); let sock_addr = next_test_addr(); scope(|s| { s.spawn(|_| { // parent; times out - let mut c = file_logged_connector(999, "./logs/parent_timeout_a.txt"); - let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let mut c = file_logged_connector(999, test_log_path); + let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout }); s.spawn(|_| { // child - let mut c = file_logged_connector(000, "./logs/parent_timeout_b.txt"); - let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let mut c = file_logged_connector(000, test_log_path); + let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout @@ -400,19 +413,20 @@ fn parent_timeout() { #[test] fn child_timeout() { + let test_log_path = Path::new("./logs/child_timeout"); let sock_addr = next_test_addr(); scope(|s| { s.spawn(|_| { // child; times out - let mut c = file_logged_connector(000, "./logs/child_timeout_a.txt"); - let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let mut c = file_logged_connector(000, test_log_path); + let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout }); s.spawn(|_| { // parent - let mut c = file_logged_connector(999, "./logs/child_timeout_b.txt"); - let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let mut c = file_logged_connector(999, test_log_path); + let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout @@ -423,44 +437,38 @@ fn child_timeout() { #[test] fn chain_connect() { + let test_log_path = Path::new("./logs/chain_connect"); let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()]; scope(|s| { s.spawn(|_| { - let mut c = file_logged_connector(0, "./logs/chain_connect_a.txt"); - c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: false }) - .unwrap(); + let mut c = file_logged_connector(0, test_log_path); + c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(10, test_log_path); + c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); + c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + }); + s.spawn(|_| { + // LEADER + let mut c = file_logged_connector(7, test_log_path); + c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); + c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { - let mut c = file_logged_connector(2, "./logs/chain_connect_b.txt"); - c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) - .unwrap(); - c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[1], is_active: false }) - .unwrap(); + let mut c = file_logged_connector(4, test_log_path); + c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); + c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { - let mut c = file_logged_connector(1, "./logs/chain_connect_c.txt"); - c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) - .unwrap(); - // c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[2], is_active: false }) - // .unwrap(); + let mut c = file_logged_connector(1, test_log_path); + c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); c.connect(Some(Duration::from_secs(1))).unwrap(); }); - // s.spawn(|_| { - // let mut c = file_logged_connector(3, "./logs/chain_connect_d.txt"); - // c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[2], is_active: true }) - // .unwrap(); - // c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[3], is_active: false }) - // .unwrap(); - // c.connect(Some(Duration::from_secs(1))).unwrap(); - // }); - // s.spawn(|_| { - // let mut c = file_logged_connector(4, "./logs/chain_connect_e.txt"); - // c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[3], is_active: true }) - // .unwrap(); - // c.connect(Some(Duration::from_secs(1))).unwrap(); - // }); }) .unwrap(); }