diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index d52109fd9a713dbce41509d0bdc965f08804bce5..abbbbabd1cd3d31924605d421103113d3f6449e0 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -20,12 +20,14 @@ impl Connector { ) -> Self { log!(&mut *logger, "Created with connector_id {:?}", connector_id); Self { - proto_description, - proto_components: Default::default(), - logger, - id_manager: IdManager::new(connector_id), - native_ports: Default::default(), - port_info: Default::default(), + unphased: ConnectorUnphased { + proto_description, + proto_components: Default::default(), + logger, + id_manager: IdManager::new(connector_id), + native_ports: Default::default(), + port_info: Default::default(), + }, phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, } } @@ -35,16 +37,17 @@ impl Connector { sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, ) -> Result { - match &mut self.phased { + let Self { unphased: up, phased } = self; + match phased { ConnectorPhased::Setup { endpoint_setups, .. } => { let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity }; - let p = self.id_manager.new_port_id(); - self.native_ports.insert(p); + let p = up.id_manager.new_port_id(); + up.native_ports.insert(p); // {polarity, route} known. {peer} unknown. - self.port_info.polarities.insert(p, polarity); - self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); + up.port_info.polarities.insert(p, polarity); + up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); log!( - self.logger, + up.logger, "Added net port {:?} with polarity {:?} and endpoint setup {:?} ", p, polarity, @@ -58,44 +61,45 @@ impl Connector { } pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { use ConnectError::*; - match &mut self.phased { + let Self { unphased: up, phased } = self; + match phased { ConnectorPhased::Communication { .. } => { - log!(self.logger, "Call to connecting in connected state"); + log!(up.logger, "Call to connecting in connected state"); Err(AlreadyConnected) } ConnectorPhased::Setup { endpoint_setups, .. } => { - log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(up.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *self.logger, + &mut *up.logger, endpoint_setups, - &mut self.port_info, + &mut up.port_info, deadline, )?; log!( - self.logger, + up.logger, "Successfully connected {} endpoints", endpoint_manager.endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( - self.id_manager.connector_id, - &mut *self.logger, + up.id_manager.connector_id, + &mut *up.logger, &mut endpoint_manager, deadline, )?; - log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); - log!(self.logger, "connect() finished. setup phase complete"); + log!(up.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(up.logger, "connect() finished. setup phase complete"); // TODO session optimization goes here - self.phased = ConnectorPhased::Communication { + self.phased = ConnectorPhased::Communication(ConnectorCommunication { round_index: 0, endpoint_manager, neighborhood, mem_inbox: Default::default(), native_batches: vec![Default::default()], round_result: Ok(None), - }; + }); Ok(()) } } @@ -144,8 +148,8 @@ fn new_endpoint_manager( // 1. Start to construct EndpointManager let mut poll = Poll::new().map_err(|_| PollInitFailed)?; - let mut events = Events::with_capacity(64); - let mut polled_undrained = IndexSet::::default(); + let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4); + let mut polled_undrained = IndexSet::default(); let mut delayed_messages = vec![]; // 2. create a registered (TcpListener/Endpoint) for passive / active respectively