Changeset - 10ae3589084a
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-06-20 14:51:05
christopher.esterhuyse@gmail.com
defined endpoint manager; made explicit its invariants
2 files changed with 50 insertions and 47 deletions:
0 comments (0 inline, 0 general)
src/runtime/mod.rs
Show inline comments
 
@@ -107,18 +107,22 @@ pub struct Neighborhood {
 
#[derive(Debug)]
 
pub struct MemInMsg {
 
    inp: PortId,
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct EndpointPoller {
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    undrained_endpoints: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    logger: Box<dyn Logger>,
 
    proto_description: Arc<ProtocolDescription>,
 
    id_manager: IdManager,
 
@@ -132,14 +136,13 @@ pub struct Connector {
 
pub enum ConnectorPhased {
 
    Setup {
 
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        endpoint_poller: EndpointPoller,
 
        endpoint_exts: Vec<EndpointExt>,
 
        endpoint_manager: EndpointManager,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
    },
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
@@ -170,26 +173,25 @@ enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointRecvErr { error: EndpointRecvErr, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
////////////////
 
impl EndpointPoller {
 
    fn try_recv_any(
 
        &mut self,
 
        endpoint_exts: &mut [EndpointExt],
 
        deadline: Instant,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
impl EndpointManager {
 
    fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 
        // 2. try read from sockets nonblocking
 
        while let Some(index) = self.undrained_endpoints.pop() {
 
            if let Some(msg) = endpoint_exts[index]
 
            if let Some(msg) = self.endpoint_exts[index]
 
                .endpoint
 
                .try_recv()
 
                .map_err(|error| EndpointRecvErr { error, index })?
 
            {
 
                return Ok((index, msg));
 
            }
 
@@ -197,13 +199,13 @@ impl EndpointPoller {
 
        // 3. poll for progress
 
        loop {
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                if let Some(msg) = endpoint_exts[index]
 
                if let Some(msg) = self.endpoint_exts[index]
 
                    .endpoint
 
                    .try_recv()
 
                    .map_err(|error| EndpointRecvErr { error, index })?
 
                {
 
                    return Ok((index, msg));
 
                }
src/runtime/setup2.rs
Show inline comments
 
@@ -104,32 +104,34 @@ impl Connector {
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_exts, mut endpoint_poller) = init_endpoints(
 
                let mut endpoint_manager = init_endpoints(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.inp_to_route,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len());
 
                log!(
 
                    self.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.controller_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_exts,
 
                    &mut endpoint_poller,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    endpoint_poller,
 
                    endpoint_exts,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                };
 
                Ok(())
 
            }
 
        }
 
@@ -138,13 +140,13 @@ impl Connector {
 

	
 
fn init_endpoints(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    inp_to_route: &mut HashMap<PortId, InpRoute>,
 
    deadline: Instant,
 
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
 
) -> Result<EndpointManager, ()> {
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
@@ -176,40 +178,41 @@ fn init_endpoints(
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
        })
 
    };
 
    ////////////////////////
 

	
 
    let mut ep = EndpointPoller {
 
    let mut em = EndpointManager {
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
        endpoint_exts: Vec::with_capacity(endpoint_setups.len()),
 
    };
 

	
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init(Token(index), *local_port, endpoint_setup, &mut ep.poll)
 
            init(Token(index), *local_port, endpoint_setup, &mut em.poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 

	
 
    let mut unfinished: HashSet<usize> = (0..todos.len()).collect();
 
    while !unfinished.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?;
 
        for event in ep.events.iter() {
 
        em.poll.poll(&mut em.events, Some(remaining)).map_err(drop)?;
 
        for event in em.events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                ep.poll.registry().deregister(listener).unwrap();
 
                ep.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                em.poll.registry().deregister(listener).unwrap();
 
                em.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
            }
 
            match todo {
 
                Todo {
 
@@ -227,13 +230,13 @@ fn init_endpoints(
 
                            MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port };
 
                        endpoint.send(&msg)?;
 
                        log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        ep.undrained_endpoints.insert(index);
 
                        em.undrained_endpoints.insert(index);
 
                        if let Some(peer_port_info) =
 
                            endpoint.try_recv::<MyPortInfo>().map_err(drop)?
 
                        {
 
                            log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info);
 
                            assert!(peer_port_info.polarity != endpoint_setup.polarity);
 
                            if let Putter = endpoint_setup.polarity {
 
@@ -247,54 +250,52 @@ fn init_endpoints(
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        ep.events.clear();
 
        em.events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
    em.endpoint_exts.extend(todos.into_iter().map(
 
        |Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
        })
 
        .collect();
 
    Ok((endpoint_exts, ep))
 
        },
 
    ));
 
    Ok(em)
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    endpoint_exts: &mut [EndpointExt],
 
    ep: &mut EndpointPoller,
 
    em: &mut EndpointManager,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    log!(logger, "beginning neighborhood construction");
 
    use Msg::SetupMsg as S;
 
    use SetupMsg::*;
 

	
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(endpoint_exts.len());
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
        awaiting.insert(index);
 
    }
 

	
 
    // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    ep.undelay_all();
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
                // someone else completed the echo and became leader first!
 
                // the sender is my parent
 
                parent = Some(index);
 
@@ -308,13 +309,13 @@ fn init_neighborhood(
 
                    Less => { /* ignore */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?;
 
                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // DECIDE!
 
                                break 'echo_loop;
 
                            }
 
                        }
 
                    }
 
@@ -322,30 +323,30 @@ fn init_neighborhood(
 
                        // join new echo
 
                        log!(logger, "Setting leader to index {:?}", index);
 
                        parent = Some(index);
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if endpoint_exts.len() == 1 {
 
                        if em.endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            endpoint_exts[index].endpoint.send(&echo)?;
 
                            em.send_to(index, &echo)?;
 
                        } else {
 
                            for (index2, ee) in endpoint_exts.iter_mut().enumerate() {
 
                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
                                if index2 == index {
 
                                    continue;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
                                ee.endpoint.send(&echo)?;
 
                                awaiting.insert(index2);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
            inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    match parent {
 
        None => assert_eq!(
 
            my_leader, controller_id,
 
            "I've got no parent, but I consider {:?} the leader?",
 
@@ -361,38 +362,38 @@ fn init_neighborhood(
 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 

	
 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    // await 1 message from all non-parents
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
            awaiting.insert(index);
 
            &msg_for_non_parents
 
        };
 
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    ep.undelay_all();
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            }
 
            S(SetupMsg::LeaderAnnounce { leader }) => {
 
                assert!(awaiting.remove(&index));
 
                assert!(leader == my_leader);
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
            inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    children.sort();
 
    children.dedup();
 
    Ok(Neighborhood { parent, children })
 
}
0 comments (0 inline, 0 general)