diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 07cba79e56c09ed8f7f1444854abf2fb3296a4ae..596c6e14d5343322de78c2c95eec4eda548bd4d5 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -278,18 +278,13 @@ impl Connector { )?; log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); // Put it all together with an initial round index of zero. - let mut comm = ConnectorCommunication { + let comm = ConnectorCommunication { round_index: 0, endpoint_manager, neighborhood, native_batches: vec![Default::default()], round_result: Ok(None), // no previous round yet }; - if cfg!(feature = "session_optimization") { - // Perform the session optimization procedure, which may modify the - // internals of the connector, rerouting ports, moving around connectors etc. - session_optimize(&mut cu, &mut comm, &deadline)?; - } log!(cu.logger, "connect() finished. setup phase complete"); Ok(comm) }; @@ -808,9 +803,7 @@ fn init_neighborhood( log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg); return Err(Ce::SetupAlgMisbehavior); } - msg @ S(Sm::SessionScatter { .. }) - | msg @ S(Sm::SessionGather { .. }) - | msg @ Msg::CommMsg { .. } => { + msg @ Msg::CommMsg { .. } => { log!(logger, "delaying msg {:?} during election algorithm", msg); em.delayed_messages.push((recv_index, msg)); } @@ -871,9 +864,7 @@ fn init_neighborhood( msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => { log!(logger, "discarding old message {:?} during election", msg); } - msg @ S(Sm::SessionScatter { .. }) - | msg @ S(Sm::SessionGather { .. }) - | msg @ Msg::CommMsg { .. } => { + msg @ Msg::CommMsg { .. } => { log!(logger, "delaying msg {:?} during election", msg); em.delayed_messages.push((recv_index, msg)); } @@ -886,188 +877,3 @@ fn init_neighborhood( log!(logger, "Neighborhood constructed {:?}", &neighborhood); Ok(neighborhood) } - -// Connectors collect a map of type ConnectorId=>SessionInfo, -// representing a global view of the session's state at the leader. -// The leader rewrites its contents however they like (currently: nothing happens) -// and the map is again broadcasted, for each peer to make their local changes to -// reflect the results of the rewrite. -fn session_optimize( - cu: &mut ConnectorUnphased, - comm: &mut ConnectorCommunication, - deadline: &Option, -) -> Result<(), ConnectError> { - use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; - log!(cu.logger, "Beginning session optimization"); - // populate session_info_map from a message per child - let mut unoptimized_map: HashMap = Default::default(); - let mut awaiting: HashSet = comm.neighborhood.children.iter().copied().collect(); - comm.endpoint_manager.undelay_all(); - while !awaiting.is_empty() { - log!( - cu.logger, - "Session gather loop. awaiting info from children {:?}...", - awaiting.iter() - ); - let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; - log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); - match msg { - S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { - if !awaiting.remove(&recv_index) { - log!( - cu.logger, - "Wasn't expecting session info from {:?}. Got {:?}", - recv_index, - &child_unoptimized_map - ); - return Err(Ce::SetupAlgMisbehavior); - } - unoptimized_map.extend(child_unoptimized_map.into_iter()); - } - msg @ S(Sm::YouAreMyParent) - | msg @ S(Sm::MyPortInfo(..)) - | msg @ S(Sm::LeaderAnnounce { .. }) - | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.logger, "discarding old message {:?} during election", msg); - } - msg @ S(Sm::SessionScatter { .. }) => { - log!( - cu.logger, - "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", - recv_index, - &msg - ); - return Err(Ce::SetupAlgMisbehavior); - } - msg @ Msg::CommMsg(..) => { - log!(cu.logger, "delaying msg {:?} during session optimization", msg); - comm.endpoint_manager.delayed_messages.push((recv_index, msg)); - } - } - } - log!( - cu.logger, - "Gathered all children's maps. ConnectorId set is... {:?}", - unoptimized_map.keys() - ); - // add my own session info to the map - let my_session_info = SessionInfo { - port_info: cu.ips.port_info.clone(), - proto_components: cu.proto_components.clone(), - serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), - endpoint_incoming_to_getter: comm - .endpoint_manager - .net_endpoint_store - .endpoint_exts - .iter() - .map(|ee| ee.getter_for_incoming) - .collect(), - }; - unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info); - log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); - // acquire the optimized info... - let optimized_map = if let Some(parent) = comm.neighborhood.parent { - // ... as a message from my parent - log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); - let msg = S(Sm::SessionGather { unoptimized_map }); - comm.endpoint_manager.send_to_setup(parent, &msg)?; - 'scatter_loop: loop { - log!( - cu.logger, - "Session scatter recv loop. awaiting info from children {:?}...", - awaiting.iter() - ); - let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; - log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); - match msg { - S(Sm::SessionScatter { optimized_map }) => { - if recv_index != parent { - log!(cu.logger, "I expected the scatter from my parent only!"); - return Err(Ce::SetupAlgMisbehavior); - } - break 'scatter_loop optimized_map; - } - msg @ Msg::CommMsg { .. } => { - log!(cu.logger, "delaying msg {:?} during scatter recv", msg); - comm.endpoint_manager.delayed_messages.push((recv_index, msg)); - } - msg @ S(Sm::SessionGather { .. }) - | msg @ S(Sm::YouAreMyParent) - | msg @ S(Sm::MyPortInfo(..)) - | msg @ S(Sm::LeaderAnnounce { .. }) - | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.logger, "discarding old message {:?} during election", msg); - } - } - } - } else { - // by computing it myself - log!(cu.logger, "I am the leader! I will optimize this session"); - leader_session_map_optimize(&mut *cu.logger, unoptimized_map)? - }; - log!( - cu.logger, - "Optimized info map is {:?}. Sending to children {:?}", - &optimized_map, - comm.neighborhood.children.iter() - ); - log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); - // extract my own ConnectorId's entry - let optimized_info = - optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); - // broadcast the optimized session info to my children - let msg = S(Sm::SessionScatter { optimized_map }); - for &child in comm.neighborhood.children.iter() { - comm.endpoint_manager.send_to_setup(child, &msg)?; - } - // apply local optimizations - apply_my_optimizations(cu, comm, optimized_info)?; - log!(cu.logger, "Session optimizations applied"); - Ok(()) -} - -// Defines the optimization function, consuming an optimized map, -// and returning an optimized map. -fn leader_session_map_optimize( - logger: &mut dyn Logger, - m: HashMap, -) -> Result, ConnectError> { - log!(logger, "Session map optimize START"); - // currently, it's the identity function - log!(logger, "Session map optimize END"); - Ok(m) -} - -// Modify the given connector's internals to reflect -// the given session info -fn apply_my_optimizations( - cu: &mut ConnectorUnphased, - comm: &mut ConnectorCommunication, - session_info: SessionInfo, -) -> Result<(), ConnectError> { - let SessionInfo { - proto_components, - port_info, - serde_proto_description, - endpoint_incoming_to_getter, - } = session_info; - // simply overwrite the contents - println!("BEFORE: {:#?}\n{:#?}", cu, comm); - cu.ips.port_info = port_info; - assert!(cu.ips.port_info.invariant_preserved()); - cu.proto_components = proto_components; - cu.proto_description = serde_proto_description.0; - for (ee, getter) in comm - .endpoint_manager - .net_endpoint_store - .endpoint_exts - .iter_mut() - .zip(endpoint_incoming_to_getter) - { - ee.getter_for_incoming = getter; - } - // println!("AFTER: {:#?}\n{:#?}", cu, comm); - Ok(()) -}