Files @ e7b7d53e6952
Branch filter:

Location: CSY/reowolf/src/runtime/setup.rs - annotation

e7b7d53e6952 16.2 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Christopher Esterhuyse
more tests
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
e7b7d53e6952
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
8642f7a7bf01
8642f7a7bf01
44a98be4e4b4
44a98be4e4b4
cecf94fdb875
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
8642f7a7bf01
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
44a98be4e4b4
44a98be4e4b4
8642f7a7bf01
44a98be4e4b4
cecf94fdb875
cecf94fdb875
cecf94fdb875
a34c55297ac2
a34c55297ac2
a34c55297ac2
a34c55297ac2
a34c55297ac2
a34c55297ac2
a34c55297ac2
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
842acacee86d
44a98be4e4b4
44a98be4e4b4
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
10ae3589084a
10ae3589084a
10ae3589084a
10ae3589084a
10ae3589084a
44a98be4e4b4
1bacc6467d19
1bacc6467d19
1bacc6467d19
10ae3589084a
1bacc6467d19
1bacc6467d19
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
8642f7a7bf01
10ae3589084a
44a98be4e4b4
44a98be4e4b4
8642f7a7bf01
8642f7a7bf01
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
44a98be4e4b4
44a98be4e4b4
cecf94fdb875
44a98be4e4b4
10ae3589084a
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
cecf94fdb875
44a98be4e4b4
48b0400c877f
44a98be4e4b4
48b0400c877f
48b0400c877f
48b0400c877f
cecf94fdb875
cecf94fdb875
44a98be4e4b4
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
48b0400c877f
48b0400c877f
48b0400c877f
48b0400c877f
48b0400c877f
44a98be4e4b4
48b0400c877f
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
48b0400c877f
cecf94fdb875
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
cecf94fdb875
44a98be4e4b4
48b0400c877f
44a98be4e4b4
44a98be4e4b4
cecf94fdb875
44a98be4e4b4
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
44a98be4e4b4
cecf94fdb875
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
48b0400c877f
44a98be4e4b4
48b0400c877f
48b0400c877f
842acacee86d
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
842acacee86d
48b0400c877f
48b0400c877f
48b0400c877f
48b0400c877f
48b0400c877f
cecf94fdb875
cecf94fdb875
48b0400c877f
48b0400c877f
48b0400c877f
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
1bacc6467d19
44a98be4e4b4
10ae3589084a
1bacc6467d19
44a98be4e4b4
cecf94fdb875
48b0400c877f
1bacc6467d19
1bacc6467d19
10ae3589084a
10ae3589084a
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
10ae3589084a
1bacc6467d19
8642f7a7bf01
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
8642f7a7bf01
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
10ae3589084a
1bacc6467d19
8642f7a7bf01
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
10ae3589084a
1bacc6467d19
1bacc6467d19
10ae3589084a
1bacc6467d19
10ae3589084a
1bacc6467d19
8642f7a7bf01
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
48b0400c877f
1bacc6467d19
10ae3589084a
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
10ae3589084a
1bacc6467d19
cecf94fdb875
8642f7a7bf01
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
cecf94fdb875
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
1bacc6467d19
1bacc6467d19
1bacc6467d19
1bacc6467d19
44a98be4e4b4
44a98be4e4b4
use crate::common::*;
use crate::runtime::*;

impl Connector {
    /// Builds a connector with a `StringLogger` and a default surplus-socket
    /// budget. See [`Connector::new`] for the fully-parameterized constructor.
    pub fn new_simple(
        proto_description: Arc<ProtocolDescription>,
        controller_id: ControllerId,
    ) -> Self {
        const DEFAULT_SURPLUS_SOCKETS: u16 = 8;
        let logger = Box::new(StringLogger::new(controller_id));
        Self::new(logger, proto_description, controller_id, DEFAULT_SURPLUS_SOCKETS)
    }

    /// Builds a connector in its initial `Setup` phase: no ports, no
    /// components, and an empty list of endpoints to connect later.
    pub fn new(
        logger: Box<dyn Logger>,
        proto_description: Arc<ProtocolDescription>,
        controller_id: ControllerId,
        surplus_sockets: u16,
    ) -> Self {
        Self {
            proto_description,
            proto_components: Default::default(),
            logger,
            id_manager: IdManager::new(controller_id),
            native_ports: Default::default(),
            port_info: Default::default(),
            phased: ConnectorPhased::Setup {
                endpoint_setups: Default::default(),
                surplus_sockets,
            },
        }
    }

    /// Registers a new network port owned by the native component. The actual
    /// TCP connection is deferred until [`Connector::connect`] runs.
    /// Returns `Err(())` if the connector has already left the setup phase.
    pub fn new_net_port(
        &mut self,
        polarity: Polarity,
        endpoint_setup: EndpointSetup,
    ) -> Result<PortId, ()> {
        // Adding ports is only legal while still in the setup phase.
        let endpoint_setups = match &mut self.phased {
            ConnectorPhased::Setup { endpoint_setups, .. } => endpoint_setups,
            ConnectorPhased::Communication { .. } => return Err(()),
        };
        let port = self.id_manager.new_port_id();
        self.native_ports.insert(port);
        // {polarity, route} are known now; {peer} stays unknown until the
        // handshake performed during `connect`.
        self.port_info.polarities.insert(port, polarity);
        self.port_info.routes.insert(port, Route::LocalComponent(LocalComponentId::Native));
        log!(
            self.logger,
            "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
            port,
            polarity,
            &endpoint_setup
        );
        endpoint_setups.push((port, endpoint_setup));
        Ok(port)
    }

    /// Drives the connector from `Setup` into `Communication`:
    /// 1. connects every registered endpoint and exchanges port info,
    /// 2. runs leader election to build the controller neighborhood.
    /// Returns `Err(())` on timeout, I/O failure, or if already connected.
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
        match &mut self.phased {
            ConnectorPhased::Setup { endpoint_setups, .. } => {
                log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
                let deadline = Instant::now() + timeout;
                // Connect all endpoints in parallel; send and receive peer
                // ids through the fresh connections.
                let mut endpoint_manager = new_endpoint_manager(
                    &mut *self.logger,
                    endpoint_setups,
                    &mut self.port_info,
                    deadline,
                )?;
                log!(
                    self.logger,
                    "Successfully connected {} endpoints",
                    endpoint_manager.endpoint_exts.len()
                );
                // Leader election and spanning-tree construction.
                let neighborhood = init_neighborhood(
                    self.id_manager.controller_id,
                    &mut *self.logger,
                    &mut endpoint_manager,
                    deadline,
                )?;
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
                // TODO session optimization goes here
                self.phased = ConnectorPhased::Communication {
                    round_index: 0,
                    endpoint_manager,
                    neighborhood,
                    mem_inbox: Default::default(),
                    native_batches: vec![Default::default()],
                    round_result: Ok(None),
                };
                Ok(())
            }
            ConnectorPhased::Communication { .. } => {
                log!(self.logger, "Call to connecting in connected state");
                Err(())
            }
        }
    }
}

/// Connects all endpoint setups in parallel (driven by a single mio `Poll`),
/// exchanging `MyPortInfo` over every connection so that each local port
/// learns its peer's id and polarity (recorded into `port_info`).
/// Returns `Err(())` on I/O failure or when `deadline` passes first.
/// Non-setup messages that arrive early are kept as `undelayed_messages`
/// for the next phase rather than dropped.
fn new_endpoint_manager(
    logger: &mut dyn Logger,
    endpoint_setups: &[(PortId, EndpointSetup)],
    port_info: &mut PortInfo,
    deadline: Instant,
) -> Result<EndpointManager, ()> {
    ////////////////////////////////////////////
    // Connected endpoints both send (writable) and receive (readable).
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
    // Per-endpoint setup state. An endpoint is finished once it has both
    // sent its local port info and received its peer's.
    struct Todo {
        todo_endpoint: TodoEndpoint,
        local_port: PortId,
        sent_local_port: bool,          // true <-> I've sent my local port
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
    }
    enum TodoEndpoint {
        // passive side, still awaiting an incoming connection
        Listener(TcpListener),
        // active side, or passive side after accepting
        Endpoint(Endpoint),
    }
    // Creates the initial `Todo` for one setup: an outgoing TCP stream for an
    // active endpoint, a bound listener for a passive one; either way it is
    // registered with `poll` under the given token (== index into `todos`).
    fn init_todo(
        token: Token,
        local_port: PortId,
        endpoint_setup: &EndpointSetup,
        poll: &mut Poll,
    ) -> Result<Todo, ()> {
        let todo_endpoint = if endpoint_setup.is_active {
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
            poll.registry().register(&mut stream, token, BOTH).unwrap();
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
        } else {
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
            // A listener only ever signals readability (incoming connection),
            // so there is no point registering WRITABLE interest for it.
            poll.registry().register(&mut listener, token, Interest::READABLE).unwrap();
            TodoEndpoint::Listener(listener)
        };
        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
    }
    ////////////////////////////////////////////

    // 1. Start to construct EndpointManager
    let mut poll = Poll::new().map_err(drop)?;
    let mut events = Events::with_capacity(64);
    let mut polled_undrained = IndexSet::<usize>::default();
    let mut delayed_messages = vec![];

    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
    let mut todos = endpoint_setups
        .iter()
        .enumerate()
        .map(|(index, (local_port, endpoint_setup))| {
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
        })
        .collect::<Result<Vec<Todo>, _>>()?;

    // 3. Using poll to drive progress:
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
    //    - for each endpoint, send the local PortId
    //    - for each endpoint, recv the peer's PortId, and
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
    while !setup_incomplete.is_empty() {
        // Remaining time budget; `None` means the deadline already passed.
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
        poll.poll(&mut events, Some(remaining)).map_err(drop)?;
        for event in events.iter() {
            let token = event.token();
            let Token(index) = token;
            let todo: &mut Todo = &mut todos[index];
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
                let (mut stream, peer_addr) = match listener.accept() {
                    Ok(ok) => ok,
                    // mio permits spurious wakeups: an event with nothing to
                    // accept. Retry on the next event instead of failing the
                    // whole setup.
                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
                    Err(_) => return Err(()),
                };
                // Swap the listener for the accepted stream under the same token.
                poll.registry().deregister(listener).unwrap();
                poll.registry().register(&mut stream, token, BOTH).unwrap();
                log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr);
                let endpoint = Endpoint { stream, inbox: vec![] };
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
            }
            match todo {
                Todo {
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
                    local_port,
                    sent_local_port,
                    recv_peer_port,
                    ..
                } => {
                    // Events may still arrive for endpoints that finished; skip them.
                    if !setup_incomplete.contains(&index) {
                        continue;
                    }
                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
                    if event.is_writable() && !*sent_local_port {
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
                            polarity: local_polarity,
                            port: *local_port,
                        }));
                        endpoint.send(&msg)?;
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
                        *sent_local_port = true;
                    }
                    if event.is_readable() && recv_peer_port.is_none() {
                        let maybe_msg = endpoint.try_recv().map_err(drop)?;
                        // More bytes remain buffered after this message: remember to
                        // drain this endpoint even without a fresh poll event.
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
                            polled_undrained.insert(index);
                        }
                        match maybe_msg {
                            None => {} // msg deserialization incomplete
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
                                // Ports of a channel must have opposite polarities.
                                assert!(peer_info.polarity != local_polarity);
                                *recv_peer_port = Some(peer_info.port);
                                // 1. finally learned the peer of this port!
                                port_info.peers.insert(*local_port, peer_info.port);
                                // 2. learned the info of this peer port
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
                                port_info.peers.insert(peer_info.port, *local_port);
                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
                            }
                            Some(inappropriate_msg) => {
                                log!(
                                    logger,
                                    "delaying msg {:?} during channel setup phase",
                                    inappropriate_msg
                                );
                                delayed_messages.push((index, inappropriate_msg));
                            }
                        }
                    }
                    if *sent_local_port && recv_peer_port.is_some() {
                        setup_incomplete.remove(&index);
                        log!(logger, "endpoint[{}] is finished!", index);
                    }
                }
                // Any listener event was converted to an Endpoint above.
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
            }
        }
        events.clear();
    }
    // All endpoints finished: strip the setup bookkeeping down to EndpointExts.
    let endpoint_exts = todos
        .into_iter()
        .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt {
            endpoint: match todo_endpoint {
                TodoEndpoint::Endpoint(endpoint) => endpoint,
                TodoEndpoint::Listener(..) => unreachable!(),
            },
            getter_for_incoming: local_port,
        })
        .collect();
    Ok(EndpointManager {
        poll,
        events,
        polled_undrained,
        undelayed_messages: delayed_messages, // no longer delayed
        delayed_messages: Default::default(),
        endpoint_exts,
    })
}

/// Runs a distributed leader election (echo algorithm) over the connected
/// endpoint graph: the controller with the greatest `ControllerId` wins, and
/// the election's wave structure doubles as a spanning tree rooted at the
/// leader. Returns this controller's `Neighborhood`: its `parent` endpoint
/// index (`None` iff this controller is the leader) and the sorted, deduped
/// endpoint indices of its `children`.
/// Returns `Err(())` if `deadline` passes or an endpoint send/recv fails.
fn init_neighborhood(
    controller_id: ControllerId,
    logger: &mut dyn Logger,
    em: &mut EndpointManager,
    deadline: Instant,
) -> Result<Neighborhood, ()> {
    use {Msg::SetupMsg as S, SetupMsg::*};
    log!(logger, "beginning neighborhood construction");
    // 1. broadcast my ID as the first echo. await reply from all neighbors
    let echo = S(LeaderEcho { maybe_leader: controller_id });
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
        ee.endpoint.send(&echo)?;
        awaiting.insert(index);
    }

    // 2. Receive incoming replies. whenever a higher-id echo arrives,
    //    adopt it as leader, sender as parent, and reset the await set.
    //    Loop invariant: `awaiting` holds the endpoint indices whose reply to
    //    the CURRENT wave (for leader candidate `my_leader`) is outstanding.
    //    The loop also keeps spinning while `parent.is_some()`, since a
    //    non-root node must wait for its parent's final LeaderAnnounce.
    let mut parent: Option<usize> = None;
    let mut my_leader = controller_id;
    // Replay any messages delayed during the channel-setup phase first.
    em.undelay_all();
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
        match msg {
            S(LeaderAnnounce { leader }) => {
                // someone else completed the echo and became leader first!
                // the sender is my parent
                parent = Some(index);
                my_leader = leader;
                awaiting.clear();
                break 'echo_loop;
            }
            S(LeaderEcho { maybe_leader }) => {
                use Ordering::*;
                // Compare the incoming candidate against my current one.
                match maybe_leader.cmp(&my_leader) {
                    Less => { /* ignore this wave */ }
                    Equal => {
                        // A reply to my current wave; one fewer outstanding.
                        awaiting.remove(&index);
                        if awaiting.is_empty() {
                            if let Some(p) = parent {
                                // return the echo to my parent
                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
                            } else {
                                // wave completed!
                                break 'echo_loop;
                            }
                        }
                    }
                    Greater => {
                        // join new echo
                        log!(logger, "Setting leader to index {:?}", index);
                        parent = Some(index);
                        my_leader = maybe_leader;
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
                        awaiting.clear();
                        if em.endpoint_exts.len() == 1 {
                            // immediately reply to parent
                            log!(logger, "replying echo to parent {:?} immediately", index);
                            em.send_to(index, &echo)?;
                        } else {
                            // Propagate the stronger wave to all other neighbors
                            // and await their replies.
                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
                                if index2 == index {
                                    // don't propagate echo to my parent
                                    continue;
                                }
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
                                ee.endpoint.send(&echo)?;
                                awaiting.insert(index2);
                            }
                        }
                    }
                }
            }
            inappropriate_msg => {
                // Not an echo-phase message; shelve it for a later phase.
                log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg);
                em.delayed_messages.push((index, inappropriate_msg))
            }
        }
    }
    // Sanity check: rootless <=> I am the leader.
    match parent {
        None => assert_eq!(
            my_leader, controller_id,
            "I've got no parent, but I consider {:?} the leader?",
            my_leader
        ),
        Some(parent) => assert_ne!(
            my_leader, controller_id,
            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
            parent, controller_id
        ),
    }
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);

    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
    //    in this loop, every node sends 1 message to each neighbor
    //    await 1 message from all non-parents.
    //    NB: `awaiting` is guaranteed empty at this point and is reused here.
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
        let msg = if Some(index) == parent {
            &S(YouAreMyParent)
        } else {
            awaiting.insert(index);
            &msg_for_non_parents
        };
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
        ee.endpoint.send(msg)?;
    }
    let mut children = Vec::default();
    // Replay messages delayed during the echo phase before receiving new ones.
    em.undelay_all();
    while !awaiting.is_empty() {
        log!(logger, "awaiting {:?}", &awaiting);
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
        match msg {
            S(YouAreMyParent) => {
                // This neighbor chose me as its parent: it is my child.
                assert!(awaiting.remove(&index));
                children.push(index);
            }
            S(LeaderAnnounce { leader }) => {
                // This neighbor is neither my child nor my parent; we merely
                // agree on the leader.
                assert!(awaiting.remove(&index));
                assert!(leader == my_leader);
                assert!(Some(index) != parent);
                // they wouldn't send me this if they considered me their parent
            }
            inappropriate_msg => {
                log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg);
                em.delayed_messages.push((index, inappropriate_msg));
            }
        }
    }
    children.sort();
    children.dedup();
    Ok(Neighborhood { parent, children })
}