From 557148ec2325bba6d8f8ac9b612353d888e79573 2020-04-30 10:03:19 From: Christopher Esterhuyse Date: 2020-04-30 10:03:19 Subject: [PATCH] removed accidentally pushed dead code --- diff --git a/src/runtime/batches.rs b/src/runtime/batches.rs deleted file mode 100644 index 4dabc257a3f3c51a6e407d24a37b8454d5acf749..0000000000000000000000000000000000000000 --- a/src/runtime/batches.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::ptr::NonNull; - -#[repr(C)] -pub struct PortOp { - port: u32, - buf_len: u32, - buf: Option>, -} - - -#[no_mangle] -pub extern "C" fn do_sync( - _ops_ptr: Option>, - _ops_len: usize, - _batch_ptr: Option>, - _batch_len: usize) -{ - // TODO -} \ No newline at end of file diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs index 9a3242d99e4ff1b599f822902ad7c4591606488a..0c799da9a7f67012b1272dc165ea6ba7c8392bc9 100644 --- a/src/runtime/connector.rs +++ b/src/runtime/connector.rs @@ -36,6 +36,7 @@ impl Connector { bindings: Default::default(), polarities, main_component: main_component.to_vec(), + logger: "Logger created!\n".into(), }; *self = Connector::Configured(configured); Ok(()) @@ -88,6 +89,7 @@ impl Connector { &configured.main_component, configured.protocol_description.clone(), &bound_proto_interface[..], + &mut configured.logger, deadline, )?; *self = Connector::Connected(Connected { @@ -99,6 +101,7 @@ impl Connector { } pub fn get_mut_logger(&mut self) -> Option<&mut String> { match self { + Connector::Configured(configured) => Some(&mut configured.logger), Connector::Connected(connected) => Some(&mut connected.controller.inner.logger), _ => None, } diff --git a/src/runtime/endpoint.rs b/src/runtime/endpoint.rs index 5bf6f48d9ee1cdcf027d9332e1c9cc8df4cba735..359692552320f0bfbcbd269c5a5827581545e405 100644 --- a/src/runtime/endpoint.rs +++ b/src/runtime/endpoint.rs @@ -88,8 +88,11 @@ impl From for ConnectErr { } impl Endpoint { // asymmetric - pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self { - Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] }) + // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self { + // Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] }) + // } + pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec) -> Self { + Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] }) } // symmetric diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index b48ed160dab9ad4fd9969b06da91b93878f030e5..d4811b397bbdcf22bac1c5033a4cd2caad11a94b 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -55,6 +55,7 @@ pub struct Configured { bindings: HashMap, protocol_description: Arc, main_component: Vec, + logger: String, } #[derive(Debug)] pub struct Connected { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index b7be6c2a3c8f465642694be7141ba3e910a8b6b0..6bf4dc3ca693e7b4a5fb5fb430dd4fc63776f13d 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -23,12 +23,12 @@ impl Controller { main_component: &[u8], protocol_description: Arc, bound_proto_interface: &[(PortBinding, Polarity)], + logger: &mut String, deadline: Instant, ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> { use ConnectErr::*; - let mut logger = String::default(); - log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); + log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); let mut channel_id_stream = ChannelIdStream::new(major); let mut endpoint_ext_todos = Arena::default(); @@ -94,11 +94,11 @@ impl Controller { } } } - log!(&mut logger, "{:03?} setup todos...", major); + log!(logger, "{:03?} setup todos...", major); // 2. convert the arena to Arena and return the let (mut messenger_state, mut endpoint_exts) = - Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?; + Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?; let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }; let p_monos = vec![MonoP { @@ -109,14 +109,14 @@ impl Controller { // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} let family = Self::setup_sink_tree_family( major, - &mut logger, + logger, &mut endpoint_exts, &mut messenger_state, ekeys_network, deadline, )?; - log!(&mut logger, "CONNECT PHASE END! ~"); + log!(logger, "CONNECT PHASE END! ~"); let inner = ControllerInner { family, messenger_state, @@ -125,7 +125,11 @@ impl Controller { mono_ps: p_monos, mono_n: n_mono, round_index: 0, - logger, + logger: { + let mut l = String::default(); + std::mem::swap(&mut l, logger); + l + }, }; let controller = Self { protocol_description, @@ -137,9 +141,21 @@ impl Controller { Ok((controller, native_interface)) } - fn test_stream_connectivity(stream: &mut TcpStream) -> bool { - use std::io::Write; - stream.write(&[]).is_ok() + // with mio v0.6 attempting to read bytes into a nonempty buffer appears to + // be the only reliably platform-independent means of testing the connectivity of + // a mio::TcpStream (see Self::connection_testing_read). + // as this unavoidably MAY read some crucial payload bytes, we have to be careful + // to pass these potentially populated buffers into the Endpoint, or bytes may be lost. + // This is done with Endpoint::from_fresh_stream_and_inbox. + fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec) -> std::io::Result<()> { + inbox.clear(); + use std::io::Read; + match stream.read_to_end(inbox) { + Ok(0) => unreachable!("Ok(0) on read should return Err instead!"), + Ok(_) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()), + Err(e) => Err(e), + } } // inserts @@ -189,9 +205,15 @@ impl Controller { // 4. until all in endpoint_ext_todos are Finished variant, handle events let mut polled_undrained_later = IndexSet::<_>::default(); let mut backoff_millis = 10; + // see Self::connection_testing_read for why we populate Endpoint inboxes here. + let mut next_inbox = vec![]; while !to_finish.is_empty() { - ms.poll_events(deadline)?; + ms.poll_events(deadline).map_err(|e| { + log!(logger, "{:03?} timing out", major); + e + })?; for event in ms.events.iter() { + log!(logger, "event {:#?}", event); let token = event.token(); let ekey = Key::from_token(token); let entry = endpoint_ext_todos.get_mut(ekey).unwrap(); @@ -216,14 +238,16 @@ impl Controller { PassiveConnecting { addr, stream, .. } => { log!(logger, "{:03?} start PassiveConnecting...", major); assert!(event.readiness().is_writable()); - if !Self::test_stream_connectivity(stream) { + if Self::connection_testing_read(stream, &mut next_inbox).is_err() { return Err(PassiveConnectFailed(*addr)); } ms.poll.reregister(stream, token, ready_r, edge).expect("52"); let mut res = Ok(()); take_mut::take(entry, |e| { assert_let![PassiveConnecting { info, stream, .. } = e => { - let mut endpoint = Endpoint::from_fresh_stream(stream); + let mut inbox = vec![]; + std::mem::swap(&mut inbox, &mut next_inbox); + let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); res = endpoint.send(msg); Finished(EndpointExt { info, endpoint }) @@ -236,17 +260,18 @@ impl Controller { ActiveConnecting { addr, stream, .. } => { log!(logger, "{:03?} start ActiveConnecting...", major); assert!(event.readiness().is_writable()); - if Self::test_stream_connectivity(stream) { + if Self::connection_testing_read(stream, &mut next_inbox).is_ok() { // connect successful - log!(logger, "CONNECT SUCCESS"); + log!(logger, "Connectivity test passed"); ms.poll.reregister(stream, token, ready_r, edge).expect("52"); take_mut::take(entry, |e| { assert_let![ActiveConnecting { stream, polarity, addr } = e => { - let endpoint = Endpoint::from_fresh_stream(stream); + let mut inbox = vec![]; + std::mem::swap(&mut inbox, &mut next_inbox); + let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); ActiveRecving { endpoint, polarity, addr } }] }); - log!(logger, ".. ok"); } else { // connect failure. retry! log!(logger, "CONNECT FAIL"); diff --git a/src/test/mod.rs b/src/test/mod.rs index 0665b0237f88ddd3711fff647a4c6357d53f390d..d9363f663461faabba05a676242a91e611fd5f9c 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -5,6 +5,7 @@ use core::fmt::Debug; use std::net::SocketAddr; mod connector; +mod net; mod setup; // using a static AtomicU16, shared between all tests in the binary, diff --git a/src/test/net.rs b/src/test/net.rs new file mode 100644 index 0000000000000000000000000000000000000000..e4db1629f0f0931ce18ed684d655800e9e990a24 --- /dev/null +++ b/src/test/net.rs @@ -0,0 +1,42 @@ +use mio::*; +use std::io::ErrorKind::WouldBlock; +use std::net::SocketAddr; + +fn connection_testing_read( + stream: &mut mio::net::TcpStream, + inbox: &mut Vec, +) -> std::io::Result<()> { + assert!(inbox.is_empty()); + use std::io::Read; + match stream.read_to_end(inbox) { + Ok(0) => unreachable!("Ok(0) on read should return Err instead!"), + Ok(_) => Ok(()), + Err(e) if e.kind() == WouldBlock => Ok(()), + Err(e) => Err(e), + } +} + +#[test] +fn mio_tcp_connect_err() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(64); + + let addr: SocketAddr = "127.0.0.1:12000".parse().unwrap(); + let mut stream = mio::net::TcpStream::connect(&addr).unwrap(); + poll.register(&stream, Token(0), Ready::all(), PollOpt::edge()).unwrap(); + + let mut v = vec![]; + loop { + poll.poll(&mut events, Some(std::time::Duration::from_secs(2))).unwrap(); + for event in events.iter() { + assert_eq!(event.token(), Token(0)); + println!("readiness {:?}", event.readiness()); + // assert_eq!(event.readiness(), Ready::writable()); + + v.clear(); + println!("{:?}", connection_testing_read(&mut stream, &mut v)); + println!("----------- {:?}", &v); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + } +}