Files @ 44a98be4e4b4
Branch filter:

Location: CSY/reowolf/src/runtime/setup2.rs - annotation

44a98be4e4b4 11.2 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Christopher Esterhuyse
beginning large overhaul: moving to globally-unique ports & port -> endpoint route mappings
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
use crate::common::*;
use crate::runtime::*;

impl Connector {
    pub fn new_simple(
        proto_description: Arc<ProtocolDescription>,
        controller_id: ControllerId,
    ) -> Self {
        let logger = Box::new(StringLogger::new(controller_id));
        let surplus_sockets = 8;
        Self::new(logger, proto_description, controller_id, surplus_sockets)
    }
    pub fn new(
        logger: Box<dyn Logger>,
        proto_description: Arc<ProtocolDescription>,
        controller_id: ControllerId,
        surplus_sockets: u16,
    ) -> Self {
        Self {
            logger,
            proto_description,
            id_manager: IdManager::new(controller_id),
            native_ports: Default::default(),
            proto_components: Default::default(),
            outp_to_inp: Default::default(),
            inp_to_route: Default::default(),
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
        }
    }
    pub fn add_port_pair(&mut self) -> [PortId; 2] {
        let o = self.id_manager.next_port();
        let i = self.id_manager.next_port();
        self.outp_to_inp.insert(o, i);
        self.inp_to_route.insert(i, InpRoute::NativeComponent);
        self.native_ports.insert(o);
        self.native_ports.insert(i);
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
        [o, i]
    }
    pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result<PortId, ()> {
        match &mut self.phased {
            ConnectorPhased::Setup { endpoint_setups, .. } => {
                let p = self.id_manager.next_port();
                self.native_ports.insert(p);
                log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup);
                endpoint_setups.push((p, endpoint_setup));
                Ok(p)
            }
            ConnectorPhased::Communication { .. } => Err(()),
        }
    }
    fn check_polarity(&self, port: &PortId) -> Polarity {
        if let ConnectorPhased::Setup { endpoint_setups, .. } = &self.phased {
            for (setup_port, EndpointSetup { polarity, .. }) in endpoint_setups.iter() {
                if setup_port == port {
                    // special case. this port's polarity isn't reflected by
                    // self.inp_to_route or self.outp_to_inp, because its still not paired to a peer
                    return *polarity;
                }
            }
        }
        if self.outp_to_inp.contains_key(port) {
            Polarity::Putter
        } else {
            assert!(self.inp_to_route.contains_key(port));
            Polarity::Getter
        }
    }
    pub fn add_component(
        &mut self,
        identifier: &[u8],
        ports: &[PortId],
    ) -> Result<(), AddComponentError> {
        use AddComponentError::*;
        let polarities = self.proto_description.component_polarities(identifier)?;
        if polarities.len() != ports.len() {
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
        }
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
            if !self.native_ports.contains(port) {
                return Err(UnknownPort(*port));
            }
            if expected_polarity != self.check_polarity(port) {
                return Err(WrongPortPolarity { port: *port, expected_polarity });
            }
        }
        // ok!
        let state = self.proto_description.new_main_component(identifier, ports);
        let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
        let proto_component_index = self.proto_components.len();
        self.proto_components.push(proto_component);
        for port in ports.iter() {
            if let Polarity::Getter = self.check_polarity(port) {
                self.inp_to_route
                    .insert(*port, InpRoute::ProtoComponent { index: proto_component_index });
            }
        }
        Ok(())
    }
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
        match &mut self.phased {
            ConnectorPhased::Communication { .. } => {
                log!(self.logger, "Call to connecting in connected state");
                Err(())
            }
            ConnectorPhased::Setup { endpoint_setups, .. } => {
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
                let deadline = Instant::now() + timeout;
                // connect all endpoints in parallel; send and receive peer ids through ports
                let (mut endpoint_exts, mut endpoint_poller) = init_endpoints(
                    &mut *self.logger,
                    endpoint_setups,
                    &mut self.inp_to_route,
                    deadline,
                )?;
                log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len());
                // leader election and tree construction
                let neighborhood =
                    init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?;
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
                // TODO session optimization goes here
                self.phased = ConnectorPhased::Communication {
                    endpoint_poller,
                    endpoint_exts,
                    neighborhood,
                    mem_inbox: Default::default(),
                };
                Ok(())
            }
        }
    }
}

fn init_endpoints(
    logger: &mut dyn Logger,
    endpoint_setups: &[(PortId, EndpointSetup)],
    inp_to_route: &mut HashMap<PortId, InpRoute>,
    deadline: Instant,
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
    use mio07::{
        net::{TcpListener, TcpStream},
        Events, Interest, Poll, Token,
    };
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
    struct Todo {
        todo_endpoint: TodoEndpoint,
        endpoint_setup: EndpointSetup,
        local_port: PortId,
        sent_local_port: bool,          // true <-> I've sent my local port
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
    }
    enum TodoEndpoint {
        Listener(TcpListener),
        Endpoint(Endpoint),
    }
    fn init(
        token: Token,
        local_port: PortId,
        endpoint_setup: &EndpointSetup,
        poll: &mut Poll,
    ) -> Result<Todo, ()> {
        let todo_endpoint = if endpoint_setup.is_active {
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
            poll.registry().register(&mut stream, token, BOTH).unwrap();
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
        } else {
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
            poll.registry().register(&mut listener, token, BOTH).unwrap();
            TodoEndpoint::Listener(listener)
        };
        Ok(Todo {
            todo_endpoint,
            endpoint_setup: endpoint_setup.clone(),
            local_port,
            sent_local_port: false,
            recv_peer_port: None,
        })
    };
    ////////////////////////

    let mut ep = EndpointPoller {
        poll: Poll::new().map_err(drop)?,
        events: Events::with_capacity(64),
        undrained_endpoints: Default::default(),
        delayed_inp_messages: Default::default(),
    };

    let mut todos = endpoint_setups
        .iter()
        .enumerate()
        .map(|(index, (local_port, endpoint_setup))| {
            init(Token(index), *local_port, endpoint_setup, &mut ep.poll)
        })
        .collect::<Result<Vec<Todo>, _>>()?;

    let mut unfinished: HashSet<usize> = (0..todos.len()).collect();
    while !unfinished.is_empty() {
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
        ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?;
        for event in ep.events.iter() {
            let token = event.token();
            let Token(index) = token;
            let todo: &mut Todo = &mut todos[index];
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
                ep.poll.registry().deregister(listener).unwrap();
                ep.poll.registry().register(&mut stream, token, BOTH).unwrap();
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
                let endpoint = Endpoint { stream, inbox: vec![] };
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
            }
            match todo {
                Todo {
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
                    local_port,
                    endpoint_setup,
                    sent_local_port,
                    recv_peer_port,
                } => {
                    if !unfinished.contains(&index) {
                        continue;
                    }
                    if event.is_writable() && !*sent_local_port {
                        let msg =
                            MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port };
                        endpoint.send(&msg)?;
                        log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg);
                        *sent_local_port = true;
                    }
                    if event.is_readable() && recv_peer_port.is_none() {
                        ep.undrained_endpoints.insert(index);
                        if let Some(peer_port_info) =
                            endpoint.try_recv::<MyPortInfo>().map_err(drop)?
                        {
                            log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info);
                            assert!(peer_port_info.polarity != endpoint_setup.polarity);
                            if let Putter = endpoint_setup.polarity {
                                inp_to_route.insert(*local_port, InpRoute::Endpoint { index });
                            }
                            *recv_peer_port = Some(peer_port_info.port);
                        }
                    }
                    if *sent_local_port && recv_peer_port.is_some() {
                        unfinished.remove(&index);
                        log!(logger, "endpoint[{}] is finished!", index);
                    }
                }
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
            }
        }
        ep.events.clear();
    }
    let endpoint_exts = todos
        .into_iter()
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
            endpoint: match todo_endpoint {
                TodoEndpoint::Endpoint(endpoint) => endpoint,
                TodoEndpoint::Listener(..) => unreachable!(),
            },
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
        })
        .collect();
    Ok((endpoint_exts, ep))
}

fn init_neighborhood(
    logger: &mut dyn Logger,
    endpoint_exts: &mut [EndpointExt],
    endpoint_poller: &mut EndpointPoller,
) -> Result<Neighborhood, ()> {
    log!(logger, "Time to construct my neighborhood");
    let parent = None;
    let children = Default::default();
    Ok(Neighborhood { parent, children })
}