Changeset - 10ae3589084a
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-06-20 14:51:05
christopher.esterhuyse@gmail.com
defined endpoint manager; made explicit its invariants
2 files changed with 50 insertions and 47 deletions:
0 comments (0 inline, 0 general)
src/runtime/mod.rs
Show inline comments
 
@@ -110,12 +110,16 @@ pub struct MemInMsg {
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct EndpointPoller {
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    undrained_endpoints: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
@@ -135,8 +139,7 @@ pub enum ConnectorPhased {
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        endpoint_poller: EndpointPoller,
 
        endpoint_exts: Vec<EndpointExt>,
 
        endpoint_manager: EndpointManager,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
    },
 
@@ -173,12 +176,11 @@ enum TryRecyAnyError {
 
    BrokenEndpoint(usize),
 
}
 
////////////////
 
impl EndpointPoller {
 
    fn try_recv_any(
 
        &mut self,
 
        endpoint_exts: &mut [EndpointExt],
 
        deadline: Instant,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
impl EndpointManager {
 
    fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
@@ -186,7 +188,7 @@ impl EndpointPoller {
 
        }
 
        // 2. try read from sockets nonblocking
 
        while let Some(index) = self.undrained_endpoints.pop() {
 
            if let Some(msg) = endpoint_exts[index]
 
            if let Some(msg) = self.endpoint_exts[index]
 
                .endpoint
 
                .try_recv()
 
                .map_err(|error| EndpointRecvErr { error, index })?
 
@@ -200,7 +202,7 @@ impl EndpointPoller {
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                if let Some(msg) = endpoint_exts[index]
 
                if let Some(msg) = self.endpoint_exts[index]
 
                    .endpoint
 
                    .try_recv()
 
                    .map_err(|error| EndpointRecvErr { error, index })?
src/runtime/setup2.rs
Show inline comments
 
@@ -107,26 +107,28 @@ impl Connector {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_exts, mut endpoint_poller) = init_endpoints(
 
                let mut endpoint_manager = init_endpoints(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.inp_to_route,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len());
 
                log!(
 
                    self.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.controller_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_exts,
 
                    &mut endpoint_poller,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    endpoint_poller,
 
                    endpoint_exts,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                };
 
@@ -141,7 +143,7 @@ fn init_endpoints(
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    inp_to_route: &mut HashMap<PortId, InpRoute>,
 
    deadline: Instant,
 
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
 
) -> Result<EndpointManager, ()> {
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
@@ -179,34 +181,35 @@ fn init_endpoints(
 
    };
 
    ////////////////////////
 

	
 
    let mut ep = EndpointPoller {
 
    let mut em = EndpointManager {
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
        endpoint_exts: Vec::with_capacity(endpoint_setups.len()),
 
    };
 

	
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init(Token(index), *local_port, endpoint_setup, &mut ep.poll)
 
            init(Token(index), *local_port, endpoint_setup, &mut em.poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 

	
 
    let mut unfinished: HashSet<usize> = (0..todos.len()).collect();
 
    while !unfinished.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?;
 
        for event in ep.events.iter() {
 
        em.poll.poll(&mut em.events, Some(remaining)).map_err(drop)?;
 
        for event in em.events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                ep.poll.registry().deregister(listener).unwrap();
 
                ep.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                em.poll.registry().deregister(listener).unwrap();
 
                em.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
@@ -230,7 +233,7 @@ fn init_endpoints(
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        ep.undrained_endpoints.insert(index);
 
                        em.undrained_endpoints.insert(index);
 
                        if let Some(peer_port_info) =
 
                            endpoint.try_recv::<MyPortInfo>().map_err(drop)?
 
                        {
 
@@ -250,26 +253,24 @@ fn init_endpoints(
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        ep.events.clear();
 
        em.events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
    em.endpoint_exts.extend(todos.into_iter().map(
 
        |Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
        })
 
        .collect();
 
    Ok((endpoint_exts, ep))
 
        },
 
    ));
 
    Ok(em)
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    endpoint_exts: &mut [EndpointExt],
 
    ep: &mut EndpointPoller,
 
    em: &mut EndpointManager,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    log!(logger, "beginning neighborhood construction");
 
@@ -278,8 +279,8 @@ fn init_neighborhood(
 

	
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(endpoint_exts.len());
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
        awaiting.insert(index);
 
@@ -289,9 +290,9 @@ fn init_neighborhood(
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    ep.undelay_all();
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
@@ -311,7 +312,7 @@ fn init_neighborhood(
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?;
 
                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // DECIDE!
 
                                break 'echo_loop;
 
@@ -325,12 +326,12 @@ fn init_neighborhood(
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if endpoint_exts.len() == 1 {
 
                        if em.endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            endpoint_exts[index].endpoint.send(&echo)?;
 
                            em.send_to(index, &echo)?;
 
                        } else {
 
                            for (index2, ee) in endpoint_exts.iter_mut().enumerate() {
 
                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
                                if index2 == index {
 
                                    continue;
 
                                }
 
@@ -342,7 +343,7 @@ fn init_neighborhood(
 
                    }
 
                }
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
            inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    match parent {
 
@@ -364,7 +365,7 @@ fn init_neighborhood(
 
    //    in this loop, every node sends 1 message to each neighbor
 
    // await 1 message from all non-parents
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
@@ -375,9 +376,9 @@ fn init_neighborhood(
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    ep.undelay_all();
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
@@ -389,7 +390,7 @@ fn init_neighborhood(
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
            inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    children.sort();
0 comments (0 inline, 0 general)