Changeset - 557148ec2325
[Not reviewed]
1 5 1
Christopher Esterhuyse - 5 years ago 2020-04-30 10:03:19
christopher.esterhuyse@gmail.com
removed accidentally pushed dead code
7 files changed with 94 insertions and 38 deletions:
0 comments (0 inline, 0 general)
src/runtime/batches.rs
Show inline comments
 
deleted file
src/runtime/connector.rs
Show inline comments
 
@@ -33,12 +33,13 @@ impl Connector {
 
        let configured = Configured {
 
            controller_id,
 
            protocol_description,
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
            logger: "Logger created!\n".into(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
    }
 

	
 
    /// Bind the (configured) connector's port corresponding to the
 
@@ -85,23 +86,25 @@ impl Connector {
 
            .collect::<Result<Vec<(_, _)>, ConnectErr>>()?;
 
        let (controller, native_interface) = Controller::connect(
 
            configured.controller_id,
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            &mut configured.logger,
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Configured(configured) => Some(&mut configured.logger),
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
src/runtime/endpoint.rs
Show inline comments
 
@@ -85,14 +85,17 @@ impl From<EndpointErr> for ConnectErr {
 
            EndpointErr::MetaProtocolDeviation => ConnectErr::MetaProtocolDeviation,
 
        }
 
    }
 
}
 
impl Endpoint {
 
    // asymmetric
 
    pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
    //     Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // }
 
    pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec<u8>) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] })
 
    }
 

	
 
    // symmetric
 
    pub fn new_memory_pair() -> [Self; 2] {
 
        let (s1, r1) = mio_extras::channel::channel::<Msg>();
 
        let (s2, r2) = mio_extras::channel::channel::<Msg>();
src/runtime/mod.rs
Show inline comments
 
@@ -52,12 +52,13 @@ pub struct Unconfigured {
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
    logger: String,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
src/runtime/setup.rs
Show inline comments
 
@@ -20,18 +20,18 @@ impl Controller {
 
    // Given port bindings and a protocol config, create a connector with 1 native node
 
    pub fn connect(
 
        major: ControllerId,
 
        main_component: &[u8],
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        logger: &mut String,
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 
        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
@@ -91,58 +91,74 @@ impl Controller {
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        log!(&mut logger, "{:03?} setup todos...", major);
 
        log!(logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 
            Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None };
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(main_component, &ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        log!(logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
            logger: {
 
                let mut l = String::default();
 
                std::mem::swap(&mut l, logger);
 
                l
 
            },
 
        };
 
        let controller = Self {
 
            protocol_description,
 
            inner,
 
            ephemeral: Default::default(),
 
            // round_histories: vec![],
 
            unrecoverable_error: None,
 
        };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    // with mio v0.6 attempting to read bytes into a nonempty buffer appears to
 
    // be the only reliably platform-independent means of testing the connectivity of
 
    // a mio::TcpStream (see Self::connection_testing_read).
 
    // as this unavoidably MAY read some crucial payload bytes, we have to be careful
 
    // to pass these potentially populated buffers into the Endpoint, or bytes may be lost.
 
    // This is done with Endpoint::from_fresh_stream_and_inbox.
 
    fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec<u8>) -> std::io::Result<()> {
 
        inbox.clear();
 
        use std::io::Read;
 
        match stream.read_to_end(inbox) {
 
            Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
            Ok(_) => Ok(()),
 
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
 
            Err(e) => Err(e),
 
        }
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
@@ -186,15 +202,21 @@ impl Controller {
 
        }
 
        // invariant: every EndpointExtTodo has one thing registered with mio
 

	
 
        // 4. until all in endpoint_ext_todos are Finished variant, handle events
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        // see Self::connection_testing_read for why we populate Endpoint inboxes here.
 
        let mut next_inbox = vec![];
 
        while !to_finish.is_empty() {
 
            ms.poll_events(deadline)?;
 
            ms.poll_events(deadline).map_err(|e| {
 
                log!(logger, "{:03?} timing out", major);
 
                e
 
            })?;
 
            for event in ms.events.iter() {
 
                log!(logger, "event {:#?}", event);
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
@@ -213,43 +235,46 @@ impl Controller {
 
                        });
 
                        log!(logger, "{:03?} ... end PassiveAccepting", major);
 
                    }
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_err() {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_ok() {
 
                            // connect successful
 
                            log!(logger, "CONNECT SUCCESS");
 
                            log!(logger, "Connectivity test passed");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                    let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
src/test/mod.rs
Show inline comments
 
@@ -2,12 +2,13 @@ use crate::common::ControllerId;
 
use crate::runtime::Connector;
 
use crate::runtime::Unconfigured;
 
use core::fmt::Debug;
 
use std::net::SocketAddr;
 

	
 
mod connector;
 
mod net;
 
mod setup;
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
src/test/net.rs
Show inline comments
 
new file 100644
 
use mio::*;
 
use std::io::ErrorKind::WouldBlock;
 
use std::net::SocketAddr;
 

	
 
fn connection_testing_read(
 
    stream: &mut mio::net::TcpStream,
 
    inbox: &mut Vec<u8>,
 
) -> std::io::Result<()> {
 
    assert!(inbox.is_empty());
 
    use std::io::Read;
 
    match stream.read_to_end(inbox) {
 
        Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
        Ok(_) => Ok(()),
 
        Err(e) if e.kind() == WouldBlock => Ok(()),
 
        Err(e) => Err(e),
 
    }
 
}
 

	
 
#[test]
 
fn mio_tcp_connect_err() {
 
    let poll = Poll::new().unwrap();
 
    let mut events = Events::with_capacity(64);
 

	
 
    let addr: SocketAddr = "127.0.0.1:12000".parse().unwrap();
 
    let mut stream = mio::net::TcpStream::connect(&addr).unwrap();
 
    poll.register(&stream, Token(0), Ready::all(), PollOpt::edge()).unwrap();
 

	
 
    let mut v = vec![];
 
    loop {
 
        poll.poll(&mut events, Some(std::time::Duration::from_secs(2))).unwrap();
 
        for event in events.iter() {
 
            assert_eq!(event.token(), Token(0));
 
            println!("readiness {:?}", event.readiness());
 
            // assert_eq!(event.readiness(), Ready::writable());
 

	
 
            v.clear();
 
            println!("{:?}", connection_testing_read(&mut stream, &mut v));
 
            println!("----------- {:?}", &v);
 
            std::thread::sleep(std::time::Duration::from_secs(1));
 
        }
 
    }
 
}
0 comments (0 inline, 0 general)