diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index a86688f7e5cf6daea809e23b33c9f67f6fa35756..f53c2e9fbd611e5a44febaa3aadbee33f159260b 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -1,12 +1,6 @@ use crate::common::*; use crate::runtime::*; -#[derive(Default)] -struct ExtraPortInfo { - info: HashMap, - peers: HashMap, -} - impl TokenTarget { // subdivides the domain of usize into // [NET_ENDPOINT][UDP_ENDPOINT ] @@ -208,61 +202,92 @@ impl Connector { Err(Ce::AlreadyConnected) } ConnectorPhased::Setup(setup) => { - log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); - let deadline = timeout.map(|to| Instant::now() + to); - // connect all endpoints in parallel; send and receive peer ids through ports - let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports( - &mut *cu.logger, - &setup.net_endpoint_setups, - &setup.udp_endpoint_setups, - &cu.ips.port_info, - &deadline, - )?; - log!( - cu.logger, - "Successfully connected {} endpoints. info now {:#?} {:#?}", - endpoint_manager.net_endpoint_store.endpoint_exts.len(), - &cu.ips.port_info, - &endpoint_manager, - ); - // leader election and tree construction. Learn our role in the consensus tree, - // from learning who are our children/parents (neighbors) in the consensus tree. - let neighborhood = init_neighborhood( - cu.ips.id_manager.connector_id, - &mut *cu.logger, - &mut endpoint_manager, - &deadline, - )?; - log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); - // Put it all together with an initial round index of zero. - let mut comm = ConnectorCommunication { - round_index: 0, - endpoint_manager, - neighborhood, - native_batches: vec![Default::default()], - round_result: Ok(None), // no previous round yet + // Ideally, we'd simply clone `cu` in its entirety, and pass it to + // `connect_inner`, such that a successful connection discards the original. + // We need to work around the `logger` not being clonable; + // solution? create a dummy clone, and use mem-swap to ensure the + // single real logger is wherever it needs to be. + let mut cu_clone = ConnectorUnphased { + logger: Box::new(DummyLogger), + proto_components: cu.proto_components.clone(), + native_component_id: cu.native_component_id.clone(), + ips: cu.ips.clone(), + proto_description: cu.proto_description.clone(), }; - if cfg!(feature = "session_optimization") { - // Perform the session optimization procedure, which may modify the - // internals of the connector, rerouting ports, moving around connectors etc. - session_optimize(cu, &mut comm, &deadline)?; - } - log!(cu.logger, "connect() finished. setup phase complete"); - // Connect procedure successful! Commit changes by... - // ... commiting new port info for ConnectorUnphased - for (port, info) in extra_port_info.info.drain() { - cu.ips.port_info.owned.entry(info.owner).or_default().insert(port); - cu.ips.port_info.map.insert(port, info); - } - for (port, peer) in extra_port_info.peers.drain() { - cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer); + // cu has REAL logger... + std::mem::swap(&mut cu.logger, &mut cu_clone.logger); + // ... cu_clone has REAL logger. + match Self::connect_inner(cu_clone, setup, timeout) { + Ok(connected_connector) => { + *self = connected_connector; + Ok(()) + } + Err((err, mut logger)) => { + // Put the original logger back in place (in self.unphased, AKA `cu`). + // cu_clone has REAL logger... + std::mem::swap(&mut cu.logger, &mut logger); + // ... cu has REAL logger. + Err(err) + } } - // ... replacing the connector's phase to "communication" - *phased = ConnectorPhased::Communication(Box::new(comm)); - Ok(()) } } } + fn connect_inner( + mut cu: ConnectorUnphased, + setup: &ConnectorSetup, + timeout: Option, + ) -> Result)> { + log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); + let deadline = timeout.map(|to| Instant::now() + to); + let mut try_complete = || { + // connect all endpoints in parallel; send and receive peer ids through ports + let mut endpoint_manager = setup_endpoints_and_pair_ports( + &mut *cu.logger, + &setup.net_endpoint_setups, + &setup.udp_endpoint_setups, + &mut cu.ips.port_info, + &deadline, + )?; + log!( + cu.logger, + "Successfully connected {} endpoints. info now {:#?} {:#?}", + endpoint_manager.net_endpoint_store.endpoint_exts.len(), + &cu.ips.port_info, + &endpoint_manager, + ); + // leader election and tree construction. Learn our role in the consensus tree, + // from learning who are our children/parents (neighbors) in the consensus tree. + let neighborhood = init_neighborhood( + cu.ips.id_manager.connector_id, + &mut *cu.logger, + &mut endpoint_manager, + &deadline, + )?; + log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); + // Put it all together with an initial round index of zero. + let mut comm = ConnectorCommunication { + round_index: 0, + endpoint_manager, + neighborhood, + native_batches: vec![Default::default()], + round_result: Ok(None), // no previous round yet + }; + if cfg!(feature = "session_optimization") { + // Perform the session optimization procedure, which may modify the + // internals of the connector, rerouting ports, moving around connectors etc. + session_optimize(&mut cu, &mut comm, &deadline)?; + } + log!(cu.logger, "connect() finished. setup phase complete"); + Ok(comm) + }; + match try_complete() { + Ok(comm) => { + Ok(Self { unphased: cu, phased: ConnectorPhased::Communication(Box::new(comm)) }) + } + Err(err) => Err((err, cu.logger)), + } + } } // Given a set of net_ and udp_ endpoints to setup, @@ -275,9 +300,9 @@ fn setup_endpoints_and_pair_ports( logger: &mut dyn Logger, net_endpoint_setups: &[NetEndpointSetup], udp_endpoint_setups: &[UdpEndpointSetup], - port_info: &PortInfoMap, + port_info: &mut PortInfoMap, deadline: &Option, -) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> { +) -> Result { use ConnectError as Ce; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); const RETRY_PERIOD: Duration = Duration::from_millis(200); @@ -305,12 +330,9 @@ fn setup_endpoints_and_pair_ports( Accepting(TcpListener), // awaiting it's peer initiating the connection PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel } - //////////////////////////////////////////// // Start to construct our return values - // let mut waker_state: Option> = None; - let mut extra_port_info = ExtraPortInfo::default(); let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4); @@ -540,20 +562,25 @@ fn setup_endpoints_and_pair_ports( } net_todo.recv_peer_port = Some(peer_info.port); // finally learned the peer of this port! - extra_port_info.peers.insert( - net_todo.endpoint_setup.getter_for_incoming, - peer_info.port, - ); + port_info + .map + .get_mut(&net_todo.endpoint_setup.getter_for_incoming) + .unwrap() + .peer = Some(peer_info.port); // learned the info of this peer port - if !port_info.map.contains_key(&peer_info.port) { - let info = PortInfo { + port_info.map.entry(peer_info.port).or_insert({ + port_info + .owned + .entry(peer_info.owner) + .or_default() + .insert(peer_info.port); + PortInfo { peer: Some(net_todo.endpoint_setup.getter_for_incoming), polarity: peer_info.polarity, owner: peer_info.owner, route: Route::NetEndpoint { index }, - }; - extra_port_info.info.insert(peer_info.port, info); - } + } + }); } Some(inappropriate_msg) => { log!( @@ -625,7 +652,7 @@ fn setup_endpoints_and_pair_ports( }, io_byte_buffer, }; - Ok((endpoint_manager, extra_port_info)) + Ok(endpoint_manager) } // Given a fully-formed endpoint manager, @@ -1015,6 +1042,7 @@ fn apply_my_optimizations( endpoint_incoming_to_getter, } = session_info; // simply overwrite the contents + // println!("BEFORE: {:#?}\n{:#?}", cu, comm); cu.ips.port_info = port_info; assert!(cu.ips.port_info.invariant_preserved()); cu.proto_components = proto_components; @@ -1028,5 +1056,6 @@ fn apply_my_optimizations( { ee.getter_for_incoming = getter; } + // println!("AFTER: {:#?}\n{:#?}", cu, comm); Ok(()) }