Changeset - 557148ec2325
[Not reviewed]
1 5 1
Christopher Esterhuyse - 5 years ago 2020-04-30 10:03:19
christopher.esterhuyse@gmail.com
removed accidentally pushed dead code
7 files changed with 94 insertions and 38 deletions:
0 comments (0 inline, 0 general)
src/runtime/batches.rs
Show inline comments
 
deleted file
src/runtime/connector.rs
Show inline comments
 
@@ -27,24 +27,25 @@ impl Connector {
 
            Connector::Configured(_) => return Err(AlreadyConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id,
 
        };
 
        let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?);
 
        let polarities = protocol_description.component_polarities(main_component)?;
 
        let configured = Configured {
 
            controller_id,
 
            protocol_description,
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
            logger: "Logger created!\n".into(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
    }
 

	
 
    /// Bind the (configured) connector's port corresponding to the
 
    pub fn bind_port(
 
        &mut self,
 
        proto_port_index: usize,
 
        binding: PortBinding,
 
    ) -> Result<(), PortBindErr> {
 
        use PortBindErr::*;
 
@@ -79,35 +80,37 @@ impl Connector {
 
                    .bindings
 
                    .get(&native_index)
 
                    .copied()
 
                    .ok_or(PortNotBound { native_index })?;
 
                Ok((binding, polarity))
 
            })
 
            .collect::<Result<Vec<(_, _)>, ConnectErr>>()?;
 
        let (controller, native_interface) = Controller::connect(
 
            configured.controller_id,
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            &mut configured.logger,
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Configured(configured) => Some(&mut configured.logger),
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
src/runtime/endpoint.rs
Show inline comments
 
@@ -79,26 +79,29 @@ impl CommMsgContents {
 
}
 

	
 
impl From<EndpointErr> for ConnectErr {
 
    fn from(e: EndpointErr) -> Self {
 
        match e {
 
            EndpointErr::Disconnected => ConnectErr::Disconnected,
 
            EndpointErr::MetaProtocolDeviation => ConnectErr::MetaProtocolDeviation,
 
        }
 
    }
 
}
 
impl Endpoint {
 
    // asymmetric
 
    pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self {
 
    //     Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] })
 
    // }
 
    pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec<u8>) -> Self {
 
        Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] })
 
    }
 

	
 
    // symmetric
 
    pub fn new_memory_pair() -> [Self; 2] {
 
        let (s1, r1) = mio_extras::channel::channel::<Msg>();
 
        let (s2, r2) = mio_extras::channel::channel::<Msg>();
 
        [Self::Memory { s: s1, r: r2 }, Self::Memory { s: s2, r: r1 }]
 
    }
 
    pub fn send(&mut self, msg: Msg) -> Result<(), EndpointErr> {
 
        match self {
 
            Self::Memory { s, .. } => s.send(msg).map_err(|_| EndpointErr::Disconnected),
 
            Self::Network(NetworkEndpoint { stream, outbox, .. }) => {
src/runtime/mod.rs
Show inline comments
 
@@ -46,24 +46,25 @@ pub enum Connector {
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
    logger: String,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
src/runtime/setup.rs
Show inline comments
 
@@ -14,30 +14,30 @@ enum EndpointExtTodo {
 
    PassiveAccepting { addr: SocketAddr, info: EndpointInfo, listener: TcpListener },
 
    PassiveConnecting { addr: SocketAddr, info: EndpointInfo, stream: TcpStream },
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Controller {
 
    // Given port bindings and a protocol config, create a connector with 1 native node
 
    pub fn connect(
 
        major: ControllerId,
 
        main_component: &[u8],
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        logger: &mut String,
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 
        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
        let mut ekeys_network = vec![];
 

	
 
        let mut native_interface = vec![];
 

	
 
        /*
 
        1.  - allocate an EndpointExtTodo for every native and interface port
 
@@ -85,70 +85,86 @@ impl Controller {
 
                }
 
                PortBinding::Active(addr) => {
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                        addr,
 
                        polarity,
 
                        stream: TcpStream::connect(&addr).unwrap(),
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        log!(&mut logger, "{:03?} setup todos...", major);
 
        log!(logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 
            Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None };
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(main_component, &ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        log!(logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
            logger: {
 
                let mut l = String::default();
 
                std::mem::swap(&mut l, logger);
 
                l
 
            },
 
        };
 
        let controller = Self {
 
            protocol_description,
 
            inner,
 
            ephemeral: Default::default(),
 
            // round_histories: vec![],
 
            unrecoverable_error: None,
 
        };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    // with mio v0.6 attempting to read bytes into a nonempty buffer appears to
 
    // be the only reliably platform-independent means of testing the connectivity of
 
    // a mio::TcpStream (see Self::connection_testing_read).
 
    // as this unavoidably MAY read some crucial payload bytes, we have to be careful
 
    // to pass these potentially populated buffers into the Endpoint, or bytes may be lost.
 
    // This is done with Endpoint::from_fresh_stream_and_inbox.
 
    fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec<u8>) -> std::io::Result<()> {
 
        inbox.clear();
 
        use std::io::Read;
 
        match stream.read_to_end(inbox) {
 
            Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
            Ok(_) => Ok(()),
 
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
 
            Err(e) => Err(e),
 
        }
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
@@ -180,82 +196,91 @@ impl Controller {
 
                PassiveAccepting { listener, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(listener, token, ready_r, edge)
 
                }
 
            }
 
            .expect("register first");
 
        }
 
        // invariant: every EndpointExtTodo has one thing registered with mio
 

	
 
        // 4. until all in endpoint_ext_todos are Finished variant, handle events
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        // see Self::connection_testing_read for why we populate Endpoint inboxes here.
 
        let mut next_inbox = vec![];
 
        while !to_finish.is_empty() {
 
            ms.poll_events(deadline)?;
 
            ms.poll_events(deadline).map_err(|e| {
 
                log!(logger, "{:03?} timing out", major);
 
                e
 
            })?;
 
            for event in ms.events.iter() {
 
                log!(logger, "event {:#?}", event);
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
                    }
 
                    PassiveAccepting { addr, listener, .. } => {
 
                        log!(logger, "{:03?} start PassiveAccepting...", major);
 
                        assert!(event.readiness().is_readable());
 
                        let (stream, _peer_addr) =
 
                            listener.accept().map_err(|_| AcceptFailed(*addr))?;
 
                        ms.poll.deregister(listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveAccepting { addr, info, .. } = e => {
 
                                PassiveConnecting { addr, info, stream }
 
                            }]
 
                        });
 
                        log!(logger, "{:03?} ... end PassiveAccepting", major);
 
                    }
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_err() {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                        if Self::connection_testing_read(stream, &mut next_inbox).is_ok() {
 
                            // connect successful
 
                            log!(logger, "CONNECT SUCCESS");
 
                            log!(logger, "Connectivity test passed");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                let mut inbox = vec![];
 
                                std::mem::swap(&mut inbox, &mut next_inbox);
 
                                    let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let mut new_stream = TcpStream::connect(addr).unwrap();
 
                            ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3");
 
                            std::mem::swap(stream, &mut new_stream);
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveConnecting", major);
 
                    }
src/test/mod.rs
Show inline comments
 
use crate::common::ControllerId;
 
use crate::runtime::Connector;
 
use crate::runtime::Unconfigured;
 
use core::fmt::Debug;
 
use std::net::SocketAddr;
 

	
 
mod connector;
 
mod net;
 
mod setup;
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(7_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into()
src/test/net.rs
Show inline comments
 
new file 100644
 
use mio::*;
 
use std::io::ErrorKind::WouldBlock;
 
use std::net::SocketAddr;
 

	
 
fn connection_testing_read(
 
    stream: &mut mio::net::TcpStream,
 
    inbox: &mut Vec<u8>,
 
) -> std::io::Result<()> {
 
    assert!(inbox.is_empty());
 
    use std::io::Read;
 
    match stream.read_to_end(inbox) {
 
        Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
 
        Ok(_) => Ok(()),
 
        Err(e) if e.kind() == WouldBlock => Ok(()),
 
        Err(e) => Err(e),
 
    }
 
}
 

	
 
#[test]
 
fn mio_tcp_connect_err() {
 
    let poll = Poll::new().unwrap();
 
    let mut events = Events::with_capacity(64);
 

	
 
    let addr: SocketAddr = "127.0.0.1:12000".parse().unwrap();
 
    let mut stream = mio::net::TcpStream::connect(&addr).unwrap();
 
    poll.register(&stream, Token(0), Ready::all(), PollOpt::edge()).unwrap();
 

	
 
    let mut v = vec![];
 
    loop {
 
        poll.poll(&mut events, Some(std::time::Duration::from_secs(2))).unwrap();
 
        for event in events.iter() {
 
            assert_eq!(event.token(), Token(0));
 
            println!("readiness {:?}", event.readiness());
 
            // assert_eq!(event.readiness(), Ready::writable());
 

	
 
            v.clear();
 
            println!("{:?}", connection_testing_read(&mut stream, &mut v));
 
            println!("----------- {:?}", &v);
 
            std::thread::sleep(std::time::Duration::from_secs(1));
 
        }
 
    }
 
}
0 comments (0 inline, 0 general)