From d1a70dfdafbac99460f40f714c232c461c9a1d77 2020-06-24 08:09:12 From: Christopher Esterhuyse Date: 2020-06-24 08:09:12 Subject: [PATCH] more robust error handling --- diff --git a/.gitignore b/.gitignore index 115454b248fd607444f83c4b227671c3ff5988e7..65d034153c5fe772b0faa387df97a9a5d1daa93a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ target Cargo.lock main examples/*/*.exe -examples/reowolf* \ No newline at end of file +examples/reowolf* +logs \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index a9014653332517454160244edebd54fb3ebb64be..6a544ba2b12d81fd45a96dcadb43feef1823fc9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,9 +8,9 @@ mod runtime; // #[cfg(test)] // mod test; -pub use common::Polarity; +pub use common::{ControllerId, Polarity, PortId}; pub use protocol::ProtocolDescription; -pub use runtime::{error, Connector, EndpointSetup, StringLogger}; +pub use runtime::{error, Connector, EndpointSetup, FileLogger, VecLogger}; // #[cfg(feature = "ffi")] // pub use runtime::ffi; diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 87f4d4f5c6f37d018cfbcf3056c3dc17705d9a79..688dae6221ee07a62d7455a4e9c1ee904d357887 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -101,7 +101,7 @@ impl Connector { } } } - pub fn sync(&mut self, timeout: Duration) -> Result { + pub fn sync(&mut self, timeout: Option) -> Result { use SyncError::*; match &mut self.phased { ConnectorPhased::Setup { .. } => Err(NotConnected), @@ -113,7 +113,7 @@ impl Connector { round_result, .. } => { - let deadline = Instant::now() + timeout; + let mut deadline = timeout.map(|to| Instant::now() + to); let logger: &mut dyn Logger = &mut *self.logger; // 1. run all proto components to Nonsync blockers log!( @@ -347,8 +347,35 @@ impl Connector { // try recv messages arriving through endpoints log!(logger, "No decision yet. Let's recv an endpoint msg..."); { - let (endpoint_index, msg) = - endpoint_manager.try_recv_any(deadline).unwrap(); + let (endpoint_index, msg) = loop { + match endpoint_manager.try_recv_any_comms(deadline)? { + None => { + log!( + logger, + "Reached user-defined deadling without decision..." + ); + if let Some(parent) = neighborhood.parent { + log!( + logger, + "Sending failure request to parent index {}", + parent + ); + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Suggest { + suggestion: Decision::Failure, + }, + }); + endpoint_manager.send_to(parent, &msg).unwrap(); + } else { + log!(logger, "As the leader, deciding on timeout"); + break 'undecided Decision::Failure; + } + deadline = None; + } + Some((endpoint_index, msg)) => break (endpoint_index, msg), + } + }; log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); let comm_msg_contents = match msg { Msg::SetupMsg(..) => { diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 947bbadd2915b8b24b4ff1bafc98fc26844e7297..907d7d3813ed2ddba23ed09dba2ff0848904bb78 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -4,6 +4,12 @@ struct MonitoredReader { bytes: usize, r: R, } +#[derive(Debug)] +enum TryRecyAnyError { + Timeout, + PollFailed, + EndpointError { error: EndpointError, index: usize }, +} ///////////////////// @@ -36,16 +42,48 @@ impl Endpoint { }, } } - pub fn send(&mut self, msg: &T) -> Result<(), ()> { - bincode::serialize_into(&mut self.stream, msg).map_err(drop) + pub fn send(&mut self, msg: &T) -> Result<(), EndpointError> { + bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint) } } impl EndpointManager { - pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { + pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { + let endpoint = &mut self.endpoint_exts[index].endpoint; + endpoint.send(msg).map_err(|err| { + ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err) + }) + } + pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> { self.endpoint_exts[index].endpoint.send(msg) } - pub fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { + pub fn try_recv_any_comms( + &mut self, + deadline: Option, + ) -> Result, SyncError> { + use {SyncError as Se, TryRecyAnyError as Trae}; + match self.try_recv_any(deadline) { + Ok(tup) => Ok(Some(tup)), + Err(Trae::Timeout) => Ok(None), + Err(Trae::PollFailed) => Err(Se::PollFailed), + Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)), + } + } + pub fn try_recv_any_setup( + &mut self, + deadline: Option, + ) -> Result<(usize, Msg), ConnectError> { + use {ConnectError as Ce, TryRecyAnyError as Trae}; + self.try_recv_any(deadline).map_err(|err| match err { + Trae::Timeout => Ce::Timeout, + Trae::PollFailed => Ce::PollFailed, + Trae::EndpointError { error, index } => Ce::EndpointSetupError( + self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(), + error, + ), + }) + } + fn try_recv_any(&mut self, deadline: Option) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { @@ -66,8 +104,12 @@ impl EndpointManager { } } // 3. No message yet. Do we have enough time to poll? - let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; - self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; + let remaining = if let Some(deadline) = deadline { + Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + } else { + None + }; + self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?; for event in self.events.iter() { let Token(index) = event.token(); self.polled_undrained.insert(index); diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 672b4ac8c23aca6fc14545447c72e10cd9f43360..d6621b4db3a53907076ed481ed8a74f085a88f1d 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -1,17 +1,10 @@ use crate::common::*; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum EndpointError { MalformedMessage, BrokenEndpoint, } -#[derive(Debug)] -pub enum TryRecyAnyError { - Timeout, - PollFailed, - EndpointError { error: EndpointError, index: usize }, - BrokenEndpoint(usize), -} #[derive(Debug, Clone)] pub enum SyncError { Timeout, @@ -19,6 +12,8 @@ pub enum SyncError { InconsistentProtoComponent(ProtoComponentId), IndistinguishableBatches([usize; 2]), DistributedTimeout, + PollFailed, + BrokenEndpoint(usize), } #[derive(Debug)] pub enum PortOpError { @@ -38,3 +33,14 @@ pub enum GottenError { pub enum NextBatchError { NotConnected, } +#[derive(Debug)] +pub enum ConnectError { + BindFailed(SocketAddr), + PollInitFailed, + Timeout, + PollFailed, + AcceptFailed(SocketAddr), + AlreadyConnected, + PortPeerPolarityMismatch(PortId), + EndpointSetupError(SocketAddr, EndpointError), +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index a6192caa1e694f193953d0b684cb38e368306f73..ae8e0ba48fab1c5c3dd2d80e17e4664a42a5b37b 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -75,13 +75,14 @@ pub struct ProtoComponent { ports: HashSet, } pub trait Logger: Debug { - fn line_writer(&mut self) -> &mut dyn std::fmt::Write; - fn dump_log(&self, w: &mut dyn std::io::Write); + fn line_writer(&mut self) -> &mut dyn std::io::Write; } #[derive(Debug)] -pub struct StringLogger(ControllerId, String); +pub struct VecLogger(ControllerId, Vec); #[derive(Debug)] pub struct DummyLogger; +#[derive(Debug)] +pub struct FileLogger(ControllerId, std::fs::File); #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, @@ -203,7 +204,19 @@ impl IdManager { .into() } } +impl Drop for Connector { + fn drop(&mut self) { + log!(&mut *self.logger, "Connector dropping. Goodbye!"); + } +} impl Connector { + pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { + std::mem::swap(&mut self.logger, &mut new_logger); + new_logger + } + pub fn get_logger(&mut self) -> &mut dyn Logger { + &mut *self.logger + } pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the native let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; @@ -260,61 +273,55 @@ impl Connector { } } impl Logger for DummyLogger { - fn line_writer(&mut self) -> &mut dyn std::fmt::Write { - impl std::fmt::Write for DummyLogger { - fn write_str(&mut self, _: &str) -> Result<(), std::fmt::Error> { + fn line_writer(&mut self) -> &mut dyn std::io::Write { + impl std::io::Write for DummyLogger { + fn flush(&mut self) -> Result<(), std::io::Error> { Ok(()) } + fn write(&mut self, bytes: &[u8]) -> Result { + Ok(bytes.len()) + } } self } - fn dump_log(&self, _: &mut dyn std::io::Write) {} } -impl StringLogger { +impl VecLogger { pub fn new(controller_id: ControllerId) -> Self { - Self(controller_id, String::default()) + Self(controller_id, Default::default()) } } -impl Drop for StringLogger { +impl Drop for VecLogger { fn drop(&mut self) { let stdout = std::io::stderr(); let mut lock = stdout.lock(); writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); - self.dump_log(&mut lock); - // lock.flush().unwrap(); - // std::thread::sleep(Duration::from_millis(50)); + let _ = std::io::Write::write(&mut lock, self.1.as_slice()); } } -impl Logger for StringLogger { - fn line_writer(&mut self) -> &mut dyn std::fmt::Write { - use std::fmt::Write; +impl Logger for VecLogger { + fn line_writer(&mut self) -> &mut dyn std::io::Write { let _ = write!(&mut self.1, "\nCID({}): ", self.0); self } - fn dump_log(&self, w: &mut dyn std::io::Write) { - let _ = w.write(self.1.as_bytes()); +} +impl FileLogger { + pub fn new(controller_id: ControllerId, file: std::fs::File) -> Self { + Self(controller_id, file) } } -impl std::fmt::Write for StringLogger { - fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> { - self.1.write_str(s) +impl Logger for FileLogger { + fn line_writer(&mut self) -> &mut dyn std::io::Write { + let _ = write!(&mut self.1, "\nCID({}): ", self.0); + &mut self.1 } } -impl Connector { - pub fn get_logger(&self) -> &dyn Logger { - &*self.logger +impl std::io::Write for VecLogger { + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) } - pub fn print_state(&self) { - let stdout = std::io::stdout(); - let mut lock = stdout.lock(); - writeln!( - lock, - "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n", - self.id_manager.controller_id - ) - .unwrap(); - self.get_logger().dump_log(&mut lock); - writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap(); + fn write(&mut self, data: &[u8]) -> Result { + self.1.extend_from_slice(data); + Ok(data.len()) } } impl Predicate { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index f6233c1b4a40ea2bfdd0b6239a8fb31401f46eb1..ae3f1d44608bc347e7930846945eda154243cccf 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -1,14 +1,15 @@ use crate::common::*; use crate::runtime::*; +use std::io::ErrorKind::WouldBlock; impl Connector { pub fn new_simple( proto_description: Arc, controller_id: ControllerId, ) -> Self { - let logger = Box::new(StringLogger::new(controller_id)); + let logger = Box::new(DummyLogger); // let logger = Box::new(DummyLogger); - let surplus_sockets = 8; + let surplus_sockets = 2; Self::new(logger, proto_description, controller_id, surplus_sockets) } pub fn new( @@ -52,15 +53,16 @@ impl Connector { ConnectorPhased::Communication { .. } => Err(()), } } - pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { + pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { + use ConnectError::*; match &mut self.phased { ConnectorPhased::Communication { .. } => { log!(self.logger, "Call to connecting in connected state"); - Err(()) + Err(AlreadyConnected) } ConnectorPhased::Setup { endpoint_setups, .. } => { log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout); - let deadline = Instant::now() + timeout; + let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( &mut *self.logger, @@ -81,6 +83,7 @@ impl Connector { deadline, )?; log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(self.logger, "connect() finished. setup phase complete"); // TODO session optimization goes here self.phased = ConnectorPhased::Communication { round_index: 0, @@ -100,9 +103,10 @@ fn new_endpoint_manager( logger: &mut dyn Logger, endpoint_setups: &[(PortId, EndpointSetup)], port_info: &mut PortInfo, - deadline: Instant, -) -> Result { + deadline: Option, +) -> Result { //////////////////////////////////////////// + use ConnectError::*; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -119,13 +123,15 @@ fn new_endpoint_manager( local_port: PortId, endpoint_setup: &EndpointSetup, poll: &mut Poll, - ) -> Result { + ) -> Result { let todo_endpoint = if endpoint_setup.is_active { - let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?; + let mut stream = TcpStream::connect(endpoint_setup.sock_addr) + .expect("mio::TcpStream connect should not fail!"); poll.registry().register(&mut stream, token, BOTH).unwrap(); TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] }) } else { - let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?; + let mut listener = TcpListener::bind(endpoint_setup.sock_addr) + .map_err(|_| BindFailed(endpoint_setup.sock_addr))?; poll.registry().register(&mut listener, token, BOTH).unwrap(); TodoEndpoint::Listener(listener) }; @@ -134,7 +140,7 @@ fn new_endpoint_manager( //////////////////////////////////////////// // 1. Start to construct EndpointManager - let mut poll = Poll::new().map_err(drop)?; + let mut poll = Poll::new().map_err(|_| PollInitFailed)?; let mut events = Events::with_capacity(64); let mut polled_undrained = IndexSet::::default(); let mut delayed_messages = vec![]; @@ -146,7 +152,7 @@ fn new_endpoint_manager( .map(|(index, (local_port, endpoint_setup))| { init_todo(Token(index), *local_port, endpoint_setup, &mut poll) }) - .collect::, _>>()?; + .collect::, ConnectError>>()?; // 3. Using poll to drive progress: // - accept an incoming connection for each TcpListener (turning them into endpoints too) @@ -154,19 +160,33 @@ fn new_endpoint_manager( // - for each endpoint, recv the peer's PortId, and let mut setup_incomplete: HashSet = (0..todos.len()).collect(); while !setup_incomplete.is_empty() { - let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?; - poll.poll(&mut events, Some(remaining)).map_err(drop)?; + let remaining = if let Some(deadline) = deadline { + Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + } else { + None + }; + poll.poll(&mut events, remaining).map_err(|_| PollFailed)?; for event in events.iter() { let token = event.token(); let Token(index) = token; let todo: &mut Todo = &mut todos[index]; if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint { - let (mut stream, peer_addr) = listener.accept().map_err(drop)?; - poll.registry().deregister(listener).unwrap(); - poll.registry().register(&mut stream, token, BOTH).unwrap(); - log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr); - let endpoint = Endpoint { stream, inbox: vec![] }; - todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); + match listener.accept() { + Ok((mut stream, peer_addr)) => { + poll.registry().deregister(listener).unwrap(); + poll.registry().register(&mut stream, token, BOTH).unwrap(); + log!( + logger, + "Endpoint[{}] accepted a connection from {:?}", + index, + peer_addr + ); + let endpoint = Endpoint { stream, inbox: vec![] }; + todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); + } + Err(e) if e.kind() == WouldBlock => {} + Err(_) => return Err(AcceptFailed(listener.local_addr().unwrap())), + } } match todo { Todo { @@ -185,12 +205,19 @@ fn new_endpoint_manager( polarity: local_polarity, port: *local_port, })); - endpoint.send(&msg)?; + endpoint + .send(&msg) + .map_err(|e| { + EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) + }) + .unwrap(); log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - let maybe_msg = endpoint.try_recv().map_err(drop)?; + let maybe_msg = endpoint.try_recv().map_err(|e| { + EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) + })?; if maybe_msg.is_some() && !endpoint.inbox.is_empty() { polled_undrained.insert(index); } @@ -198,7 +225,11 @@ fn new_endpoint_manager( None => {} // msg deserialization incomplete Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info); - assert!(peer_info.polarity != local_polarity); + if peer_info.polarity == local_polarity { + return Err(ConnectError::PortPeerPolarityMismatch( + *local_port, + )); + } *recv_peer_port = Some(peer_info.port); // 1. finally learned the peer of this port! port_info.peers.insert(*local_port, peer_info.port); @@ -251,16 +282,16 @@ fn init_neighborhood( controller_id: ControllerId, logger: &mut dyn Logger, em: &mut EndpointManager, - deadline: Instant, -) -> Result { + deadline: Option, +) -> Result { use {Msg::SetupMsg as S, SetupMsg::*}; log!(logger, "beginning neighborhood construction"); // 1. broadcast my ID as the first echo. await reply from all neighbors let echo = S(LeaderEcho { maybe_leader: controller_id }); let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len()); - for (index, ee) in em.endpoint_exts.iter_mut().enumerate() { + for index in 0..em.endpoint_exts.len() { log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo); - ee.endpoint.send(&echo)?; + em.send_to_setup(index, &echo)?; awaiting.insert(index); } @@ -270,7 +301,7 @@ fn init_neighborhood( let mut my_leader = controller_id; em.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any_setup(deadline)?; log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); match msg { S(LeaderAnnounce { leader }) => { @@ -290,7 +321,7 @@ fn init_neighborhood( if awaiting.is_empty() { if let Some(p) = parent { // return the echo to my parent - em.send_to(p, &S(LeaderEcho { maybe_leader }))?; + em.send_to_setup(p, &S(LeaderEcho { maybe_leader }))?; } else { // wave completed! break 'echo_loop; @@ -307,15 +338,15 @@ fn init_neighborhood( if em.endpoint_exts.len() == 1 { // immediately reply to parent log!(logger, "replying echo to parent {:?} immediately", index); - em.send_to(index, &echo)?; + em.send_to_setup(index, &echo)?; } else { - for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() { + for index2 in 0..em.endpoint_exts.len() { if index2 == index { // don't propagate echo to my parent continue; } log!(logger, "repeating echo {:?} to {:?}", &echo, index2); - ee.endpoint.send(&echo)?; + em.send_to_setup(index2, &echo)?; awaiting.insert(index2); } } @@ -346,7 +377,7 @@ fn init_neighborhood( // in this loop, every node sends 1 message to each neighbor // await 1 message from all non-parents. let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for (index, ee) in em.endpoint_exts.iter_mut().enumerate() { + for index in 0..em.endpoint_exts.len() { let msg = if Some(index) == parent { &S(YouAreMyParent) } else { @@ -354,13 +385,13 @@ fn init_neighborhood( &msg_for_non_parents }; log!(logger, "ANNOUNCING to {:?} {:?}", index, msg); - ee.endpoint.send(msg)?; + em.send_to_setup(index, msg)?; } let mut children = Vec::default(); em.undelay_all(); while !awaiting.is_empty() { log!(logger, "awaiting {:?}", &awaiting); - let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any_setup(deadline)?; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&index)); diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 89eec1b24bdaf8969138bb3edabbbf96f9ac38d6..18f632450344f84481836738522f9316852bb4c6 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1,6 +1,9 @@ use crate as reowolf; use crossbeam_utils::thread::scope; -use reowolf::{Connector, EndpointSetup, Polarity::*, ProtocolDescription}; +use reowolf::{ + Polarity::{Getter, Putter}, + *, +}; use std::net::SocketAddr; use std::{sync::Arc, time::Duration}; @@ -49,7 +52,7 @@ fn new_net_port() { #[test] fn trivial_connect() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); } #[test] @@ -58,7 +61,7 @@ fn single_node_connect() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); } #[test] @@ -68,12 +71,12 @@ fn multithreaded_connect() { s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); }); }) .unwrap(); @@ -83,7 +86,7 @@ fn multithreaded_connect() { fn put_no_sync() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, _] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap(); } @@ -91,7 +94,7 @@ fn put_no_sync() { fn wrong_polarity_bad() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(i, (b"hi" as &[_]).into()).unwrap_err(); } @@ -99,7 +102,7 @@ fn wrong_polarity_bad() { fn dup_put_bad() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, _] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap_err(); } @@ -107,8 +110,8 @@ fn dup_put_bad() { #[test] fn trivial_sync() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - c.connect(Duration::from_secs(1)).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); } #[test] @@ -122,7 +125,7 @@ fn unconnected_gotten_err() { fn connected_gotten_err_no_round() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); } @@ -130,8 +133,8 @@ fn connected_gotten_err_no_round() { fn connected_gotten_err_ungotten() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); } @@ -139,7 +142,7 @@ fn connected_gotten_err_ungotten() { fn native_polarity_checks() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); // fail... c.get(o).unwrap_err(); c.put(i, (b"hi" as &[_]).into()).unwrap_err(); @@ -152,7 +155,7 @@ fn native_polarity_checks() { fn native_multiple_gets() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(i).unwrap(); c.get(i).unwrap_err(); } @@ -161,7 +164,7 @@ fn native_multiple_gets() { fn next_batch() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); c.next_batch().unwrap_err(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.next_batch().unwrap(); c.next_batch().unwrap(); c.next_batch().unwrap(); @@ -171,10 +174,10 @@ fn next_batch() { fn native_self_msg() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(i).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); } #[test] @@ -184,17 +187,17 @@ fn two_natives_msg() { s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(g).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); c.gotten(g).unwrap(); }); s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); }); }) .unwrap(); @@ -204,12 +207,12 @@ fn two_natives_msg() { fn trivial_nondet() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, i] = c.new_port_pair(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.get(i).unwrap(); // getting 0 batch c.next_batch().unwrap(); // silent 1 batch - assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap()); + assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap()); c.gotten(i).unwrap_err(); } @@ -220,18 +223,18 @@ fn connector_pair_nondet() { s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.next_batch().unwrap(); c.get(g).unwrap(); - assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap()); + assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap()); c.gotten(g).unwrap(); }); s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); }); }) .unwrap(); @@ -245,7 +248,7 @@ fn cannot_use_moved_ports() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); let [p, g] = c.new_port_pair(); c.add_component(b"sync", &[g, p]).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap_err(); c.get(g).unwrap_err(); } @@ -260,29 +263,35 @@ fn sync_sync() { let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); c.add_component(b"sync", &[g0, p1]).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p0, (b"hello" as &[_]).into()).unwrap(); c.get(g1).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); c.gotten(g1).unwrap(); } +fn file_logged_connector(controller_id: ControllerId, path: &str) -> Connector { + let file = std::fs::File::create(path).unwrap(); + let file_logger = Box::new(FileLogger::new(controller_id, file)); + Connector::new(file_logger, MINIMAL_PROTO.clone(), controller_id, 8) +} + #[test] fn double_net_connect() { let sock_addrs = [next_test_addr(), next_test_addr()]; scope(|s| { s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 2); + let mut c = file_logged_connector(0, "./logs/double_net_a.txt"); let [_p, _g] = [ c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) .unwrap(), c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) .unwrap(), ]; - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 3); + let mut c = file_logged_connector(1, "./logs/double_net_b.txt"); let [_g, _p] = [ c.new_net_port( Getter, @@ -295,7 +304,7 @@ fn double_net_connect() { ) .unwrap(), ]; - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); }); }) .unwrap(); @@ -314,7 +323,7 @@ fn distributed_msg_bounce() { native | sync p|--> | g|<-- */ - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 4); + let mut c = file_logged_connector(0, "./logs/distributed_msg_bounce_a.txt"); let [p, g] = [ c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) .unwrap(), @@ -322,15 +331,15 @@ fn distributed_msg_bounce() { .unwrap(), ]; c.add_component(b"sync", &[g, p]).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); }); s.spawn(|_| { /* native p|--> g|<-- */ - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 5); + let mut c = file_logged_connector(1, "./logs/distributed_msg_bounce_b.txt"); let [g, p] = [ c.new_net_port( Getter, @@ -343,10 +352,10 @@ fn distributed_msg_bounce() { ) .unwrap(), ]; - c.connect(Duration::from_secs(1)).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap(); c.get(g).unwrap(); - c.sync(Duration::from_secs(1)).unwrap(); + c.sync(Some(Duration::from_secs(1))).unwrap(); c.gotten(g).unwrap(); }); })