diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 27be90d80ca6bcc5d220bad3cf0b79947c432449..1f77f0121fde90a0d50510dc6a31881ab46986b5 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -1,6 +1,31 @@ use crate::common::*; use crate::runtime::*; +impl TokenTarget { + const HALFWAY_INDEX: usize = usize::MAX / 2; + const MAX_INDEX: usize = usize::MAX; + const WAKER_TOKEN: usize = Self::MAX_INDEX; +} +impl From for TokenTarget { + fn from(Token(index): Token) -> Self { + if index == Self::WAKER_TOKEN { + TokenTarget::Waker + } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { + TokenTarget::UdpEndpoint { index: shifted } + } else { + TokenTarget::NetEndpoint { index } + } + } +} +impl Into for TokenTarget { + fn into(self) -> Token { + match self { + TokenTarget::Waker => Token(Self::WAKER_TOKEN), + TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), + TokenTarget::NetEndpoint { index } => Token(index), + } + } +} impl Connector { /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing). /// The resulting connector will start in the setup phase, and cannot be used for communication until the @@ -25,11 +50,9 @@ impl Connector { unphased: ConnectorUnphased { proto_description, proto_components: Default::default(), - inner: ConnectorUnphasedInner { - logger, - native_component_id, - current_state: CurrentState { id_manager, port_info: Default::default() }, - }, + logger, + native_component_id, + current_state: CurrentState { id_manager, port_info: Default::default() }, }, phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { net_endpoint_setups: Default::default(), @@ -52,29 +75,29 @@ impl Connector { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { let udp_index = setup.udp_endpoint_setups.len(); - let udp_cid = cu.inner.current_state.id_manager.new_component_id(); - let mut npid = || cu.inner.current_state.id_manager.new_port_id(); + let udp_cid = cu.current_state.id_manager.new_component_id(); + let mut npid = || cu.current_state.id_manager.new_port_id(); let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()]; - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( nin, PortInfo { route: Route::LocalComponent, polarity: Getter, peer: Some(uout), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( nout, PortInfo { route: Route::LocalComponent, polarity: Putter, peer: Some(uin), - owner: cu.inner.native_component_id, + owner: cu.native_component_id, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( uin, PortInfo { route: Route::UdpEndpoint { index: udp_index }, @@ -83,7 +106,7 @@ impl Connector { owner: udp_cid, }, ); - cu.inner.current_state.port_info.insert( + cu.current_state.port_info.insert( uout, PortInfo { route: Route::UdpEndpoint { index: udp_index }, @@ -115,18 +138,18 @@ impl Connector { match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { - let new_pid = cu.inner.current_state.id_manager.new_port_id(); - cu.inner.current_state.port_info.insert( + let new_pid = cu.current_state.id_manager.new_port_id(); + cu.current_state.port_info.insert( new_pid, PortInfo { route: Route::LocalComponent, peer: None, - owner: cu.inner.native_component_id, + owner: cu.native_component_id, polarity, }, ); log!( - cu.inner.logger, + cu.logger, "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", new_pid, polarity, @@ -156,35 +179,35 @@ impl Connector { let Self { unphased: cu, phased } = self; match &phased { ConnectorPhased::Communication { .. } => { - log!(cu.inner.logger, "Call to connecting in connected state"); + log!(cu.logger, "Call to connecting in connected state"); Err(Ce::AlreadyConnected) } ConnectorPhased::Setup(setup) => { - log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *cu.inner.logger, + &mut *cu.logger, &setup.net_endpoint_setups, &setup.udp_endpoint_setups, - &mut cu.inner.current_state.port_info, + &mut cu.current_state.port_info, &deadline, )?; log!( - cu.inner.logger, + cu.logger, "Successfully connected {} endpoints. info now {:#?} {:#?}", endpoint_manager.net_endpoint_store.endpoint_exts.len(), - &cu.inner.current_state.port_info, + &cu.current_state.port_info, &endpoint_manager, ); // leader election and tree construction let neighborhood = init_neighborhood( - cu.inner.current_state.id_manager.connector_id, - &mut *cu.inner.logger, + cu.current_state.id_manager.connector_id, + &mut *cu.logger, &mut endpoint_manager, &deadline, )?; - log!(cu.inner.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); let mut comm = ConnectorCommunication { round_index: 0, endpoint_manager, @@ -195,7 +218,7 @@ impl Connector { if cfg!(feature = "session_optimization") { session_optimize(cu, &mut comm, &deadline)?; } - log!(cu.inner.logger, "connect() finished. setup phase complete"); + log!(cu.logger, "connect() finished. setup phase complete"); *phased = ConnectorPhased::Communication(Box::new(comm)); Ok(()) } @@ -366,7 +389,7 @@ fn new_endpoint_manager( if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint { // FIRST try complete this connection match listener.accept() { - Err(e) if would_block(&e) => continue, // spurious wakeup + Err(e) if err_would_block(&e) => continue, // spurious wakeup Err(_) => { log!(logger, "accept() failure on index {}", index); return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); @@ -781,25 +804,25 @@ fn session_optimize( //////////////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; //////////////////////////////////////// - log!(cu.inner.logger, "Beginning session optimization"); + log!(cu.logger, "Beginning session optimization"); // populate session_info_map from a message per child let mut unoptimized_map: HashMap = Default::default(); let mut awaiting: HashSet = comm.neighborhood.children.iter().copied().collect(); comm.endpoint_manager.undelay_all(); while !awaiting.is_empty() { log!( - cu.inner.logger, + cu.logger, "Session gather loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; - log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; + log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { if !awaiting.remove(&recv_index) { log!( - cu.inner.logger, + cu.logger, "Wasn't expecting session info from {:?}. Got {:?}", recv_index, &child_unoptimized_map @@ -812,11 +835,11 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.inner.logger, "discarding old message {:?} during election", msg); + log!(cu.logger, "discarding old message {:?} during election", msg); } msg @ S(Sm::SessionScatter { .. }) => { log!( - cu.inner.logger, + cu.logger, "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", recv_index, &msg @@ -824,18 +847,18 @@ fn session_optimize( return Err(Ce::SetupAlgMisbehavior); } msg @ Msg::CommMsg(..) => { - log!(cu.inner.logger, "delaying msg {:?} during session optimization", msg); + log!(cu.logger, "delaying msg {:?} during session optimization", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } } } log!( - cu.inner.logger, + cu.logger, "Gathered all children's maps. ConnectorId set is... {:?}", unoptimized_map.keys() ); let my_session_info = SessionInfo { - port_info: cu.inner.current_state.port_info.clone(), + port_info: cu.current_state.port_info.clone(), proto_components: cu.proto_components.clone(), serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm @@ -846,38 +869,34 @@ fn session_optimize( .map(|ee| ee.getter_for_incoming) .collect(), }; - unoptimized_map.insert(cu.inner.current_state.id_manager.connector_id, my_session_info); - log!( - cu.inner.logger, - "Inserting my own info. Unoptimized subtree map is {:?}", - &unoptimized_map - ); + unoptimized_map.insert(cu.current_state.id_manager.connector_id, my_session_info); + log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); // acquire the optimized info... let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent - log!(cu.inner.logger, "Forwarding gathered info to parent {:?}", parent); + log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); let msg = S(Sm::SessionGather { unoptimized_map }); comm.endpoint_manager.send_to_setup(parent, &msg)?; 'scatter_loop: loop { log!( - cu.inner.logger, + cu.logger, "Session scatter recv loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; - log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; + log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionScatter { optimized_map }) => { if recv_index != parent { - log!(cu.inner.logger, "I expected the scatter from my parent only!"); + log!(cu.logger, "I expected the scatter from my parent only!"); return Err(Ce::SetupAlgMisbehavior); } break 'scatter_loop optimized_map; } msg @ Msg::CommMsg { .. } => { - log!(cu.inner.logger, "delaying msg {:?} during scatter recv", msg); + log!(cu.logger, "delaying msg {:?} during scatter recv", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } msg @ S(Sm::SessionGather { .. }) @@ -885,24 +904,24 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.inner.logger, "discarding old message {:?} during election", msg); + log!(cu.logger, "discarding old message {:?} during election", msg); } } } } else { // by computing it myself - log!(cu.inner.logger, "I am the leader! I will optimize this session"); - leader_session_map_optimize(&mut *cu.inner.logger, unoptimized_map)? + log!(cu.logger, "I am the leader! I will optimize this session"); + leader_session_map_optimize(&mut *cu.logger, unoptimized_map)? }; log!( - cu.inner.logger, + cu.logger, "Optimized info map is {:?}. Sending to children {:?}", &optimized_map, comm.neighborhood.children.iter() ); - log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map); + log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = optimized_map - .get(&cu.inner.current_state.id_manager.connector_id) + .get(&cu.current_state.id_manager.connector_id) .expect("HEY NO INFO FOR ME?") .clone(); let msg = S(Sm::SessionScatter { optimized_map }); @@ -910,7 +929,7 @@ fn session_optimize( comm.endpoint_manager.send_to_setup(child, &msg)?; } apply_optimizations(cu, comm, optimized_info)?; - log!(cu.inner.logger, "Session optimizations applied"); + log!(cu.logger, "Session optimizations applied"); Ok(()) } fn leader_session_map_optimize( @@ -933,7 +952,7 @@ fn apply_optimizations( endpoint_incoming_to_getter, } = session_info; // TODO some info which should be read-only can be mutated with the current scheme - cu.inner.current_state.port_info = port_info; + cu.current_state.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; for (ee, getter) in comm