Changeset - 557148ec2325
[Not reviewed]
1 5 1
Christopher Esterhuyse - 5 years ago 2020-04-30 10:03:19
christopher.esterhuyse@gmail.com
removed accidentally pushed dead code
7 files changed with 94 insertions and 38 deletions:
0 comments (0 inline, 0 general)
src/runtime/batches.rs
Show inline comments
 
deleted file
src/runtime/connector.rs
Show inline comments
 
@@ -36,6 +36,7 @@ impl Connector {
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
            logger: "Logger created!\n".into(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
@@ -88,6 +89,7 @@ impl Connector {
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            &mut configured.logger,
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
@@ -99,6 +101,7 @@ impl Connector {
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Configured(configured) => Some(&mut configured.logger),
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
src/runtime/endpoint.rs
Show inline comments
 
@@ -88,8 +88,11 @@ impl From<EndpointErr> for ConnectErr {
 
}
 
impl Endpoint {
 
    // asymmetric
 
    pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
    //     Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // }
 
    pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec<u8>) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] })
 
    }
 

	
 
    // symmetric
src/runtime/mod.rs
Show inline comments
 
@@ -55,6 +55,7 @@ pub struct Configured {
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
    logger: String,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
src/runtime/setup.rs
Show inline comments
 
@@ -23,12 +23,12 @@ impl Controller {
 
        main_component: &[u8],
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        logger: &mut String,
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 
        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 
@@ -94,11 +94,11 @@ impl Controller {
 
                }
 
            }
 
        }
 
        log!(&mut logger, "{:03?} setup todos...", major);
 
        log!(logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 
            Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None };
 
        let p_monos = vec![MonoP {
 
@@ -109,14 +109,14 @@ impl Controller {
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        log!(logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
@@ -125,7 +125,11 @@ impl Controller {
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
            logger: {
 
                let mut l = String::default();
 
                std::mem::swap(&mut l, logger);
 
                l
 
            },
 
        };
 
        let controller = Self {
 
            protocol_description,
 
@@ -137,9 +141,21 @@ impl Controller {
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    // with mio v0.6 attempting to read bytes into a nonempty buffer appears to
 
    // be the only reliably platform-independent means of testing the connectivity of
 
    // a mio::TcpStream (see Self::connection_testing_read).
 
    // as this unavoidably MAY read some crucial payload bytes, we have to be careful
 
    // to pass these potentially populated buffers into the Endpoint, or bytes may be lost.
 
    // This is done with Endpoint::from_fresh_stream_and_inbox.
 
    fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec<u8>) -> std::io::Result<()> {
 
        inbox.clear();
 
        use std::io::Read;
 
        match stream.read_to_end(inbox) {
 
            Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
            Ok(_) => Ok(()),
 
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
 
            Err(e) => Err(e),
 
        }
 
    }
 

	
 
    // inserts
 
@@ -189,9 +205,15 @@ impl Controller {
 
        // 4. until all in endpoint_ext_todos are Finished variant, handle events
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        // see Self::connection_testing_read for why we populate Endpoint inboxes here.
 
        let mut next_inbox = vec![];
 
        while !to_finish.is_empty() {
 
            ms.poll_events(deadline)?;
 
            ms.poll_events(deadline).map_err(|e| {
 
                log!(logger, "{:03?} timing out", major);
 
                e
 
            })?;
 
            for event in ms.events.iter() {
 
                log!(logger, "event {:#?}", event);
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
@@ -216,14 +238,16 @@ impl Controller {
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_err() {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
@@ -236,17 +260,18 @@ impl Controller {
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_ok() {
 
                            // connect successful
 
                            log!(logger, "CONNECT SUCCESS");
 
                            log!(logger, "Connectivity test passed");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                    let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            log!(logger, "CONNECT FAIL");
src/test/mod.rs
Show inline comments
 
@@ -5,6 +5,7 @@ use core::fmt::Debug;
 
use std::net::SocketAddr;
 

	
 
mod connector;
 
mod net;
 
mod setup;
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
src/test/net.rs
Show inline comments
 
new file 100644
 
use mio::*;
 
use std::io::ErrorKind::WouldBlock;
 
use std::net::SocketAddr;
 

	
 
fn connection_testing_read(
 
    stream: &mut mio::net::TcpStream,
 
    inbox: &mut Vec<u8>,
 
) -> std::io::Result<()> {
 
    assert!(inbox.is_empty());
 
    use std::io::Read;
 
    match stream.read_to_end(inbox) {
 
        Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
        Ok(_) => Ok(()),
 
        Err(e) if e.kind() == WouldBlock => Ok(()),
 
        Err(e) => Err(e),
 
    }
 
}
 

	
 
#[test]
 
fn mio_tcp_connect_err() {
 
    let poll = Poll::new().unwrap();
 
    let mut events = Events::with_capacity(64);
 

	
 
    let addr: SocketAddr = "127.0.0.1:12000".parse().unwrap();
 
    let mut stream = mio::net::TcpStream::connect(&addr).unwrap();
 
    poll.register(&stream, Token(0), Ready::all(), PollOpt::edge()).unwrap();
 

	
 
    let mut v = vec![];
 
    loop {
 
        poll.poll(&mut events, Some(std::time::Duration::from_secs(2))).unwrap();
 
        for event in events.iter() {
 
            assert_eq!(event.token(), Token(0));
 
            println!("readiness {:?}", event.readiness());
 
            // assert_eq!(event.readiness(), Ready::writable());
 

	
 
            v.clear();
 
            println!("{:?}", connection_testing_read(&mut stream, &mut v));
 
            println!("----------- {:?}", &v);
 
            std::thread::sleep(std::time::Duration::from_secs(1));
 
        }
 
    }
 
}
0 comments (0 inline, 0 general)