diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 35c2f1f5ace3c6f4119793f9400dc2621f9fc173..95db7fb554a1569e2b5077733c8639824d15cd06 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -13,10 +13,12 @@ impl Connector { unphased: ConnectorUnphased { proto_description, proto_components: Default::default(), - logger, - id_manager: IdManager::new(connector_id), - native_ports: Default::default(), - port_info: Default::default(), + inner: ConnectorUnphasedInner { + logger, + id_manager: IdManager::new(connector_id), + native_ports: Default::default(), + port_info: Default::default(), + }, }, phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { net_endpoint_setups: Default::default(), @@ -35,22 +37,22 @@ impl Connector { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { let udp_index = setup.udp_endpoint_setups.len(); - let mut npid = || cu.id_manager.new_port_id(); + let mut npid = || cu.inner.id_manager.new_port_id(); let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()]; - cu.native_ports.insert(nin); - cu.native_ports.insert(nout); - cu.port_info.polarities.insert(nin, Getter); - cu.port_info.polarities.insert(nout, Putter); - cu.port_info.polarities.insert(uin, Getter); - cu.port_info.polarities.insert(uout, Putter); - cu.port_info.peers.insert(nin, uout); - cu.port_info.peers.insert(nout, uin); - cu.port_info.peers.insert(uin, nout); - cu.port_info.peers.insert(uout, nin); - cu.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native)); - cu.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native)); - cu.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index }); - cu.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index }); + cu.inner.native_ports.insert(nin); + cu.inner.native_ports.insert(nout); + cu.inner.port_info.polarities.insert(nin, Getter); + cu.inner.port_info.polarities.insert(nout, Putter); + cu.inner.port_info.polarities.insert(uin, Getter); + cu.inner.port_info.polarities.insert(uout, Putter); + cu.inner.port_info.peers.insert(nin, uout); + cu.inner.port_info.peers.insert(nout, uin); + cu.inner.port_info.peers.insert(uin, nout); + cu.inner.port_info.peers.insert(uout, nin); + cu.inner.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native)); + cu.inner.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native)); + cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index }); + cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index }); setup.udp_endpoint_setups.push(UdpEndpointSetup { local_addr, peer_addr, @@ -70,13 +72,16 @@ impl Connector { match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { - let local_port = cu.id_manager.new_port_id(); - cu.native_ports.insert(local_port); + let local_port = cu.inner.id_manager.new_port_id(); + cu.inner.native_ports.insert(local_port); // {polarity, route} known. {peer} unknown. - cu.port_info.polarities.insert(local_port, polarity); - cu.port_info.routes.insert(local_port, Route::LocalComponent(ComponentId::Native)); + cu.inner.port_info.polarities.insert(local_port, polarity); + cu.inner + .port_info + .routes + .insert(local_port, Route::LocalComponent(ComponentId::Native)); log!( - cu.logger, + cu.inner.logger, "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", local_port, polarity, @@ -97,33 +102,33 @@ impl Connector { let Self { unphased: cu, phased } = self; match &phased { ConnectorPhased::Communication { .. } => { - log!(cu.logger, "Call to connecting in connected state"); + log!(cu.inner.logger, "Call to connecting in connected state"); Err(Ce::AlreadyConnected) } ConnectorPhased::Setup(setup) => { - log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *cu.logger, + &mut *cu.inner.logger, &setup.net_endpoint_setups, &setup.udp_endpoint_setups, - &mut cu.port_info, + &mut cu.inner.port_info, &deadline, )?; log!( - cu.logger, + cu.inner.logger, "Successfully connected {} endpoints", endpoint_manager.net_endpoint_store.endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( - cu.id_manager.connector_id, - &mut *cu.logger, + cu.inner.id_manager.connector_id, + &mut *cu.inner.logger, &mut endpoint_manager, &deadline, )?; - log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(cu.inner.logger, "Successfully created neighborhood {:?}", &neighborhood); let mut comm = ConnectorCommunication { round_index: 0, endpoint_manager, @@ -134,7 +139,7 @@ impl Connector { if cfg!(feature = "session_optimization") { session_optimize(cu, &mut comm, &deadline)?; } - log!(cu.logger, "connect() finished. setup phase complete"); + log!(cu.inner.logger, "connect() finished. setup phase complete"); self.phased = ConnectorPhased::Communication(Box::new(comm)); Ok(()) } @@ -736,25 +741,25 @@ fn session_optimize( //////////////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; //////////////////////////////////////// - log!(cu.logger, "Beginning session optimization"); + log!(cu.inner.logger, "Beginning session optimization"); // populate session_info_map from a message per child let mut unoptimized_map: HashMap = Default::default(); let mut awaiting: HashSet = comm.neighborhood.children.iter().copied().collect(); comm.endpoint_manager.undelay_all(); while !awaiting.is_empty() { log!( - cu.logger, + cu.inner.logger, "Session gather loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; - log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; + log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { if !awaiting.remove(&recv_index) { log!( - cu.logger, + cu.inner.logger, "Wasn't expecting session info from {:?}. Got {:?}", recv_index, &child_unoptimized_map @@ -767,11 +772,11 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.logger, "discarding old message {:?} during election", msg); + log!(cu.inner.logger, "discarding old message {:?} during election", msg); } msg @ S(Sm::SessionScatter { .. }) => { log!( - cu.logger, + cu.inner.logger, "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", recv_index, &msg @@ -779,18 +784,18 @@ fn session_optimize( return Err(Ce::SetupAlgMisbehavior); } msg @ Msg::CommMsg(..) => { - log!(cu.logger, "delaying msg {:?} during session optimization", msg); + log!(cu.inner.logger, "delaying msg {:?} during session optimization", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } } } log!( - cu.logger, + cu.inner.logger, "Gathered all children's maps. ConnectorId set is... {:?}", unoptimized_map.keys() ); let my_session_info = SessionInfo { - port_info: cu.port_info.clone(), + port_info: cu.inner.port_info.clone(), proto_components: cu.proto_components.clone(), serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm @@ -801,34 +806,38 @@ fn session_optimize( .map(|ee| ee.getter_for_incoming) .collect(), }; - unoptimized_map.insert(cu.id_manager.connector_id, my_session_info); - log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); + unoptimized_map.insert(cu.inner.id_manager.connector_id, my_session_info); + log!( + cu.inner.logger, + "Inserting my own info. Unoptimized subtree map is {:?}", + &unoptimized_map + ); // acquire the optimized info... let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent - log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); + log!(cu.inner.logger, "Forwarding gathered info to parent {:?}", parent); let msg = S(Sm::SessionGather { unoptimized_map }); comm.endpoint_manager.send_to_setup(parent, &msg)?; 'scatter_loop: loop { log!( - cu.logger, + cu.inner.logger, "Session scatter recv loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; - log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; + log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionScatter { optimized_map }) => { if recv_index != parent { - log!(cu.logger, "I expected the scatter from my parent only!"); + log!(cu.inner.logger, "I expected the scatter from my parent only!"); return Err(Ce::SetupAlgMisbehavior); } break 'scatter_loop optimized_map; } msg @ Msg::CommMsg { .. } => { - log!(cu.logger, "delaying msg {:?} during scatter recv", msg); + log!(cu.inner.logger, "delaying msg {:?} during scatter recv", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } msg @ S(Sm::SessionGather { .. }) @@ -836,30 +845,30 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.logger, "discarding old message {:?} during election", msg); + log!(cu.inner.logger, "discarding old message {:?} during election", msg); } } } } else { // by computing it myself - log!(cu.logger, "I am the leader! I will optimize this session"); - leader_session_map_optimize(&mut *cu.logger, unoptimized_map)? + log!(cu.inner.logger, "I am the leader! I will optimize this session"); + leader_session_map_optimize(&mut *cu.inner.logger, unoptimized_map)? }; log!( - cu.logger, + cu.inner.logger, "Optimized info map is {:?}. Sending to children {:?}", &optimized_map, comm.neighborhood.children.iter() ); - log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); + log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = - optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); + optimized_map.get(&cu.inner.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); let msg = S(Sm::SessionScatter { optimized_map }); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_setup(child, &msg)?; } apply_optimizations(cu, comm, optimized_info)?; - log!(cu.logger, "Session optimizations applied"); + log!(cu.inner.logger, "Session optimizations applied"); Ok(()) } fn leader_session_map_optimize( @@ -882,7 +891,7 @@ fn apply_optimizations( endpoint_incoming_to_getter, } = session_info; // TODO some info which should be read-only can be mutated with the current scheme - cu.port_info = port_info; + cu.inner.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; for (ee, getter) in comm