diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 872fa77ef59d5ab5c5c4f3fc5a86d48d60e677a6..32ea1ac6d99fd1954189264e22daf4605ce0545c 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -27,15 +27,27 @@ impl Connector { } pub fn new_udp_port( &mut self, + polarity: Polarity, local_addr: SocketAddr, peer_addr: SocketAddr, - ) -> Result<[PortId; 2], WrongStateError> { - let Self { unphased: _up, phased } = self; + ) -> Result { + let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), - ConnectorPhased::Setup(_setup) => { - let _udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr }; - todo!() + ConnectorPhased::Setup(setup) => { + let udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr }; + let udp_index = setup.udp_endpoint_setups.len(); + let [port_nat, port_udp] = + [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()]; + cu.native_ports.insert(port_nat); + cu.port_info.peers.insert(port_nat, port_udp); + cu.port_info.peers.insert(port_udp, port_nat); + cu.port_info.routes.insert(port_nat, Route::LocalComponent(ComponentId::Native)); + cu.port_info.routes.insert(port_udp, Route::UdpEndpoint { index: udp_index }); + cu.port_info.polarities.insert(port_nat, polarity); + cu.port_info.polarities.insert(port_udp, !polarity); + setup.udp_endpoint_setups.push((port_nat, udp_endpoint_setup)); + Ok(port_nat) } } } @@ -45,18 +57,18 @@ impl Connector { sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, ) -> Result { - let Self { unphased: up, phased } = self; + let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity }; - let p = up.id_manager.new_port_id(); - up.native_ports.insert(p); + let p = cu.id_manager.new_port_id(); + cu.native_ports.insert(p); // {polarity, route} known. {peer} unknown. - up.port_info.polarities.insert(p, polarity); - up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native)); + cu.port_info.polarities.insert(p, polarity); + cu.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native)); log!( - up.logger, + cu.logger, "Added net port {:?} with polarity {:?} and endpoint setup {:?} ", p, polarity, @@ -83,19 +95,19 @@ impl Connector { &mut *cu.logger, &setup.net_endpoint_setups, &mut cu.port_info, - deadline, + &deadline, )?; log!( cu.logger, "Successfully connected {} endpoints", - endpoint_manager.net_endpoint_exts.len() + endpoint_manager.net_endpoint_store.endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( cu.id_manager.connector_id, &mut *cu.logger, &mut endpoint_manager, - deadline, + &deadline, )?; log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); let mut comm = ConnectorCommunication { @@ -106,7 +118,7 @@ impl Connector { round_result: Ok(None), }; if cfg!(feature = "session_optimization") { - session_optimize(cu, &mut comm, deadline)?; + session_optimize(cu, &mut comm, &deadline)?; } log!(cu.logger, "connect() finished. setup phase complete"); self.phased = ConnectorPhased::Communication(Box::new(comm)); @@ -119,7 +131,7 @@ fn new_endpoint_manager( logger: &mut dyn Logger, endpoint_setups: &[(PortId, NetEndpointSetup)], port_info: &mut PortInfo, - deadline: Option, + deadline: &Option, ) -> Result { //////////////////////////////////////////// use std::sync::atomic::AtomicBool; @@ -173,7 +185,8 @@ fn new_endpoint_manager( let mut waker_state: Option> = None; let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4); - let mut polled_undrained = VecSet::default(); + let mut net_polled_undrained = VecSet::default(); + let udp_polled_undrained = VecSet::default(); let mut delayed_messages = vec![]; // 2. create a registered (TcpListener/Endpoint) for passive / active respectively @@ -334,7 +347,7 @@ fn new_endpoint_manager( net_endpoint .send(&msg) .map_err(|e| { - Ce::EndpointSetupError( + Ce::NetEndpointSetupError( net_endpoint.stream.local_addr().unwrap(), e, ) @@ -346,10 +359,13 @@ fn new_endpoint_manager( if event.is_readable() && todo.recv_peer_port.is_none() { // can read and didn't recv setup msg yet? Do so! let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| { - Ce::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), e) + Ce::NetEndpointSetupError( + net_endpoint.stream.local_addr().unwrap(), + e, + ) })?; if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() { - polled_undrained.insert(index); + net_polled_undrained.insert(index); } match maybe_msg { None => {} // msg deserialization incomplete @@ -407,7 +423,14 @@ fn new_endpoint_manager( } events.clear(); } - // all todos must be the NetEndpoint variants! unwrap and collect them + log!(logger, "Endpoint setup complete! Cleaning up and building structures"); + if let Some(arc) = waker_state { + log!(logger, "Sending waker the stop signal"); + arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst); + // TODO leave the waker registered? + } + let udp_endpoint_exts = vec![]; + let net_endpoint_exts = todos .into_iter() .enumerate() @@ -425,18 +448,20 @@ fn new_endpoint_manager( getter_for_incoming: local_port, }) .collect(); - if let Some(arc) = waker_state { - log!(logger, "Sending waker the stop signal"); - arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst); - // TODO leave the waker registered? - } Ok(EndpointManager { poll, events, - polled_undrained, undelayed_messages: delayed_messages, // no longer delayed delayed_messages: Default::default(), - net_endpoint_exts, + net_endpoint_store: EndpointStore { + endpoint_exts: net_endpoint_exts, + polled_undrained: net_polled_undrained, + }, + udp_endpoint_store: EndpointStore { + endpoint_exts: udp_endpoint_exts, + polled_undrained: udp_polled_undrained, + }, + udp_in_buffer: Default::default(), }) } @@ -444,7 +469,7 @@ fn init_neighborhood( connector_id: ConnectorId, logger: &mut dyn Logger, em: &mut EndpointManager, - deadline: Option, + deadline: &Option, ) -> Result { //////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; @@ -645,7 +670,7 @@ fn init_neighborhood( fn session_optimize( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, - deadline: Option, + deadline: &Option, ) -> Result<(), ConnectError> { //////////////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; @@ -709,7 +734,8 @@ fn session_optimize( serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm .endpoint_manager - .net_endpoint_exts + .net_endpoint_store + .endpoint_exts .iter() .map(|ee| ee.getter_for_incoming) .collect(), @@ -798,8 +824,12 @@ fn apply_optimizations( cu.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; - for (ee, getter) in - comm.endpoint_manager.net_endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) + for (ee, getter) in comm + .endpoint_manager + .net_endpoint_store + .endpoint_exts + .iter_mut() + .zip(endpoint_incoming_to_getter) { ee.getter_for_incoming = getter; }