Changeset - c3cc95b4ef76
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-02-21 13:08:01
christopher.esterhuyse@gmail.com
connecting nice
1 file changed with 18 insertions and 8 deletions:
0 comments (0 inline, 0 general)
src/runtime/experimental/api.rs
Show inline comments
 
@@ -65,49 +65,49 @@ pub struct Connecting {
 
    bindings: Vec<Binding>,
 
}
 
trait Binds<T> {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T;
 
}
 
impl Binds<InPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort {
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Getter, addr });
 
        InPort(Port(self.bindings.len() - 1))
 
    }
 
}
 
impl Binds<OutPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort {
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr });
 
        OutPort(Port(self.bindings.len() - 1))
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum ConnectErr {
 
    BindErr(SocketAddr),
 
    NewSocketErr(SocketAddr),
 
    AcceptErr(SocketAddr),
 
    ConnectionShutdown(SocketAddr),
 
    PortKindMismatch(SocketAddr),
 
    PortKindMismatch(Port, SocketAddr),
 
    EndpointErr(Port, EndpointErr),
 
    PollInitFailed,
 
    PollingFailed,
 
    Timeout,
 
}
 

	
 
#[derive(Debug)]
 
struct Component {
 
    protocol: Arc<ProtocolD>,
 
    port_set: HashSet<Port>,
 
    identifier: Arc<[u8]>,
 
    state: ProtocolS,
 
}
 

	
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> Self {
 
        use PollDeadlineErr as P;
 
        match e {
 
            P::PollingFailed => Self::PollingFailed,
 
            P::Timeout => Self::Timeout,
 
        }
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
@@ -219,55 +219,55 @@ impl Connecting {
 
                            ms.poll.deregister(&stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let stream = TcpStream::connect(&binding.addr).unwrap();
 
                            ms.poll.register(&stream, token, ready_w, edge).expect("PAC 3");
 
                            Todo::ActiveConnecting { stream }
 
                        };
 
                        todos[index] = Some(todo);
 
                    }
 
                    Some(Todo::PassiveConnecting { mut stream, channel_id }) => {
 
                        if !Self::test_stream_connectivity(&mut stream) {
 
                            return Err(ConnectionShutdown(binding.addr));
 
                        }
 
                        ms.poll.reregister(&stream, token, ready_r, edge).expect("55");
 
                        let polarity = binding.polarity;
 
                        let info = EndpointInfo { polarity, channel_id };
 
                        let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                        let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                        endpoint.send(msg).map_err(|e| EndpointErr(Port(index), e))?;
 
                        let endpoint_ext = EndpointExt { endpoint, info };
 
                        endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                        num_todos_remaining -= 1;
 
                    }
 
                    Some(Todo::ActiveRecving { mut endpoint }) => {
 
                        let ekey = Key::from_raw(index);
 
                        let ekey = Port(index);
 
                        'recv_loop: while let Some(msg) =
 
                            endpoint.recv().map_err(|e| EndpointErr(Port(index), e))?
 
                            endpoint.recv().map_err(|e| EndpointErr(ekey, e))?
 
                        {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == binding.polarity {
 
                                    return Err(PortKindMismatch(binding.addr));
 
                                    return Err(PortKindMismatch(ekey, binding.addr));
 
                                }
 
                                let channel_id = info.channel_id;
 
                                let info = EndpointInfo { polarity: binding.polarity, channel_id };
 
                                ms.polled_undrained.insert(ekey);
 
                                let endpoint_ext = EndpointExt { endpoint, info };
 
                                endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                                num_todos_remaining -= 1;
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        assert_eq!(None, endpoint_exts.iter_reserved().next());
 
        drop(todos);
 

	
 
        ///////////////////////////////////////////////////////
 
        // 1. construct `family', i.e. perform the sink tree setup procedure
 
        use {Msg::SetupMsg as S, SetupMsg::*};
 
        let mut messenger = (&mut ms, &mut endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut VecStorage<EndpointExt>) {
 
@@ -616,42 +616,52 @@ unsafe fn c_sync_subset(
 
}
 

	
 
// dummy fn for the actual synchronous round
 
fn sync_inner<'c, 'b>(
 
    _connected: &'c mut Connected,
 
    _buf: &'b mut [u8],
 
) -> (usize, &'b HashMap<Port, Range<usize>>) {
 
    todo!()
 
}
 

	
 
unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] {
 
    std::slice::from_raw_parts_mut(ptr, len)
 
}
 
unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] {
 
    std::slice::from_raw_parts(ptr, len)
 
}
 

	
 
#[test]
 
fn api_connecting() {
 
    let addrs: [SocketAddr; 3] = [
 
        "127.0.0.1:8888".parse().unwrap(),
 
        "127.0.0.1:8889".parse().unwrap(),
 
        "127.0.0.1:8890".parse().unwrap(),
 
    ];
 
    const TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
 
    let handles = vec![
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let connected = connecting.connect(None);
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]);
 
            let _b: OutPort = connecting.bind(Coupling::Active, addrs[1]);
 
            let connected = connecting.connect(TIMEOUT);
 
            println!("A: {:#?}", connected);
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]);
 
            let connected = connecting.connect(Some(Duration::from_secs(2)));
 
            let _a: InPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let _b: InPort = connecting.bind(Coupling::Passive, addrs[1]);
 
            let _c: InPort = connecting.bind(Coupling::Active, addrs[2]);
 
            let connected = connecting.connect(TIMEOUT);
 
            println!("B: {:#?}", connected);
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[2]);
 
            let connected = connecting.connect(TIMEOUT);
 
            println!("C: {:#?}", connected);
 
        }),
 
    ];
 
    for h in handles {
 
        h.join().unwrap();
 
    }
 
}
0 comments (0 inline, 0 general)