diff --git a/src/runtime/v2.rs b/src/runtime/v2.rs index 610036ae0e4477f6581fade79ea3e32775880d80..abd043454dd94e5c1060e66ecada817add63cf96 100644 --- a/src/runtime/v2.rs +++ b/src/runtime/v2.rs @@ -1,262 +1,8 @@ -use crate::common::*; -use crate::runtime::endpoint::Endpoint; -use crate::runtime::endpoint::Msg; -use crate::runtime::ProtocolD; -use crate::runtime::ProtocolS; -use std::io::Write; +use crate::common::{ + Arc, ControllerId, Duration, HashMap, HashSet, Instant, Payload, Polarity, Port, PortId, + ProtocolDescription, SocketAddr, +}; +use crate::runtime::endpoint::{Endpoint, Msg}; +use crate::runtime::*; -#[derive(Default)] -struct IntStream { - next: u32, -} -struct IdManager { - controller_id: ControllerId, - port_suffix_stream: IntStream, -} - -struct ProtoComponent { - state: ProtocolS, - ports: HashSet, -} -enum InpRoute { - NativeComponent, - ProtoComponent { index: usize }, - Endpoint { index: usize }, -} -trait Logger { - fn line_writer(&mut self) -> &mut dyn Write; -} -#[derive(Clone)] -struct EndpointSetup { - polarity: Polarity, - sock_addr: SocketAddr, - is_active: bool, -} -struct EndpointExt { - net_endpoint: Endpoint, - // data-messages emerging from this endpoint are destined for this inp - inp: Port, -} -struct Neighborhood { - parent: Option, - children: Vec, // ordered, deduplicated -} -struct MemInMsg { - inp: Port, - msg: Payload, -} -struct EndpointPoller { - poll: Poll, - events: Events, - undrained_endpoints: HashSet, - delayed_inp_messages: Vec<(Port, Msg)>, -} -struct Connector { - logger: Box, - proto_description: Arc, - id_manager: IdManager, - native_ports: HashSet, - proto_components: Vec, - outp_to_inp: HashMap, - inp_to_route: HashMap, - phased: ConnectorPhased, -} -enum ConnectorPhased { - Setup { - endpoint_setups: Vec<(PortId, EndpointSetup)>, - surplus_sockets: u16, - }, - Communication { - endpoint_poller: EndpointPoller, - endpoint_exts: Vec, - neighborhood: Neighborhood, - mem_inbox: Vec, - }, -} ///////////////////////////// -impl IntStream { - fn next(&mut self) -> u32 { - if self.next == u32::MAX { - panic!("NO NEXT!") - } - self.next += 1; - self.next - 1 - } -} -impl IdManager { - fn next_port(&mut self) -> PortId { - let port_suffix = self.port_suffix_stream.next(); - let controller_id = self.controller_id; - PortId { controller_id, port_index: port_suffix } - } - fn new(controller_id: ControllerId) -> Self { - Self { controller_id, port_suffix_stream: Default::default() } - } -} -impl Connector { - pub fn new( - logger: Box, - proto_description: Arc, - controller_id: ControllerId, - surplus_sockets: u16, - ) -> Self { - Self { - logger, - proto_description, - id_manager: IdManager::new(controller_id), - native_ports: Default::default(), - proto_components: Default::default(), - outp_to_inp: Default::default(), - inp_to_route: Default::default(), - phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, - } - } - pub fn add_port_pair(&mut self) -> [PortId; 2] { - let o = self.id_manager.next_port(); - let i = self.id_manager.next_port(); - self.outp_to_inp.insert(o, i); - self.native_ports.insert(o); - self.native_ports.insert(i); - [o, i] - } - pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result { - match &mut self.phased { - ConnectorPhased::Setup { endpoint_setups, .. } => { - let p = self.id_manager.next_port(); - endpoint_setups.push((p, endpoint_setup)); - Ok(p) - } - ConnectorPhased::Communication { .. } => Err(()), - } - } - fn check_polarity(&self, port: &PortId) -> Polarity { - if self.outp_to_inp.contains_key(port) { - Polarity::Putter - } else { - assert!(self.inp_to_route.contains_key(port)); - Polarity::Getter - } - } - pub fn add_proto_component(&mut self, identifier: &[u8], ports: &[PortId]) -> Result<(), ()> { - let polarities = self.proto_description.component_polarities(identifier).map_err(drop)?; - if polarities.len() != ports.len() { - return Err(()); - } - for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !self.native_ports.contains(port) { - return Err(()); - } - if expected_polarity != self.check_polarity(port) { - return Err(()); - } - } - // ok! - let state = self.proto_description.new_main_component(identifier, ports); - let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; - let proto_component_index = self.proto_components.len(); - self.proto_components.push(proto_component); - for port in ports.iter() { - if let Polarity::Getter = self.check_polarity(port) { - self.inp_to_route - .insert(*port, InpRoute::ProtoComponent { index: proto_component_index }); - } - } - Ok(()) - } - pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { - match &mut self.phased { - ConnectorPhased::Communication { .. } => Err(()), - ConnectorPhased::Setup { endpoint_setups, surplus_sockets } => { - // connect all endpoints in parallel; send and receive peer ids through ports - let (mut endpoint_exts, mut endpoint_poller) = - init_endpoints(endpoint_setups, timeout)?; - write!( - self.logger.line_writer(), - "hello! I am controller_id:{}", - self.id_manager.controller_id - ); - // leader election and tree construction - let neighborhood = init_neighborhood(&mut endpoint_exts, &mut endpoint_poller)?; - // TODO session optimization goes here - self.phased = ConnectorPhased::Communication { - endpoint_poller, - endpoint_exts, - neighborhood, - mem_inbox: Default::default(), - }; - Ok(()) - } - } - } -} - -fn init_endpoints( - endpoint_setups: &[(PortId, EndpointSetup)], - timeout: Duration, -) -> Result<(Vec, EndpointPoller), ()> { - let mut endpoint_poller = EndpointPoller { - poll: Poll::new().map_err(drop)?, - events: Events::with_capacity(64), - undrained_endpoints: Default::default(), - delayed_inp_messages: Default::default(), - }; - const PORT_ID_LEN: usize = std::mem::size_of::(); - enum MaybeRecvPort { - Complete(Port), - Partial { buf: [u8; PORT_ID_LEN], read: u8 }, - } - struct Todo { - endpoint: TodoEndpoint, - polarity: Polarity, - local_port: Port, - sent_local_port: bool, - recv_peer_port: MaybeRecvPort, - } - enum TodoEndpoint { - Listener(mio::net::TcpListener), - Stream(mio::net::TcpStream), - } - const BOTH: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); - fn init( - token: Token, - local_port: Port, - endpoint_setup: &EndpointSetup, - poll: &mut Poll, - ) -> Result { - let endpoint = if endpoint_setup.is_active { - let mut stream = - mio::net::TcpStream::connect(&endpoint_setup.sock_addr).map_err(drop)?; - poll.registry().register(&mut stream, token, BOTH).unwrap(); - TodoEndpoint::Stream(stream) - } else { - let mut listener = - mio::net::TcpListener::bind(&endpoint_setup.sock_addr).map_err(drop)?; - poll.registry().register(&mut listener, token, BOTH).unwrap(); - TodoEndpoint::Listener(listener) - }; - Ok(Todo { - endpoint, - endpoint_setup: endpoint_setup.clone(), - local_port, - sent_local_port: false, - recv_peer_port: MaybeRecvPort::Partial { buf: [0; 8], read: 0 }, - }) - }; - - let todos = endpoint_setups - .iter() - .enumerate() - .map(|(index, (local_port, endpoint_setup))| { - init(Token(index), local_port, endpoint_setup, &mut endpoint_poller.poll) - }) - .collect::, _>>()?; - let endpoint_exts = vec![]; - Ok((endpoint_exts, endpoint_poller)) -} - -fn init_neighborhood( - endpoint_exts: &mut [EndpointExt], - endpoint_poller: &mut EndpointPoller, -) -> Result { - todo!() -}