diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index b7be6c2a3c8f465642694be7141ba3e910a8b6b0..6bf4dc3ca693e7b4a5fb5fb430dd4fc63776f13d 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -23,12 +23,12 @@ impl Controller { main_component: &[u8], protocol_description: Arc, bound_proto_interface: &[(PortBinding, Polarity)], + logger: &mut String, deadline: Instant, ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> { use ConnectErr::*; - let mut logger = String::default(); - log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); + log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); let mut channel_id_stream = ChannelIdStream::new(major); let mut endpoint_ext_todos = Arena::default(); @@ -94,11 +94,11 @@ impl Controller { } } } - log!(&mut logger, "{:03?} setup todos...", major); + log!(logger, "{:03?} setup todos...", major); // 2. convert the arena to Arena and return the let (mut messenger_state, mut endpoint_exts) = - Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?; + Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?; let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }; let p_monos = vec![MonoP { @@ -109,14 +109,14 @@ impl Controller { // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} let family = Self::setup_sink_tree_family( major, - &mut logger, + logger, &mut endpoint_exts, &mut messenger_state, ekeys_network, deadline, )?; - log!(&mut logger, "CONNECT PHASE END! ~"); + log!(logger, "CONNECT PHASE END! ~"); let inner = ControllerInner { family, messenger_state, @@ -125,7 +125,11 @@ impl Controller { mono_ps: p_monos, mono_n: n_mono, round_index: 0, - logger, + logger: { + let mut l = String::default(); + std::mem::swap(&mut l, logger); + l + }, }; let controller = Self { protocol_description, @@ -137,9 +141,21 @@ impl Controller { Ok((controller, native_interface)) } - fn test_stream_connectivity(stream: &mut TcpStream) -> bool { - use std::io::Write; - stream.write(&[]).is_ok() + // with mio v0.6 attempting to read bytes into a nonempty buffer appears to + // be the only reliably platform-independent means of testing the connectivity of + // a mio::TcpStream (see Self::connection_testing_read). + // as this unavoidably MAY read some crucial payload bytes, we have to be careful + // to pass these potentially populated buffers into the Endpoint, or bytes may be lost. + // This is done with Endpoint::from_fresh_stream_and_inbox. + fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec) -> std::io::Result<()> { + inbox.clear(); + use std::io::Read; + match stream.read_to_end(inbox) { + Ok(0) => unreachable!("Ok(0) on read should return Err instead!"), + Ok(_) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()), + Err(e) => Err(e), + } } // inserts @@ -189,9 +205,15 @@ impl Controller { // 4. until all in endpoint_ext_todos are Finished variant, handle events let mut polled_undrained_later = IndexSet::<_>::default(); let mut backoff_millis = 10; + // see Self::connection_testing_read for why we populate Endpoint inboxes here. + let mut next_inbox = vec![]; while !to_finish.is_empty() { - ms.poll_events(deadline)?; + ms.poll_events(deadline).map_err(|e| { + log!(logger, "{:03?} timing out", major); + e + })?; for event in ms.events.iter() { + log!(logger, "event {:#?}", event); let token = event.token(); let ekey = Key::from_token(token); let entry = endpoint_ext_todos.get_mut(ekey).unwrap(); @@ -216,14 +238,16 @@ impl Controller { PassiveConnecting { addr, stream, .. } => { log!(logger, "{:03?} start PassiveConnecting...", major); assert!(event.readiness().is_writable()); - if !Self::test_stream_connectivity(stream) { + if Self::connection_testing_read(stream, &mut next_inbox).is_err() { return Err(PassiveConnectFailed(*addr)); } ms.poll.reregister(stream, token, ready_r, edge).expect("52"); let mut res = Ok(()); take_mut::take(entry, |e| { assert_let![PassiveConnecting { info, stream, .. } = e => { - let mut endpoint = Endpoint::from_fresh_stream(stream); + let mut inbox = vec![]; + std::mem::swap(&mut inbox, &mut next_inbox); + let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); res = endpoint.send(msg); Finished(EndpointExt { info, endpoint }) @@ -236,17 +260,18 @@ impl Controller { ActiveConnecting { addr, stream, .. } => { log!(logger, "{:03?} start ActiveConnecting...", major); assert!(event.readiness().is_writable()); - if Self::test_stream_connectivity(stream) { + if Self::connection_testing_read(stream, &mut next_inbox).is_ok() { // connect successful - log!(logger, "CONNECT SUCCESS"); + log!(logger, "Connectivity test passed"); ms.poll.reregister(stream, token, ready_r, edge).expect("52"); take_mut::take(entry, |e| { assert_let![ActiveConnecting { stream, polarity, addr } = e => { - let endpoint = Endpoint::from_fresh_stream(stream); + let mut inbox = vec![]; + std::mem::swap(&mut inbox, &mut next_inbox); + let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); ActiveRecving { endpoint, polarity, addr } }] }); - log!(logger, ".. ok"); } else { // connect failure. retry! log!(logger, "CONNECT FAIL");