diff --git a/src/runtime/v2.rs b/src/runtime/v2.rs new file mode 100644 index 0000000000000000000000000000000000000000..610036ae0e4477f6581fade79ea3e32775880d80 --- /dev/null +++ b/src/runtime/v2.rs @@ -0,0 +1,262 @@ +use crate::common::*; +use crate::runtime::endpoint::Endpoint; +use crate::runtime::endpoint::Msg; +use crate::runtime::ProtocolD; +use crate::runtime::ProtocolS; +use std::io::Write; + +#[derive(Default)] +struct IntStream { + next: u32, +} +struct IdManager { + controller_id: ControllerId, + port_suffix_stream: IntStream, +} + +struct ProtoComponent { + state: ProtocolS, + ports: HashSet, +} +enum InpRoute { + NativeComponent, + ProtoComponent { index: usize }, + Endpoint { index: usize }, +} +trait Logger { + fn line_writer(&mut self) -> &mut dyn Write; +} +#[derive(Clone)] +struct EndpointSetup { + polarity: Polarity, + sock_addr: SocketAddr, + is_active: bool, +} +struct EndpointExt { + net_endpoint: Endpoint, + // data-messages emerging from this endpoint are destined for this inp + inp: Port, +} +struct Neighborhood { + parent: Option, + children: Vec, // ordered, deduplicated +} +struct MemInMsg { + inp: Port, + msg: Payload, +} +struct EndpointPoller { + poll: Poll, + events: Events, + undrained_endpoints: HashSet, + delayed_inp_messages: Vec<(Port, Msg)>, +} +struct Connector { + logger: Box, + proto_description: Arc, + id_manager: IdManager, + native_ports: HashSet, + proto_components: Vec, + outp_to_inp: HashMap, + inp_to_route: HashMap, + phased: ConnectorPhased, +} +enum ConnectorPhased { + Setup { + endpoint_setups: Vec<(PortId, EndpointSetup)>, + surplus_sockets: u16, + }, + Communication { + endpoint_poller: EndpointPoller, + endpoint_exts: Vec, + neighborhood: Neighborhood, + mem_inbox: Vec, + }, +} +///////////////////////////// +impl IntStream { + fn next(&mut self) -> u32 { + if self.next == u32::MAX { + panic!("NO NEXT!") + } + self.next += 1; + self.next - 1 + } +} +impl IdManager { + fn next_port(&mut self) -> PortId { + let port_suffix = self.port_suffix_stream.next(); + let controller_id = self.controller_id; + PortId { controller_id, port_index: port_suffix } + } + fn new(controller_id: ControllerId) -> Self { + Self { controller_id, port_suffix_stream: Default::default() } + } +} +impl Connector { + pub fn new( + logger: Box, + proto_description: Arc, + controller_id: ControllerId, + surplus_sockets: u16, + ) -> Self { + Self { + logger, + proto_description, + id_manager: IdManager::new(controller_id), + native_ports: Default::default(), + proto_components: Default::default(), + outp_to_inp: Default::default(), + inp_to_route: Default::default(), + phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, + } + } + pub fn add_port_pair(&mut self) -> [PortId; 2] { + let o = self.id_manager.next_port(); + let i = self.id_manager.next_port(); + self.outp_to_inp.insert(o, i); + self.native_ports.insert(o); + self.native_ports.insert(i); + [o, i] + } + pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result { + match &mut self.phased { + ConnectorPhased::Setup { endpoint_setups, .. } => { + let p = self.id_manager.next_port(); + endpoint_setups.push((p, endpoint_setup)); + Ok(p) + } + ConnectorPhased::Communication { .. } => Err(()), + } + } + fn check_polarity(&self, port: &PortId) -> Polarity { + if self.outp_to_inp.contains_key(port) { + Polarity::Putter + } else { + assert!(self.inp_to_route.contains_key(port)); + Polarity::Getter + } + } + pub fn add_proto_component(&mut self, identifier: &[u8], ports: &[PortId]) -> Result<(), ()> { + let polarities = self.proto_description.component_polarities(identifier).map_err(drop)?; + if polarities.len() != ports.len() { + return Err(()); + } + for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { + if !self.native_ports.contains(port) { + return Err(()); + } + if expected_polarity != self.check_polarity(port) { + return Err(()); + } + } + // ok! + let state = self.proto_description.new_main_component(identifier, ports); + let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; + let proto_component_index = self.proto_components.len(); + self.proto_components.push(proto_component); + for port in ports.iter() { + if let Polarity::Getter = self.check_polarity(port) { + self.inp_to_route + .insert(*port, InpRoute::ProtoComponent { index: proto_component_index }); + } + } + Ok(()) + } + pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { + match &mut self.phased { + ConnectorPhased::Communication { .. } => Err(()), + ConnectorPhased::Setup { endpoint_setups, surplus_sockets } => { + // connect all endpoints in parallel; send and receive peer ids through ports + let (mut endpoint_exts, mut endpoint_poller) = + init_endpoints(endpoint_setups, timeout)?; + write!( + self.logger.line_writer(), + "hello! I am controller_id:{}", + self.id_manager.controller_id + ); + // leader election and tree construction + let neighborhood = init_neighborhood(&mut endpoint_exts, &mut endpoint_poller)?; + // TODO session optimization goes here + self.phased = ConnectorPhased::Communication { + endpoint_poller, + endpoint_exts, + neighborhood, + mem_inbox: Default::default(), + }; + Ok(()) + } + } + } +} + +fn init_endpoints( + endpoint_setups: &[(PortId, EndpointSetup)], + timeout: Duration, +) -> Result<(Vec, EndpointPoller), ()> { + let mut endpoint_poller = EndpointPoller { + poll: Poll::new().map_err(drop)?, + events: Events::with_capacity(64), + undrained_endpoints: Default::default(), + delayed_inp_messages: Default::default(), + }; + const PORT_ID_LEN: usize = std::mem::size_of::(); + enum MaybeRecvPort { + Complete(Port), + Partial { buf: [u8; PORT_ID_LEN], read: u8 }, + } + struct Todo { + endpoint: TodoEndpoint, + polarity: Polarity, + local_port: Port, + sent_local_port: bool, + recv_peer_port: MaybeRecvPort, + } + enum TodoEndpoint { + Listener(mio::net::TcpListener), + Stream(mio::net::TcpStream), + } + const BOTH: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); + fn init( + token: Token, + local_port: Port, + endpoint_setup: &EndpointSetup, + poll: &mut Poll, + ) -> Result { + let endpoint = if endpoint_setup.is_active { + let mut stream = + mio::net::TcpStream::connect(&endpoint_setup.sock_addr).map_err(drop)?; + poll.registry().register(&mut stream, token, BOTH).unwrap(); + TodoEndpoint::Stream(stream) + } else { + let mut listener = + mio::net::TcpListener::bind(&endpoint_setup.sock_addr).map_err(drop)?; + poll.registry().register(&mut listener, token, BOTH).unwrap(); + TodoEndpoint::Listener(listener) + }; + Ok(Todo { + endpoint, + endpoint_setup: endpoint_setup.clone(), + local_port, + sent_local_port: false, + recv_peer_port: MaybeRecvPort::Partial { buf: [0; 8], read: 0 }, + }) + }; + + let todos = endpoint_setups + .iter() + .enumerate() + .map(|(index, (local_port, endpoint_setup))| { + init(Token(index), local_port, endpoint_setup, &mut endpoint_poller.poll) + }) + .collect::, _>>()?; + let endpoint_exts = vec![]; + Ok((endpoint_exts, endpoint_poller)) +} + +fn init_neighborhood( + endpoint_exts: &mut [EndpointExt], + endpoint_poller: &mut EndpointPoller, +) -> Result { + todo!() +}