Changeset - 48b0400c877f
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-06-20 15:29:23
christopher.esterhuyse@gmail.com
clearer input output wiring
1 file changed with 77 insertions and 40 deletions:
0 comments (0 inline, 0 general)
src/runtime/setup2.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
struct LogicalChannelInfo {
 
    local_port: PortId,
 
    peer_port: PortId,
 
    local_polarity: Polarity,
 
    endpoint_index: usize,
 
}
 
///////////////
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
    ) -> Self {
 
        let logger = Box::new(StringLogger::new(controller_id));
 
@@ -39,12 +46,15 @@ impl Connector {
 
    }
 
    pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.next_port();
 
                self.native_ports.insert(p);
 
                if endpoint_setup.polarity == Getter {
 
                    self.inp_to_route.insert(p, InpRoute::NativeComponent);
 
                }
 
                log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup);
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
@@ -104,18 +114,30 @@ impl Connector {
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = init_endpoints(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.inp_to_route,
 
                    deadline,
 
                )?;
 
                let mut endpoint_manager = {
 
                    let Self { outp_to_inp, inp_to_route, logger, .. } = self;
 
                    let logical_channel_callback = |lci: LogicalChannelInfo| {
 
                        if let Putter = lci.local_polarity {
 
                            outp_to_inp.insert(lci.local_port, lci.peer_port);
 
                            inp_to_route.insert(
 
                                lci.peer_port,
 
                                InpRoute::Endpoint { index: lci.endpoint_index },
 
                            );
 
                        }
 
                    };
 
                    new_endpoint_manager(
 
                        &mut **logger,
 
                        endpoint_setups,
 
                        logical_channel_callback,
 
                        deadline,
 
                    )?
 
                };
 
                log!(
 
                    self.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
@@ -135,31 +157,32 @@ impl Connector {
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn init_endpoints(
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    inp_to_route: &mut HashMap<PortId, InpRoute>,
 
    mut logical_channel_callback: impl FnMut(LogicalChannelInfo),
 
    deadline: Instant,
 
) -> Result<EndpointManager, ()> {
 
    ////////////////////////////////////////////
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init(
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ()> {
 
        let todo_endpoint = if endpoint_setup.is_active {
 
@@ -176,110 +199,125 @@ fn init_endpoints(
 
            endpoint_setup: endpoint_setup.clone(),
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
        })
 
    };
 
    ////////////////////////
 
    ////////////////////////////////////////////
 

	
 
    let mut em = EndpointManager {
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
        endpoint_exts: Vec::with_capacity(endpoint_setups.len()),
 
    };
 
    // 1. Start to construct EndpointManager
 
    let mut poll = Poll::new().map_err(drop)?;
 
    let mut events = Events::with_capacity(64);
 
    let mut undrained_endpoints = IndexSet::<usize>::default();
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init(Token(index), *local_port, endpoint_setup, &mut em.poll)
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 

	
 
    let mut unfinished: HashSet<usize> = (0..todos.len()).collect();
 
    while !unfinished.is_empty() {
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        em.poll.poll(&mut em.events, Some(remaining)).map_err(drop)?;
 
        for event in em.events.iter() {
 
        poll.poll(&mut events, Some(remaining)).map_err(drop)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                em.poll.registry().deregister(listener).unwrap();
 
                em.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                poll.registry().deregister(listener).unwrap();
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
            }
 
            match todo {
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    endpoint_setup,
 
                    sent_local_port,
 
                    recv_peer_port,
 
                } => {
 
                    if !unfinished.contains(&index) {
 
                    if !setup_incomplete.contains(&index) {
 
                        continue;
 
                    }
 
                    if event.is_writable() && !*sent_local_port {
 
                        let msg =
 
                            MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port };
 
                        endpoint.send(&msg)?;
 
                        log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        em.undrained_endpoints.insert(index);
 
                        undrained_endpoints.insert(index);
 
                        if let Some(peer_port_info) =
 
                            endpoint.try_recv::<MyPortInfo>().map_err(drop)?
 
                        {
 
                            log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info);
 
                            assert!(peer_port_info.polarity != endpoint_setup.polarity);
 
                            if let Putter = endpoint_setup.polarity {
 
                                inp_to_route.insert(*local_port, InpRoute::Endpoint { index });
 
                            }
 
                            *recv_peer_port = Some(peer_port_info.port);
 
                            let lci = LogicalChannelInfo {
 
                                local_port: *local_port,
 
                                peer_port: peer_port_info.port,
 
                                local_polarity: endpoint_setup.polarity,
 
                                endpoint_index: index,
 
                            };
 
                            logical_channel_callback(lci);
 
                        }
 
                    }
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
                        unfinished.remove(&index);
 
                        setup_incomplete.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        em.events.clear();
 
        events.clear();
 
    }
 
    em.endpoint_exts.extend(todos.into_iter().map(
 
        |Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
        },
 
    ));
 
    Ok(em)
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        undrained_endpoints,
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
        endpoint_exts,
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    log!(logger, "beginning neighborhood construction");
 
    ////////////////////////////////////////////
 
    use Msg::SetupMsg as S;
 
    use SetupMsg::*;
 
    ////////////////////////////////////////////
 

	
 
    log!(logger, "beginning neighborhood construction");
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
@@ -355,18 +393,17 @@ fn init_neighborhood(
 
        Some(parent) => assert_ne!(
 
            my_leader, controller_id,
 
            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
            parent, controller_id
 
        ),
 
    }
 

	
 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 

	
 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    // await 1 message from all non-parents
 
    //    await 1 message from all non-parents.
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
            awaiting.insert(index);
0 comments (0 inline, 0 general)