Changeset - 83fc840cf399
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-02-21 11:05:22
christopher.esterhuyse@gmail.com
api v2
3 files changed with 126 insertions and 60 deletions:
0 comments (0 inline, 0 general)
src/runtime/errors.rs
Show inline comments
 
@@ -76,13 +76,13 @@ pub enum SyncErr {
 
pub enum EvalErr {
 
    ComponentExitWhileBranching,
 
}
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum MessengerRecvErr {
 
    PollingFailed,
 
    EndpointErr(EndpointErr),
 
    EndpointErr(Port, EndpointErr),
 
}
 
impl From<MainComponentErr> for ConfigErr {
 
    fn from(e: MainComponentErr) -> Self {
 
        use ConfigErr as C;
 
        use MainComponentErr as M;
 
        match e {
src/runtime/experimental/api.rs
Show inline comments
 
use super::vec_storage::VecStorage;
 
use crate::common::*;
 
use crate::runtime::endpoint::EndpointExt;
 
use crate::runtime::endpoint::EndpointInfo;
 
use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg};
 
use crate::runtime::errors::EndpointErr;
 
use crate::runtime::errors::MessengerRecvErr;
 
use crate::runtime::errors::PollDeadlineErr;
 
use crate::runtime::MessengerState;
 
use crate::runtime::Messengerlike;
 
use crate::runtime::ReceivedMsg;
 

	
 
use std::net::SocketAddr;
 
use std::sync::Arc;
 

	
 
pub enum Coupling {
 
    Active,
 
    Passive,
 
}
 

	
 
#[derive(Debug)]
 
struct Family {
 
    parent: Option<Port>,
 
    children: HashSet<Port>,
 
}
 

	
 
pub struct Binding {
 
@@ -36,13 +40,13 @@ impl From<InPort> for Port {
 
impl From<OutPort> for Port {
 
    fn from(x: OutPort) -> Self {
 
        x.0
 
    }
 
}
 

	
 
#[derive(Default)]
 
#[derive(Default, Debug)]
 
struct ChannelIndexStream {
 
    next: u32,
 
}
 
impl ChannelIndexStream {
 
    fn next(&mut self) -> u32 {
 
        self.next += 1;
 
@@ -71,12 +75,43 @@ impl Binds<InPort> for Connecting {
 
impl Binds<OutPort> for Connecting {
 
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort {
 
        self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr });
 
        OutPort(Port(self.bindings.len() - 1))
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum ConnectErr {
 
    BindErr(SocketAddr),
 
    NewSocketErr(SocketAddr),
 
    AcceptErr(SocketAddr),
 
    ConnectionShutdown(SocketAddr),
 
    PortKindMismatch(SocketAddr),
 
    EndpointErr(Port, EndpointErr),
 
    PollInitFailed,
 
    PollingFailed,
 
    Timeout,
 
}
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> Self {
 
        use PollDeadlineErr as P;
 
        match e {
 
            P::PollingFailed => Self::PollingFailed,
 
            P::Timeout => Self::Timeout,
 
        }
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> Self {
 
        use MessengerRecvErr as M;
 
        match e {
 
            M::PollingFailed => Self::PollingFailed,
 
            M::EndpointErr(port, err) => Self::EndpointErr(port, err),
 
        }
 
    }
 
}
 
impl Connecting {
 
    fn random_controller_id() -> ControllerId {
 
        type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe {
 
@@ -92,39 +127,26 @@ impl Connecting {
 
    }
 
    fn new_connected(
 
        &self,
 
        controller_id: ControllerId,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ()> {
 
        // TEMP: helper functions until Key is unified with Port
 
        #[inline]
 
        fn key2port(ekey: Key) -> Port {
 
            Port(ekey.to_raw() as usize)
 
        }
 
        #[inline]
 
        fn port2key(port: Port) -> Key {
 
            Key::from_raw(port.0)
 
        }
 
    ) -> Result<Connected, ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        // 1. bindings correspond with ports 0..bindings.len(). For each:
 
        //    - reserve a slot in endpoint_exts.
 
        //    - store the port in `native_ports' set.
 
        let mut endpoint_exts = VecStorage::<EndpointExt>::with_reserved_range(self.bindings.len());
 
        let native_ports = (0..self.bindings.len()).map(Port).collect();
 

	
 
        // 2. create MessengerState structure for polling channels
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(drop)?,
 
            events: Events::with_capacity(self.bindings.len()),
 
            delayed: vec![],
 
            undelayed: vec![],
 
            polled_undrained: Default::default(),
 
        };
 
        let mut ms =
 
            MessengerState::with_event_capacity(self.bindings.len()).map_err(|_| PollInitFailed)?;
 

	
 
        // 3. create one TODO task per (port,binding) as a vector with indices in lockstep.
 
        //    we will drain it gradually so we store elements of type Option<Todo> where all are initially Some(_)
 
        enum Todo {
 
            PassiveAccepting { listener: TcpListener, channel_id: ChannelId },
 
            ActiveConnecting { stream: TcpStream },
 
@@ -138,42 +160,45 @@ impl Connecting {
 
            .enumerate()
 
            .map(|(index, binding)| {
 
                Ok(Some(match binding.coupling {
 
                    Coupling::Passive => {
 
                        let channel_index = channel_index_stream.next();
 
                        let channel_id = ChannelId { controller_id, channel_index };
 
                        let listener = TcpListener::bind(&binding.addr).map_err(drop)?;
 
                        let listener =
 
                            TcpListener::bind(&binding.addr).map_err(|_| BindErr(binding.addr))?;
 
                        ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique
 
                        Todo::PassiveAccepting { listener, channel_id }
 
                    }
 
                    Coupling::Active => {
 
                        let stream = TcpStream::connect(&binding.addr).map_err(drop)?;
 
                        let stream = TcpStream::connect(&binding.addr)
 
                            .map_err(|_| NewSocketErr(binding.addr))?;
 
                        ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique
 
                        Todo::ActiveConnecting { stream }
 
                    }
 
                }))
 
            })
 
            .collect::<Result<Vec<Option<Todo>>, ()>>()?;
 
            .collect::<Result<Vec<Option<Todo>>, ConnectErr>>()?;
 
        let mut num_todos_remaining = todos.len();
 

	
 
        // 4. handle incoming events until all TODOs are completed OR we timeout
 
        let deadline = timeout.map(|t| Instant::now() + t);
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        while num_todos_remaining > 0 {
 
            ms.poll_events_until(deadline).map_err(drop)?;
 
            ms.poll_events_until(deadline)?;
 
            for event in ms.events.iter() {
 
                let token = event.token();
 
                let index = token.0;
 
                let binding = &self.bindings[index];
 
                match todos[index].take() {
 
                    None => {
 
                        polled_undrained_later.insert(index);
 
                    }
 
                    Some(Todo::PassiveAccepting { listener, channel_id }) => {
 
                        let (stream, _peer_addr) = listener.accept().map_err(drop)?;
 
                        let (stream, _peer_addr) =
 
                            listener.accept().map_err(|_| AcceptErr(binding.addr))?;
 
                        ms.poll.deregister(&listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        todos[index] = Some(Todo::PassiveConnecting { stream, channel_id });
 
                    }
 
                    Some(Todo::ActiveConnecting { mut stream }) => {
 
                        let todo = if Self::test_stream_connectivity(&mut stream) {
 
@@ -189,32 +214,32 @@ impl Connecting {
 
                            Todo::ActiveConnecting { stream }
 
                        };
 
                        todos[index] = Some(todo);
 
                    }
 
                    Some(Todo::PassiveConnecting { mut stream, channel_id }) => {
 
                        if !Self::test_stream_connectivity(&mut stream) {
 
                            return Err(());
 
                            return Err(ConnectionShutdown(binding.addr));
 
                        }
 
                        ms.poll.reregister(&stream, token, ready_r, edge).expect("55");
 
                        let polarity = binding.polarity;
 
                        let info = EndpointInfo { polarity, channel_id };
 
                        let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                        let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                        endpoint.send(msg).map_err(drop)?;
 
                        endpoint.send(msg).map_err(|e| EndpointErr(Port(index), e))?;
 
                        let endpoint_ext = EndpointExt { endpoint, info };
 
                        endpoint_exts.occupy_reserved(index, endpoint_ext);
 
                        num_todos_remaining -= 1;
 
                    }
 
                    Some(Todo::ActiveRecving { mut endpoint }) => {
 
                        // log!(logger, "{:03?} start ActiveRecving...", major);
 
                        // assert!(event.readiness().is_readable());
 
                        let ekey = Key::from_raw(index);
 
                        'recv_loop: while let Some(msg) = endpoint.recv().map_err(drop)? {
 
                        'recv_loop: while let Some(msg) =
 
                            endpoint.recv().map_err(|e| EndpointErr(Port(index), e))?
 
                        {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == binding.polarity {
 
                                    return Err(());
 
                                    return Err(PortKindMismatch(binding.addr));
 
                                }
 
                                let channel_id = info.channel_id;
 
                                let info = EndpointInfo { polarity: binding.polarity, channel_id };
 
                                ms.polled_undrained.insert(ekey);
 
                                let endpoint_ext = EndpointExt { endpoint, info };
 
                                endpoint_exts.occupy_reserved(index, endpoint_ext);
 
@@ -229,13 +254,12 @@ impl Connecting {
 
            }
 
        }
 
        assert_eq!(None, endpoint_exts.iter_reserved().next());
 
        drop(todos);
 

	
 
        // 1. construct `family', i.e. perform the sink tree setup procedure
 

	
 
        use {Msg::SetupMsg as S, SetupMsg::*};
 
        let mut messenger = (&mut ms, &mut endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut VecStorage<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
@@ -250,25 +274,23 @@ impl Connecting {
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let neighbors = (0..self.bindings.len()).map(Port);
 
        let echo = S(LeaderEcho { maybe_leader: controller_id });
 
        let mut awaiting = IndexSet::<Port>::with_capacity(neighbors.len());
 
        for n in neighbors.clone() {
 
            messenger.send(port2key(n), echo.clone()).map_err(drop)?;
 
            messenger.send(n, echo.clone()).map_err(|e| EndpointErr(n, e))?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Port> = None;
 
        let mut my_leader = controller_id;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } =
 
                messenger.recv_until(deadline).map_err(drop)?.ok_or(())?;
 
            let recipient = key2port(recipient);
 
            let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?;
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
@@ -282,14 +304,14 @@ impl Connecting {
 
                        Equal => {
 
                            awaiting.remove(&recipient);
 
                            if awaiting.is_empty() {
 
                                if let Some(p) = parent {
 
                                    // return the echo to my parent
 
                                    messenger
 
                                        .send(port2key(p), S(LeaderEcho { maybe_leader }))
 
                                        .map_err(drop)?;
 
                                        .send(p, S(LeaderEcho { maybe_leader }))
 
                                        .map_err(|e| EndpointErr(p, e))?;
 
                                } else {
 
                                    // DECIDE!
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
@@ -298,25 +320,29 @@ impl Connecting {
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                messenger.send(port2key(recipient), echo.clone()).map_err(drop)?;
 
                                messenger
 
                                    .send(recipient, echo.clone())
 
                                    .map_err(|e| EndpointErr(recipient, e))?;
 
                            } else {
 
                                for n in neighbors.clone() {
 
                                    if n != recipient {
 
                                        messenger.send(port2key(n), echo.clone()).map_err(drop)?;
 
                                        messenger
 
                                            .send(n, echo.clone())
 
                                            .map_err(|e| EndpointErr(n, e))?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }),
 
                msg => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        match parent {
 
            None => assert_eq!(
 
                my_leader, controller_id,
 
                "I've got no parent, but I consider {:?} the leader?",
 
@@ -332,39 +358,38 @@ impl Connecting {
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for n in neighbors.clone() {
 
            let msg =
 
                if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            messenger.send(port2key(n), msg).map_err(drop)?;
 
            messenger.send(n, msg).map_err(|e| EndpointErr(n, e))?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for n in neighbors.clone() {
 
            if Some(n) != parent {
 
                awaiting.insert(n);
 
            }
 
        }
 
        let mut children = HashSet::default();
 
        messenger.undelay_all();
 
        while !awaiting.is_empty() {
 
            let ReceivedMsg { recipient, msg } =
 
                messenger.recv_until(deadline).map_err(drop)?.ok_or(())?;
 
            let recipient = key2port(recipient);
 
            let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?;
 
            let recipient = recipient;
 
            match msg {
 
                S(YouAreMyParent) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    children.insert(recipient);
 
                }
 
                S(SetupMsg::LeaderAnnounce { leader }) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    assert!(leader == my_leader);
 
                    assert!(Some(recipient) != parent);
 
                    // they wouldn't send me this if they considered me their parent
 
                }
 
                _ => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }),
 
                _ => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        let family = Family { parent, children };
 

	
 
        // 1. done! return
 
        Ok(Connected {
 
@@ -379,46 +404,42 @@ impl Connecting {
 
    /////////
 
    pub fn connect_using_id(
 
        &mut self,
 
        controller_id: ControllerId,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ()> {
 
    ) -> Result<Connected, ConnectErr> {
 
        // 1. try and create a connection from these bindings with self immutable.
 
        let connected = self.new_connected(controller_id, protocol, timeout)?;
 
        // 2. success! drain self and return
 
        self.bindings.clear();
 
        Ok(connected)
 
    }
 
    pub fn connect(
 
        &mut self,
 
        protocol: &Arc<Protocol>,
 
        timeout: Option<Duration>,
 
    ) -> Result<Connected, ()> {
 
    ) -> Result<Connected, ConnectErr> {
 
        self.connect_using_id(Self::random_controller_id(), protocol, timeout)
 
    }
 
}
 
#[derive(Debug)]
 
pub struct Protocol;
 
impl Protocol {
 
    pub fn parse(_pdl_text: &[u8]) -> Result<Self, ()> {
 
        Ok(Protocol)
 
    }
 
}
 
// struct ComponentExt {
 
//     protocol: Arc<Protocol>,
 
//     ports: HashSet<Port>,
 
//     name: Vec<u8>,
 
// }
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_ports: HashSet<Port>,
 
    controller_id: ControllerId,
 
    channel_index_stream: ChannelIndexStream,
 
    endpoint_exts: VecStorage<EndpointExt>,
 
    protocol: Arc<Protocol>,
 
    family: Family,
 
    // components: Vec<ComponentExt>,
 
}
 
impl Connected {
 
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
 
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
 
        let channel_id = ChannelId {
 
            controller_id: self.controller_id,
 
@@ -564,6 +585,34 @@ fn sync_inner<'c, 'b>(
 
unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] {
 
    std::slice::from_raw_parts_mut(ptr, len)
 
}
 
unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] {
 
    std::slice::from_raw_parts(ptr, len)
 
}
 

	
 
#[test]
 
fn api_connecting() {
 
    let addrs: [SocketAddr; 3] = [
 
        "127.0.0.1:8888".parse().unwrap(),
 
        "127.0.0.1:8889".parse().unwrap(),
 
        "127.0.0.1:8890".parse().unwrap(),
 
    ];
 
    let protocol1 = Arc::new(Protocol::parse(b"").unwrap());
 
    let protocol2 = protocol1.clone();
 
    let handles = vec![
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]);
 
            let connected = connecting.connect(&protocol1, None);
 
            println!("A: {:#?}", connected);
 
        }),
 
        std::thread::spawn(move || {
 
            let mut connecting = Connecting::default();
 
            let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]);
 
            let connected = connecting.connect(&protocol2, Some(Duration::from_secs(2)));
 
            println!("B: {:#?}", connected);
 
        }),
 
    ];
 
    for h in handles {
 
        h.join().unwrap();
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -204,13 +204,17 @@ trait Messengerlike {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self.get_endpoint_mut(eekey).recv()? {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eekey)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(Some(ReceivedMsg { recipient: eekey, msg }));
 
                }
 
            }
 

	
 
@@ -237,13 +241,17 @@ trait Messengerlike {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self.get_endpoint_mut(eekey).recv()? {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eekey)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(Some(ReceivedMsg { recipient: eekey, msg }));
 
                }
 
            }
 

	
 
@@ -284,17 +292,17 @@ impl From<MessengerRecvErr> for SyncErr {
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> ConnectErr {
 
        ConnectErr::MessengerRecvErr(e)
 
    }
 
}
 
impl From<EndpointErr> for MessengerRecvErr {
 
    fn from(e: EndpointErr) -> MessengerRecvErr {
 
        MessengerRecvErr::EndpointErr(e)
 
    }
 
}
 
// impl From<EndpointErr> for MessengerRecvErr {
 
//     fn from(e: EndpointErr) -> MessengerRecvErr {
 
//         MessengerRecvErr::EndpointErr(e)
 
//     }
 
// }
 
impl<T> Default for Arena<T> {
 
    fn default() -> Self {
 
        Self { storage: vec![] }
 
    }
 
}
 
impl<T> Arena<T> {
 
@@ -330,12 +338,21 @@ impl ChannelIdStream {
 
        self.next_channel_index += 1;
 
        ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 }
 
    }
 
}
 

	
 
impl MessengerState {
 
    fn with_event_capacity(event_capacity: usize) -> Result<Self, std::io::Error> {
 
        Ok(Self {
 
            poll: Poll::new()?,
 
            events: Events::with_capacity(event_capacity),
 
            delayed: Default::default(),
 
            undelayed: Default::default(),
 
            polled_undrained: Default::default(),
 
        })
 
    }
 
    // does NOT guarantee that events is non-empty
 
    fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> {
 
        use PollDeadlineErr::*;
 
        self.events.clear();
 
        let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
        self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
0 comments (0 inline, 0 general)