Files @ 4776bcc45b33
Branch filter:

Location: CSY/reowolf/src/runtime/v2.rs - annotation

4776bcc45b33 8.4 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Christopher Esterhuyse
serde for protocol {component state, description} + start of v2 connector internals
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
4776bcc45b33
use crate::common::*;
use crate::runtime::endpoint::Endpoint;
use crate::runtime::endpoint::Msg;
use crate::runtime::ProtocolD;
use crate::runtime::ProtocolS;
use std::io::Write;

#[derive(Default)]
struct IntStream {
    next: u32,
}
struct IdManager {
    controller_id: ControllerId,
    port_suffix_stream: IntStream,
}

struct ProtoComponent {
    state: ProtocolS,
    ports: HashSet<PortId>,
}
enum InpRoute {
    NativeComponent,
    ProtoComponent { index: usize },
    Endpoint { index: usize },
}
trait Logger {
    fn line_writer(&mut self) -> &mut dyn Write;
}
#[derive(Clone)]
struct EndpointSetup {
    polarity: Polarity,
    sock_addr: SocketAddr,
    is_active: bool,
}
struct EndpointExt {
    net_endpoint: Endpoint,
    // data-messages emerging from this endpoint are destined for this inp
    inp: Port,
}
struct Neighborhood {
    parent: Option<usize>,
    children: Vec<usize>, // ordered, deduplicated
}
struct MemInMsg {
    inp: Port,
    msg: Payload,
}
struct EndpointPoller {
    poll: Poll,
    events: Events,
    undrained_endpoints: HashSet<usize>,
    delayed_inp_messages: Vec<(Port, Msg)>,
}
struct Connector {
    logger: Box<dyn Logger>,
    proto_description: Arc<ProtocolD>,
    id_manager: IdManager,
    native_ports: HashSet<PortId>,
    proto_components: Vec<ProtoComponent>,
    outp_to_inp: HashMap<PortId, PortId>,
    inp_to_route: HashMap<PortId, InpRoute>,
    phased: ConnectorPhased,
}
enum ConnectorPhased {
    Setup {
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
        surplus_sockets: u16,
    },
    Communication {
        endpoint_poller: EndpointPoller,
        endpoint_exts: Vec<EndpointExt>,
        neighborhood: Neighborhood,
        mem_inbox: Vec<MemInMsg>,
    },
}
/////////////////////////////
impl IntStream {
    fn next(&mut self) -> u32 {
        if self.next == u32::MAX {
            panic!("NO NEXT!")
        }
        self.next += 1;
        self.next - 1
    }
}
impl IdManager {
    fn next_port(&mut self) -> PortId {
        let port_suffix = self.port_suffix_stream.next();
        let controller_id = self.controller_id;
        PortId { controller_id, port_index: port_suffix }
    }
    fn new(controller_id: ControllerId) -> Self {
        Self { controller_id, port_suffix_stream: Default::default() }
    }
}
impl Connector {
    pub fn new(
        logger: Box<dyn Logger>,
        proto_description: Arc<ProtocolD>,
        controller_id: ControllerId,
        surplus_sockets: u16,
    ) -> Self {
        Self {
            logger,
            proto_description,
            id_manager: IdManager::new(controller_id),
            native_ports: Default::default(),
            proto_components: Default::default(),
            outp_to_inp: Default::default(),
            inp_to_route: Default::default(),
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
        }
    }
    pub fn add_port_pair(&mut self) -> [PortId; 2] {
        let o = self.id_manager.next_port();
        let i = self.id_manager.next_port();
        self.outp_to_inp.insert(o, i);
        self.native_ports.insert(o);
        self.native_ports.insert(i);
        [o, i]
    }
    pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result<PortId, ()> {
        match &mut self.phased {
            ConnectorPhased::Setup { endpoint_setups, .. } => {
                let p = self.id_manager.next_port();
                endpoint_setups.push((p, endpoint_setup));
                Ok(p)
            }
            ConnectorPhased::Communication { .. } => Err(()),
        }
    }
    fn check_polarity(&self, port: &PortId) -> Polarity {
        if self.outp_to_inp.contains_key(port) {
            Polarity::Putter
        } else {
            assert!(self.inp_to_route.contains_key(port));
            Polarity::Getter
        }
    }
    pub fn add_proto_component(&mut self, identifier: &[u8], ports: &[PortId]) -> Result<(), ()> {
        let polarities = self.proto_description.component_polarities(identifier).map_err(drop)?;
        if polarities.len() != ports.len() {
            return Err(());
        }
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
            if !self.native_ports.contains(port) {
                return Err(());
            }
            if expected_polarity != self.check_polarity(port) {
                return Err(());
            }
        }
        // ok!
        let state = self.proto_description.new_main_component(identifier, ports);
        let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
        let proto_component_index = self.proto_components.len();
        self.proto_components.push(proto_component);
        for port in ports.iter() {
            if let Polarity::Getter = self.check_polarity(port) {
                self.inp_to_route
                    .insert(*port, InpRoute::ProtoComponent { index: proto_component_index });
            }
        }
        Ok(())
    }
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
        match &mut self.phased {
            ConnectorPhased::Communication { .. } => Err(()),
            ConnectorPhased::Setup { endpoint_setups, surplus_sockets } => {
                // connect all endpoints in parallel; send and receive peer ids through ports
                let (mut endpoint_exts, mut endpoint_poller) =
                    init_endpoints(endpoint_setups, timeout)?;
                write!(
                    self.logger.line_writer(),
                    "hello! I am controller_id:{}",
                    self.id_manager.controller_id
                );
                // leader election and tree construction
                let neighborhood = init_neighborhood(&mut endpoint_exts, &mut endpoint_poller)?;
                // TODO session optimization goes here
                self.phased = ConnectorPhased::Communication {
                    endpoint_poller,
                    endpoint_exts,
                    neighborhood,
                    mem_inbox: Default::default(),
                };
                Ok(())
            }
        }
    }
}

fn init_endpoints(
    endpoint_setups: &[(PortId, EndpointSetup)],
    timeout: Duration,
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
    let mut endpoint_poller = EndpointPoller {
        poll: Poll::new().map_err(drop)?,
        events: Events::with_capacity(64),
        undrained_endpoints: Default::default(),
        delayed_inp_messages: Default::default(),
    };
    const PORT_ID_LEN: usize = std::mem::size_of::<PortId>();
    enum MaybeRecvPort {
        Complete(Port),
        Partial { buf: [u8; PORT_ID_LEN], read: u8 },
    }
    struct Todo {
        endpoint: TodoEndpoint,
        polarity: Polarity,
        local_port: Port,
        sent_local_port: bool,
        recv_peer_port: MaybeRecvPort,
    }
    enum TodoEndpoint {
        Listener(mio::net::TcpListener),
        Stream(mio::net::TcpStream),
    }
    const BOTH: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
    fn init(
        token: Token,
        local_port: Port,
        endpoint_setup: &EndpointSetup,
        poll: &mut Poll,
    ) -> Result<Todo, ()> {
        let endpoint = if endpoint_setup.is_active {
            let mut stream =
                mio::net::TcpStream::connect(&endpoint_setup.sock_addr).map_err(drop)?;
            poll.registry().register(&mut stream, token, BOTH).unwrap();
            TodoEndpoint::Stream(stream)
        } else {
            let mut listener =
                mio::net::TcpListener::bind(&endpoint_setup.sock_addr).map_err(drop)?;
            poll.registry().register(&mut listener, token, BOTH).unwrap();
            TodoEndpoint::Listener(listener)
        };
        Ok(Todo {
            endpoint,
            endpoint_setup: endpoint_setup.clone(),
            local_port,
            sent_local_port: false,
            recv_peer_port: MaybeRecvPort::Partial { buf: [0; 8], read: 0 },
        })
    };

    let todos = endpoint_setups
        .iter()
        .enumerate()
        .map(|(index, (local_port, endpoint_setup))| {
            init(Token(index), local_port, endpoint_setup, &mut endpoint_poller.poll)
        })
        .collect::<Result<Vec<Todo>, _>>()?;
    let endpoint_exts = vec![];
    Ok((endpoint_exts, endpoint_poller))
}

fn init_neighborhood(
    endpoint_exts: &mut [EndpointExt],
    endpoint_poller: &mut EndpointPoller,
) -> Result<Neighborhood, ()> {
    todo!()
}