diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
index 119c489e36c57a53a17c9571fcc6a7d809fd10bb..35561a07596524f63836ed3c36fe668052eeb457 100644
--- a/src/runtime/setup.rs
+++ b/src/runtime/setup.rs
@@ -1,16 +1,22 @@
 use crate::common::*;
 use crate::runtime::*;
 
+#[derive(Default)]
+struct ExtraPortInfo {
+    info: HashMap<PortId, PortInfo>,
+    peers: HashMap<PortId, PortId>,
+}
+
 impl TokenTarget {
+    // subdivides the domain of usize into:
+    // [  NET_ENDPOINT  ][  UDP_ENDPOINT  ]
+    // ^0                ^usize::MAX/2     ^usize::MAX
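+    // e.g. (illustrative sketch only, assuming TokenTarget: PartialEq):
+    //   let t: Token = TokenTarget::UdpEndpoint { index: 3 }.into(); // Token(usize::MAX/2 + 3)
+    //   assert_eq!(TokenTarget::from(t), TokenTarget::UdpEndpoint { index: 3 });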
     const HALFWAY_INDEX: usize = usize::MAX / 2;
     const MAX_INDEX: usize = usize::MAX;
-    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 }
 impl From<Token> for TokenTarget {
     fn from(Token(index): Token) -> Self {
-        if index == Self::WAKER_TOKEN {
-            TokenTarget::Waker
-        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
+        if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
             TokenTarget::UdpEndpoint { index: shifted }
         } else {
             TokenTarget::NetEndpoint { index }
@@ -20,7 +26,6 @@ impl From<Token> for TokenTarget {
 impl Into<Token> for TokenTarget {
     fn into(self) -> Token {
         match self {
-            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
             TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
             TokenTarget::NetEndpoint { index } => Token(index),
         }
@@ -60,6 +65,7 @@ impl Connector {
             })),
         }
     }
+
     /// Conceptually, this returning [p0, g1] is sugar for:
     /// 1. create port pair [p0, g0]
     /// 2. create port pair [p1, g1]
@@ -76,19 +82,12 @@ impl Connector {
             ConnectorPhased::Setup(setup) => {
                 let udp_index = setup.udp_endpoint_setups.len();
                 let udp_cid = cu.ips.id_manager.new_component_id();
+                // allocate 4 new port identifiers, two for each logical channel,
+                // one channel per direction (into and out of the component)
                 let mut npid = || cu.ips.id_manager.new_port_id();
                 let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
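+                // The resulting wiring, once all four ports are registered (sketch):
+                //   native --(nout ~> uin)--> udp_mediator   (outgoing direction)
+                //   native <--(nin <~ uout)-- udp_mediator   (incoming direction)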
-
-                cu.ips.port_info.insert(
-                    nin,
-                    PortInfo {
-                        route: Route::LocalComponent,
-                        polarity: Getter,
-                        peer: Some(uout),
-                        owner: cu.native_component_id,
-                    },
-                );
-                cu.ips.port_info.insert(
+                // allocate the native->udp_mediator channel's ports
+                cu.ips.port_info.map.insert(
                     nout,
                     PortInfo {
                         route: Route::LocalComponent,
@@ -97,7 +96,7 @@ impl Connector {
                         owner: cu.native_component_id,
                     },
                 );
-                cu.ips.port_info.insert(
+                cu.ips.port_info.map.insert(
                     uin,
                     PortInfo {
                         route: Route::UdpEndpoint { index: udp_index },
@@ -106,7 +105,8 @@ impl Connector {
                         owner: udp_cid,
                     },
                 );
-                cu.ips.port_info.insert(
+                // allocate the udp_mediator->native channel's ports
+                cu.ips.port_info.map.insert(
                     uout,
                     PortInfo {
                         route: Route::UdpEndpoint { index: udp_index },
@@ -115,11 +115,23 @@ impl Connector {
                         owner: udp_cid,
                     },
                 );
+                cu.ips.port_info.map.insert(
+                    nin,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        polarity: Getter,
+                        peer: Some(uout),
+                        owner: cu.native_component_id,
+                    },
+                );
+                // (uin and uout above are the two ports owned by the UdpMediator component)
+                // Remember to set up this UdpEndpoint during `connect` later.
                 setup.udp_endpoint_setups.push(UdpEndpointSetup {
                     local_addr,
                     peer_addr,
                     getter_for_incoming: nin,
                 });
+                // Return the native's [output, input] port pair
                 Ok([nout, nin])
             }
         }
@@ -138,8 +150,9 @@ impl Connector {
         match phased {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
+                // allocate a single dangling port with a `None` peer (for now)
                 let new_pid = cu.ips.id_manager.new_port_id();
-                cu.ips.port_info.insert(
+                cu.ips.port_info.map.insert(
                     new_pid,
                     PortInfo {
                         route: Route::LocalComponent,
@@ -156,6 +169,7 @@ impl Connector {
             &sock_addr,
             endpoint_polarity
         );
+        // Remember to set up this NetEndpoint during `connect` later.
         setup.net_endpoint_setups.push(NetEndpointSetup {
             sock_addr,
             endpoint_polarity,
@@ -186,11 +200,11 @@ impl Connector {
         log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
         let deadline = timeout.map(|to| Instant::now() + to);
         // connect all endpoints in parallel; send and receive peer ids through ports
-        let mut endpoint_manager = new_endpoint_manager(
+        let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports(
             &mut *cu.logger,
             &setup.net_endpoint_setups,
             &setup.udp_endpoint_setups,
-            &mut cu.ips.port_info,
+            &cu.ips.port_info,
             &deadline,
         )?;
         log!(
@@ -200,7 +214,8 @@ impl Connector {
             &cu.ips.port_info,
             &endpoint_manager,
         );
-        // leader election and tree construction
+        // leader election and tree construction. Learn our role in the consensus
+        // tree by discovering our parent and children (i.e., our tree neighbors).
         let neighborhood = init_neighborhood(
             cu.ips.id_manager.connector_id,
             &mut *cu.logger,
@@ -208,80 +223,110 @@ impl Connector {
             &deadline,
         )?;
         log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
+        // Put it all together with an initial round index of zero.
         let mut comm = ConnectorCommunication {
             round_index: 0,
             endpoint_manager,
             neighborhood,
             native_batches: vec![Default::default()],
-            round_result: Ok(None),
+            round_result: Ok(None), // no previous round yet
         };
         if cfg!(feature = "session_optimization") {
            // Perform the session optimization procedure, which may modify the
            // internals of the connector: rerouting ports, moving around connectors, etc.
            session_optimize(cu, &mut comm, &deadline)?;
        }
        log!(cu.logger, "connect() finished. setup phase complete");
+        // Connect procedure successful! Commit changes by...
+        // ... committing new port info for ConnectorUnphased
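+        // (note: deferring this commit until after a fully successful setup means
+        // a failed connect() leaves the connector's existing port info untouched)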
+        for (port, info) in extra_port_info.info.drain() {
+            cu.ips.port_info.map.insert(port, info);
+        }
+        for (port, peer) in extra_port_info.peers.drain() {
+            cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer);
+        }
+        // ... replacing the connector's phase with "communication"
         *phased = ConnectorPhased::Communication(Box::new(comm));
         Ok(())
             }
         }
     }
 }
-fn new_endpoint_manager(
+
+// Given a set of net_ and udp_ endpoints to set up,
+// port information to flesh out (by discovering peers through channels),
+// and a deadline by which to do it,
+// try to return:
+// - An EndpointManager, containing all the set-up endpoints
+// - new information about ports acquired through the newly-created channels
+fn setup_endpoints_and_pair_ports(
     logger: &mut dyn Logger,
     net_endpoint_setups: &[NetEndpointSetup],
     udp_endpoint_setups: &[UdpEndpointSetup],
-    port_info: &mut HashMap<PortId, PortInfo>,
+    port_info: &PortInfoMap,
     deadline: &Option<Instant>,
-) -> Result<EndpointManager, ConnectError> {
-    ////////////////////////////////////////////
-    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
+) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
     use ConnectError as Ce;
     const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
-    const WAKER_PERIOD: Duration = Duration::from_millis(300);
-    struct WakerState {
-        continue_signal: AtomicBool,
-        waker: mio::Waker,
-    }
-    impl WakerState {
-        fn waker_loop(&self) {
-            while self.continue_signal.load(SeqCst) {
-                std::thread::sleep(WAKER_PERIOD);
-                let _ = self.waker.wake();
-            }
-        }
-        fn waker_stop(&self) {
-            self.continue_signal.store(false, SeqCst);
-            // TODO keep waker registered?
-        }
-    }
-    struct Todo {
+    const RETRY_PERIOD: Duration = Duration::from_millis(200);
+
+    // This was the structure shared between the "setup" thread and the waker thread,
+    // which periodically sent signals; it is superseded by the RETRY_PERIOD polling below.
+    // struct WakerState {
+    //     continue_signal: AtomicBool,
+    //     waker: mio::Waker,
+    // }
+    // impl WakerState {
+    //     // The waker thread runs this UNTIL the continue signal is set to false
+    //     fn waker_loop(&self) {
+    //         while self.continue_signal.load(SeqCst) {
+    //             std::thread::sleep(WAKER_PERIOD);
+    //             let _ = self.waker.wake();
+    //         }
+    //     }
+    //     // The setup thread runs this to set the continue signal to false.
+    //     fn waker_stop(&self) {
+    //         self.continue_signal.store(false, SeqCst);
+    //     }
+    // }
+
+    // The data for a net endpoint's setup in progress
+    struct NetTodo {
         // becomes completed once sent_local_port && recv_peer_port.is_some()
         // we send local port if we haven't already and we receive a writable event
         // we recv peer port if we haven't already and we receive a readable event
-        todo_endpoint: TodoEndpoint,
+        todo_endpoint: NetTodoEndpoint,
         endpoint_setup: NetEndpointSetup,
         sent_local_port: bool,          // true <-> I've sent my local port
         recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
     }
+
+    // The data for a udp endpoint's setup in progress
     struct UdpTodo {
         // becomes completed once we receive our first writable event
         getter_for_incoming: PortId,
         sock: UdpSocket,
     }
-    enum TodoEndpoint {
-        Accepting(TcpListener),
-        NetEndpoint(NetEndpoint),
+
+    // Substructure of `NetTodo`, which represents the endpoint itself
+    enum NetTodoEndpoint {
+        Accepting(TcpListener),       // awaiting its peer initiating the connection
+        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
     }
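+    // (lifecycle sketch: Accepting --accept() succeeds--> PeerInfoRecving
+    //  --port info exchanged--> complete; active endpoints skip Accepting)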
+
     ////////////////////////////////////////////
-    // 1. Start to construct EndpointManager
-    let mut waker_state: Option<Arc<WakerState>> = None;
+    // Start to construct our return values
+    // let mut waker_state: Option<Arc<WakerState>> = None;
+    let mut extra_port_info = ExtraPortInfo::default();
     let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
     let mut events =
         Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
     let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
     let mut delayed_messages = vec![];
+    let mut last_retry_at = Instant::now();
 
-    // 2. Create net/udp TODOs, each already registered with poll
+    // Create net/udp todo structures, each already registered with poll
     let mut net_todos = net_endpoint_setups
         .iter()
         .enumerate()
@@ -292,21 +337,21 @@ fn new_endpoint_manager(
                 let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
                     .expect("mio::TcpStream connect should not fail!");
                 poll.registry().register(&mut stream, token, BOTH).unwrap();
-                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
+                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
             } else {
                 let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
                     .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
                 poll.registry().register(&mut listener, token, BOTH).unwrap();
-                TodoEndpoint::Accepting(listener)
+                NetTodoEndpoint::Accepting(listener)
             };
-            Ok(Todo {
+            Ok(NetTodo {
                 todo_endpoint,
                 sent_local_port: false,
                 recv_peer_port: None,
                 endpoint_setup: endpoint_setup.clone(),
             })
        })
-        .collect::<Result<Vec<Todo>, ConnectError>>()?;
+        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
     let udp_todos = udp_endpoint_setups
         .iter()
         .enumerate()
@@ -322,8 +367,8 @@ fn new_endpoint_manager(
         })
         .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 
-    // Initially, (1) no net connections have failed, and (2) all udp and net endpoint setups are incomplete
-    let mut net_connect_retry_later: HashSet<usize> = Default::default();
+    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
+    let mut net_connect_to_retry: HashSet<usize> = Default::default();
     let mut setup_incomplete: HashSet<TokenTarget> = {
         let net_todo_targets_iter =
             (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
@@ -333,47 +378,49 @@ fn new_endpoint_manager(
     };
     // progress by reacting to poll events. continue until every endpoint is set up
     while !setup_incomplete.is_empty() {
+        // recompute the time left to poll for progress
         let remaining = if let Some(deadline) = deadline {
-            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
+            deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?.min(RETRY_PERIOD)
         } else {
-            None
+            RETRY_PERIOD
        };
-        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
+        // block until either:
+        // (a) `events` has been populated with 1+ elements,
+        // (b) the timeout elapses, or
+        // (c) RETRY_PERIOD elapses
+        poll.poll(&mut events, Some(remaining)).map_err(|_| Ce::PollFailed)?;
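+        // note: clamping `remaining` by RETRY_PERIOD above bounds each sleep,
+        // so failed connects are retried within ~RETRY_PERIOD of failing,
+        // even if no poll events ever arrive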
+        if last_retry_at.elapsed() > RETRY_PERIOD {
+            // Retry all previously-failed net connections and reset `last_retry_at`
+            last_retry_at = Instant::now();
+            for net_index in net_connect_to_retry.drain() {
+                // Restart the connect procedure for this net endpoint
+                let net_todo = &mut net_todos[net_index];
+                log!(
+                    logger,
+                    "Restarting connection with endpoint {:?} {:?}",
+                    net_index,
+                    net_todo.endpoint_setup.sock_addr
+                );
+                match &mut net_todo.todo_endpoint {
+                    NetTodoEndpoint::PeerInfoRecving(endpoint) => {
+                        let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr)
+                            .expect("mio::TcpStream connect should not fail!");
+                        std::mem::swap(&mut endpoint.stream, &mut new_stream);
+                        let token = TokenTarget::NetEndpoint { index: net_index }.into();
+                        poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
+                    }
+                    _ => unreachable!(),
+                }
+            }
+        }
         for event in events.iter() {
             let token = event.token();
+            // figure out which endpoint the event belongs to
             let token_target = TokenTarget::from(token);
             match token_target {
-                TokenTarget::Waker => {
-                    log!(
-                        logger,
-                        "Notification from waker. connect_failed is {:?}",
-                        net_connect_retry_later.iter()
-                    );
-                    assert!(waker_state.is_some());
-                    for net_index in net_connect_retry_later.drain() {
-                        let net_todo = &mut net_todos[net_index];
-                        log!(
-                            logger,
-                            "Restarting connection with endpoint {:?} {:?}",
-                            net_index,
-                            net_todo.endpoint_setup.sock_addr
-                        );
-                        match &mut net_todo.todo_endpoint {
-                            TodoEndpoint::NetEndpoint(endpoint) => {
-                                let mut new_stream =
-                                    TcpStream::connect(net_todo.endpoint_setup.sock_addr)
-                                        .expect("mio::TcpStream connect should not fail!");
-                                std::mem::swap(&mut endpoint.stream, &mut new_stream);
-                                let token = TokenTarget::NetEndpoint { index: net_index }.into();
-                                poll.registry()
-                                    .register(&mut endpoint.stream, token, BOTH)
-                                    .unwrap();
-                            }
-                            _ => unreachable!(),
-                        }
-                    }
-                }
                 TokenTarget::UdpEndpoint { index } => {
+                    // UdpEndpoints are easy to complete.
+                    // Their setup event just has to succeed without error
                     if !setup_incomplete.contains(&token_target) {
                         // spurious wakeup. this endpoint has already been set up!
                         continue;
                     }
@@ -385,9 +432,12 @@ fn new_endpoint_manager(
                     setup_incomplete.remove(&token_target);
                 }
                 TokenTarget::NetEndpoint { index } => {
+                    // NetEndpoints are more complex to complete:
+                    // they must accept/connect to their peer,
+                    // and then exchange port info successfully
                     let net_todo = &mut net_todos[index];
-                    if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
-                        // FIRST try complete this connection
+                    if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
+                        // Passive endpoint that will first try to accept the peer's connection
                        match listener.accept() {
                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
                            Err(_) => {
@@ -406,51 +456,32 @@ fn new_endpoint_manager(
                                    peer_addr
                                );
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
-                                net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
+                                net_todo.todo_endpoint =
+                                    NetTodoEndpoint::PeerInfoRecving(net_endpoint);
                            }
                        }
                    }
-                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint {
+                    // OK now let's try to finish exchanging port info
+                    if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) =
+                        &mut net_todo.todo_endpoint
+                    {
                        if event.is_error() {
+                            // event signals some error! :(
                            if net_todo.endpoint_setup.endpoint_polarity
                                == EndpointPolarity::Passive
                            {
-                                // right now you cannot retry an acceptor. return failure
+                                // return failure, as a failed acceptor is currently unrecoverable
                                return Err(Ce::AcceptFailed(
                                    net_endpoint.stream.local_addr().unwrap(),
                                ));
                            }
                            // this actively-connecting endpoint failed to connect!
-                            if net_connect_retry_later.insert(index) {
-                                log!(
-                                    logger,
-                                    "Connection failed for {:?}. List is {:?}",
-                                    index,
-                                    net_connect_retry_later.iter()
-                                );
-                                poll.registry().deregister(&mut net_endpoint.stream).unwrap();
-                            } else {
-                                // spurious wakeup. already scheduled to retry connect later
-                                continue;
-                            }
-                            if waker_state.is_none() {
-                                log!(logger, "First connect failure. Starting waker thread");
-                                let arc = Arc::new(WakerState {
-                                    waker: mio::Waker::new(
-                                        poll.registry(),
-                                        TokenTarget::Waker.into(),
-                                    )
-                                    .unwrap(),
-                                    continue_signal: true.into(),
-                                });
-                                let moved_arc = arc.clone();
-                                waker_state = Some(arc);
-                                std::thread::spawn(move || moved_arc.waker_loop());
-                            }
+                            // We will schedule it for a retry
+                            net_connect_to_retry.insert(index);
                            continue;
                        }
                        // event wasn't ERROR
-                        if net_connect_retry_later.contains(&index) {
+                        if net_connect_to_retry.contains(&index) {
                            // spurious wakeup. already scheduled to retry connect later
                            continue;
                        }
@@ -462,8 +493,9 @@ fn new_endpoint_manager(
                            continue;
                        }
                        let local_info = port_info
-                            .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
-                            .unwrap();
+                            .map
+                            .get(&net_todo.endpoint_setup.getter_for_incoming)
+                            .expect("Net Setup's getter port info isn't known"); // unreachable
                        if event.is_writable() && !net_todo.sent_local_port {
                            // can write and didn't send setup msg yet? Do so!
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
@@ -484,7 +516,7 @@ fn new_endpoint_manager(
                            net_todo.sent_local_port = true;
                        }
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
-                            // can read and didn't recv setup msg yet? Do so!
+                            // can read and didn't finish receiving setup msg yet? Do so!
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
                                Ce::NetEndpointSetupError(
                                    net_endpoint.stream.local_addr().unwrap(),
@@ -509,15 +541,21 @@ fn new_endpoint_manager(
                                ));
                            }
                            net_todo.recv_peer_port = Some(peer_info.port);
-                            // 1. finally learned the peer of this port!
-                            local_info.peer = Some(peer_info.port);
-                            // 2. learned the info of this peer port
-                            port_info.entry(peer_info.port).or_insert(PortInfo {
-                                peer: Some(net_todo.endpoint_setup.getter_for_incoming),
-                                polarity: peer_info.polarity,
-                                owner: peer_info.owner,
-                                route: Route::NetEndpoint { index },
-                            });
+                            // finally learned the peer of this port!
+                            extra_port_info.peers.insert(
+                                net_todo.endpoint_setup.getter_for_incoming,
+                                peer_info.port,
+                            );
+                            // learned the info of this peer port
+                            if !port_info.map.contains_key(&peer_info.port) {
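+                                // (this guard mirrors the old `entry().or_insert()`:
+                                // info already known locally takes precedence)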
+                                let info = PortInfo {
+                                    peer: Some(net_todo.endpoint_setup.getter_for_incoming),
+                                    polarity: peer_info.polarity,
+                                    owner: peer_info.owner,
+                                    route: Route::NetEndpoint { index },
+                                };
+                                extra_port_info.info.insert(peer_info.port, info);
+                            }
                         }
                         Some(inappropriate_msg) => {
                             log!(
@@ -542,15 +580,12 @@ fn new_endpoint_manager(
         events.clear();
     }
     log!(logger, "Endpoint setup complete! Cleaning up and building structures");
-    if let Some(ws) = waker_state.take() {
-        ws.waker_stop();
-    }
     let net_endpoint_exts = net_todos
         .into_iter()
         .enumerate()
-        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
+        .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
             net_endpoint: match todo_endpoint {
-                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
+                NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => {
                     let token = TokenTarget::NetEndpoint { index }.into();
                     poll.registry()
                         .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
@@ -577,7 +612,7 @@ fn new_endpoint_manager(
             }
         })
         .collect();
-    Ok(EndpointManager {
+    let endpoint_manager = EndpointManager {
         poll,
         events,
         undelayed_messages: delayed_messages, // no longer delayed
@@ -591,22 +626,33 @@ fn new_endpoint_manager(
             polled_undrained: udp_polled_undrained,
         },
         udp_in_buffer: Default::default(),
-    })
+    };
+    Ok((endpoint_manager, extra_port_info))
 }
 
+// Given a fully-formed endpoint manager,
+// construct the consensus tree with:
+// 1. decentralized leader election
+// 2. centralized tree construction
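+//
+// e.g. (sketch): connectors {A, B, C} with channels A-B and A-C, electing
+// leader A, yield a tree where A has children [B, C] and no parent,
+// while B and C each have parent A and no children.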
 fn init_neighborhood(
     connector_id: ConnectorId,
     logger: &mut dyn Logger,
     em: &mut EndpointManager,
     deadline: &Option<Instant>,
 ) -> Result<Neighborhood, ConnectError> {
-    ////////////////////////////////
     use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
+
+    // storage structure for the state of a distributed wave
+    // (for readability)
     #[derive(Debug)]
     struct WaveState {
         parent: Option<usize>,
         leader: ConnectorId,
     }
+
+    // kick off a leader-election wave rooted at myself,
+    // given the desired wave information
+    // (e.g. don't inform my parent, if one exists)
     fn do_wave(
         em: &mut EndpointManager,
         awaiting: &mut HashSet<usize>,
@@ -661,6 +707,8 @@ fn init_neighborhood(
         log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
         match msg {
             S(Sm::LeaderAnnounce { tree_leader }) => {
+                // A neighbor explicitly tells me who the leader is;
+                // they become my parent, and I adopt their announced leader
                 let election_result =
                     WaveState { leader: tree_leader, parent: Some(recv_index) };
                 log!(logger, "Election lost! Result {:?}", &election_result);
@@ -734,6 +782,8 @@ fn init_neighborhood(
     };
     // starting algorithm 2. Send a message to every neighbor
+    // namely, send "YouAreMyParent" to my parent (if one exists),
+    // and LeaderAnnounce to everyone else
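+    // e.g. (sketch) if my parent is neighbor 0 of neighbors {0, 1, 2}:
+    //   to 0: YouAreMyParent
+    //   to 1: LeaderAnnounce { tree_leader }
+    //   to 2: LeaderAnnounce { tree_leader }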
     log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
     awaiting.clear();
     for index in em.index_iter() {
@@ -747,6 +797,8 @@ fn init_neighborhood(
             )?;
         }
     }
+    // Receive one message from each neighbor to learn
+    // whether they consider me their parent or not.
     let mut children = vec![];
     em.undelay_all();
     while !awaiting.is_empty() {
@@ -755,7 +807,7 @@ fn init_neighborhood(
         log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
         match msg {
             S(Sm::LeaderAnnounce { .. }) => {
-                // not a child
+                // `recv_index` is not my child
                 log!(
                     logger,
                     "Got reply from non-child index {:?}. Children: {:?}",
@@ -776,6 +828,7 @@ fn init_neighborhood(
                     );
                     return Err(Ce::SetupAlgMisbehavior);
                 }
+                // `recv_index` is my child
                 children.push(recv_index);
             }
             msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
@@ -789,6 +842,7 @@ fn init_neighborhood(
             }
         }
     }
+    // Neighborhood complete!
     children.shrink_to_fit();
     let neighborhood =
         Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
@@ -796,14 +850,17 @@ fn init_neighborhood(
     Ok(neighborhood)
 }
 
+// Connectors collect a map of type ConnectorId => SessionInfo,
+// representing a global view of the session's state at the leader.
+// The leader rewrites its contents however it likes (currently: nothing happens),
+// and the map is broadcast again, for each peer to make the local changes
+// that reflect the results of the rewrite.
 fn session_optimize(
     cu: &mut ConnectorUnphased,
     comm: &mut ConnectorCommunication,
     deadline: &Option<Instant>,
 ) -> Result<(), ConnectError> {
-    ////////////////////////////////////////
     use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
-    ////////////////////////////////////////
     log!(cu.logger, "Beginning session optimization");
     // populate session_info_map from a message per child
     let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
@@ -857,6 +914,7 @@ fn session_optimize(
         "Gathered all children's maps. ConnectorId set is... {:?}",
         unoptimized_map.keys()
     );
+    // add my own session info to the map
     let my_session_info = SessionInfo {
         port_info: cu.ips.port_info.clone(),
         proto_components: cu.proto_components.clone(),
@@ -871,7 +929,6 @@ fn session_optimize(
     };
     unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
     log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
-
     // acquire the optimized info...
     let optimized_map = if let Some(parent) = comm.neighborhood.parent {
         // ... as a message from my parent
@@ -920,25 +977,35 @@ fn session_optimize(
         comm.neighborhood.children.iter()
     );
     log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
+    // extract my own ConnectorId's entry
     let optimized_info =
         optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
+    // broadcast the optimized session info to my children
     let msg = S(Sm::SessionScatter { optimized_map });
     for &child in comm.neighborhood.children.iter() {
         comm.endpoint_manager.send_to_setup(child, &msg)?;
     }
-    apply_optimizations(cu, comm, optimized_info)?;
+    // apply local optimizations
+    apply_my_optimizations(cu, comm, optimized_info)?;
     log!(cu.logger, "Session optimizations applied");
     Ok(())
 }
+
+// Defines the optimization function, consuming an unoptimized map
+// and returning an optimized one.
 fn leader_session_map_optimize(
     logger: &mut dyn Logger,
     unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 ) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
     log!(logger, "Session map optimize START");
+    // currently, it's the identity function
     log!(logger, "Session map optimize END");
     Ok(unoptimized_map)
 }
-fn apply_optimizations(
+
+// Modify the given connector's internals to reflect
+// the given session info
+fn apply_my_optimizations(
     cu: &mut ConnectorUnphased,
     comm: &mut ConnectorCommunication,
     session_info: SessionInfo,
@@ -949,7 +1016,7 @@ fn apply_optimizations(
         serde_proto_description,
         endpoint_incoming_to_getter,
     } = session_info;
-    // TODO some info which should be read-only can be mutated with the current scheme
+    // simply overwrite the contents
     cu.ips.port_info = port_info;
     cu.proto_components = proto_components;
     cu.proto_description = serde_proto_description.0;