diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
index ed9962ed069473e98fe41613a225d9da56c1b91b..c87129d90ebd6abd5ff12099db7087bad6580003 100644
--- a/src/runtime/setup.rs
+++ b/src/runtime/setup.rs
@@ -9,16 +9,15 @@ impl Connector {
     ) -> Self {
         log!(&mut *logger, "Created with connector_id {:?}", connector_id);
         let mut id_manager = IdManager::new(connector_id);
+        let native_component_id = id_manager.new_component_id();
         Self {
             unphased: ConnectorUnphased {
                 proto_description,
                 proto_components: Default::default(),
                 inner: ConnectorUnphasedInner {
                     logger,
-                    native_component_id: id_manager.new_component_id(),
-                    id_manager,
-                    native_ports: Default::default(),
-                    port_info: Default::default(),
+                    native_component_id,
+                    current_state: CurrentState { id_manager, port_info: Default::default() },
                 },
             },
             phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
@@ -42,28 +41,46 @@ impl Connector {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
                 let udp_index = setup.udp_endpoint_setups.len();
-                let mut npid = || cu.inner.id_manager.new_port_id();
+                let udp_cid = cu.inner.current_state.id_manager.new_component_id();
+                let mut npid = || cu.inner.current_state.id_manager.new_port_id();
                 let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
-                cu.inner.native_ports.insert(nin);
-                cu.inner.native_ports.insert(nout);
-                cu.inner.port_info.polarities.insert(nin, Getter);
-                cu.inner.port_info.polarities.insert(nout, Putter);
-                cu.inner.port_info.polarities.insert(uin, Getter);
-                cu.inner.port_info.polarities.insert(uout, Putter);
-                cu.inner.port_info.peers.insert(nin, uout);
-                cu.inner.port_info.peers.insert(nout, uin);
-                cu.inner.port_info.peers.insert(uin, nout);
-                cu.inner.port_info.peers.insert(uout, nin);
-                cu.inner
-                    .port_info
-                    .routes
-                    .insert(nin, Route::LocalComponent(cu.inner.native_component_id));
-                cu.inner
-                    .port_info
-                    .routes
-                    .insert(nout, Route::LocalComponent(cu.inner.native_component_id));
-                cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index });
-                cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index });
+
+                cu.inner.current_state.port_info.insert(
+                    nin,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        polarity: Getter,
+                        peer: Some(uout),
+                        owner: cu.inner.native_component_id,
+                    },
+                );
+                cu.inner.current_state.port_info.insert(
+                    nout,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        polarity: Putter,
+                        peer: Some(uin),
+                        owner: cu.inner.native_component_id,
+                    },
+                );
+                cu.inner.current_state.port_info.insert(
+                    uin,
+                    PortInfo {
+                        route: Route::UdpEndpoint { index: udp_index },
+                        polarity: Getter,
+                        peer: Some(nout),
+                        owner: udp_cid,
+                    },
+                );
+                cu.inner.current_state.port_info.insert(
+                    uout,
+                    PortInfo {
+                        route: Route::UdpEndpoint { index: udp_index },
+                        polarity: Putter,
+                        peer: Some(nin),
+                        owner: udp_cid,
+                    },
+                );
                 setup.udp_endpoint_setups.push(UdpEndpointSetup {
                     local_addr,
                     peer_addr,
@@ -83,18 +100,20 @@ impl Connector {
         match phased {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
-                let local_port = cu.inner.id_manager.new_port_id();
-                cu.inner.native_ports.insert(local_port);
-                // {polarity, route} known. {peer} unknown.
-                cu.inner.port_info.polarities.insert(local_port, polarity);
-                cu.inner
-                    .port_info
-                    .routes
-                    .insert(local_port, Route::LocalComponent(cu.inner.native_component_id));
+                let new_pid = cu.inner.current_state.id_manager.new_port_id();
+                cu.inner.current_state.port_info.insert(
+                    new_pid,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        peer: None,
+                        owner: cu.inner.native_component_id,
+                        polarity,
+                    },
+                );
                 log!(
                     cu.inner.logger,
                     "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
-                    local_port,
+                    new_pid,
                     polarity,
                     &sock_addr,
                     endpoint_polarity
@@ -102,9 +121,9 @@ impl Connector {
                 setup.net_endpoint_setups.push(NetEndpointSetup {
                     sock_addr,
                     endpoint_polarity,
-                    getter_for_incoming: local_port,
+                    getter_for_incoming: new_pid,
                 });
-                Ok(local_port)
+                Ok(new_pid)
             }
         }
     }
@@ -124,17 +143,19 @@ impl Connector {
             &mut *cu.inner.logger,
             &setup.net_endpoint_setups,
             &setup.udp_endpoint_setups,
-            &mut cu.inner.port_info,
+            &mut cu.inner.current_state.port_info,
             &deadline,
         )?;
         log!(
             cu.inner.logger,
-            "Successfully connected {} endpoints",
-            endpoint_manager.net_endpoint_store.endpoint_exts.len()
+            "Successfully connected {} endpoints. info now {:#?} {:#?}",
+            endpoint_manager.net_endpoint_store.endpoint_exts.len(),
+            &cu.inner.current_state.port_info,
+            &endpoint_manager,
         );
         // leader election and tree construction
         let neighborhood = init_neighborhood(
-            cu.inner.id_manager.connector_id,
+            cu.inner.current_state.id_manager.connector_id,
             &mut *cu.inner.logger,
             &mut endpoint_manager,
             &deadline,
@@ -151,7 +172,7 @@ impl Connector {
             session_optimize(cu, &mut comm, &deadline)?;
         }
         log!(cu.inner.logger, "connect() finished. setup phase complete");
-        self.phased = ConnectorPhased::Communication(Box::new(comm));
+        *phased = ConnectorPhased::Communication(Box::new(comm));
         Ok(())
     }
 }
@@ -161,7 +182,7 @@ fn new_endpoint_manager(
     logger: &mut dyn Logger,
     net_endpoint_setups: &[NetEndpointSetup],
     udp_endpoint_setups: &[UdpEndpointSetup],
-    port_info: &mut PortInfo,
+    port_info: &mut HashMap<PortId, PortInfo>,
     deadline: &Option<Instant>,
 ) -> Result<EndpointManager, ConnectError> {
     ////////////////////////////////////////////
@@ -393,14 +414,14 @@ fn new_endpoint_manager(
                        }
                        continue;
                    }
-                    let local_polarity = *port_info
-                        .polarities
-                        .get(&net_todo.endpoint_setup.getter_for_incoming)
-                        .unwrap();
+                    let local_info = port_info
+                        .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
+                        .unwrap();
                     if event.is_writable() && !net_todo.sent_local_port {
                         // can write and didn't send setup msg yet? Do so!
                         let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
-                            polarity: local_polarity,
+                            owner: local_info.owner,
+                            polarity: local_info.polarity,
                             port: net_todo.endpoint_setup.getter_for_incoming,
                         }));
                         net_endpoint
@@ -435,36 +456,21 @@ fn new_endpoint_manager(
                                index,
                                peer_info
                            );
-                            if peer_info.polarity == local_polarity {
+                            if peer_info.polarity == local_info.polarity {
                                return Err(ConnectError::PortPeerPolarityMismatch(
                                    net_todo.endpoint_setup.getter_for_incoming,
                                ));
                            }
                            net_todo.recv_peer_port = Some(peer_info.port);
                            // 1. finally learned the peer of this port!
-                            port_info.peers.insert(
-                                net_todo.endpoint_setup.getter_for_incoming,
-                                peer_info.port,
-                            );
+                            local_info.peer = Some(peer_info.port);
                            // 2. learned the info of this peer port
-                            port_info.polarities.insert(peer_info.port, peer_info.polarity);
-                            port_info.peers.insert(
-                                peer_info.port,
-                                net_todo.endpoint_setup.getter_for_incoming,
-                            );
-                            if let Some(route) = port_info.routes.get(&peer_info.port) {
-                                // check just for logging purposes
-                                log!(
-                                    logger,
-                                    "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
-                                    peer_info.port,
-                                    route
-                                );
-                            }
-                            port_info
-                                .routes
-                                .entry(peer_info.port)
-                                .or_insert(Route::NetEndpoint { index });
+                            port_info.entry(peer_info.port).or_insert(PortInfo {
+                                peer: Some(net_todo.endpoint_setup.getter_for_incoming),
+                                polarity: peer_info.polarity,
+                                owner: peer_info.owner,
+                                route: Route::NetEndpoint { index },
+                            });
                        }
                        Some(inappropriate_msg) => {
                            log!(
@@ -805,7 +811,7 @@ fn session_optimize(
         unoptimized_map.keys()
     );
     let my_session_info = SessionInfo {
-        port_info: cu.inner.port_info.clone(),
+        port_info: cu.inner.current_state.port_info.clone(),
         proto_components: cu.proto_components.clone(),
         serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
         endpoint_incoming_to_getter: comm
@@ -816,7 +822,7 @@ fn session_optimize(
             .map(|ee| ee.getter_for_incoming)
             .collect(),
     };
-    unoptimized_map.insert(cu.inner.id_manager.connector_id, my_session_info);
+    unoptimized_map.insert(cu.inner.current_state.id_manager.connector_id, my_session_info);
     log!(
         cu.inner.logger,
         "Inserting my own info. Unoptimized subtree map is {:?}",
@@ -871,8 +877,10 @@ fn session_optimize(
         comm.neighborhood.children.iter()
     );
     log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map);
-    let optimized_info =
-        optimized_map.get(&cu.inner.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
+    let optimized_info = optimized_map
+        .get(&cu.inner.current_state.id_manager.connector_id)
+        .expect("HEY NO INFO FOR ME?")
+        .clone();
     let msg = S(Sm::SessionScatter { optimized_map });
     for &child in comm.neighborhood.children.iter() {
         comm.endpoint_manager.send_to_setup(child, &msg)?;
@@ -901,7 +909,7 @@ fn apply_optimizations(
         endpoint_incoming_to_getter,
     } = session_info;
     // TODO some info which should be read-only can be mutated with the current scheme
-    cu.inner.port_info = port_info;
+    cu.inner.current_state.port_info = port_info;
     cu.proto_components = proto_components;
     cu.proto_description = serde_proto_description.0;
     for (ee, getter) in comm
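
Note: the `CurrentState`, `PortInfo`, and `Route` definitions this patch relies on are declared elsewhere in the crate and do not appear in the diff. Purely as an illustration of the refactor (a single `HashMap<PortId, PortInfo>` replacing the old `native_ports`, `polarities`, `peers`, and `routes` collections), here is a minimal, hypothetical sketch of shapes consistent with how the patch uses them; the id types, field order, and `Route` variants are assumptions, not the crate's real definitions.

```rust
// Hypothetical sketch only: PortId/ComponentId stand in for the crate's id
// newtypes, and these definitions are inferred from usage in the patch above.
use std::collections::HashMap;

type PortId = u32;
type ComponentId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Polarity {
    Putter, // this port sends messages
    Getter, // this port receives messages
}

#[derive(Debug, Clone, Copy)]
enum Route {
    LocalComponent,               // owned by a component on this connector
    NetEndpoint { index: usize }, // reachable through the net endpoint at `index`
    UdpEndpoint { index: usize }, // reachable through the udp endpoint at `index`
}

#[derive(Debug, Clone)]
struct PortInfo {
    route: Route,
    polarity: Polarity,
    peer: Option<PortId>, // None until the peer port is learned during setup
    owner: ComponentId,   // the component that owns this port
}

fn main() {
    // One map now carries everything the old separate collections held.
    let mut port_info: HashMap<PortId, PortInfo> = HashMap::new();
    let (nin, uout) = (0, 1);
    let (native_cid, udp_cid) = (100, 101);
    port_info.insert(nin, PortInfo {
        route: Route::LocalComponent,
        polarity: Polarity::Getter,
        peer: Some(uout),
        owner: native_cid,
    });
    port_info.insert(uout, PortInfo {
        route: Route::UdpEndpoint { index: 0 },
        polarity: Polarity::Putter,
        peer: Some(nin),
        owner: udp_cid,
    });
    assert_eq!(port_info[&nin].peer, Some(uout));
    println!("{:#?}", port_info);
}
```

The sketch only mirrors the access patterns visible in the patch (`insert`, `get_mut`, `entry(..).or_insert(..)`); the real types presumably wrap the ids in dedicated newtypes and derive whatever serialization traits `SessionInfo` needs.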