diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index abbbbabd1cd3d31924605d421103113d3f6449e0..97f9673898fe7bb6cc82e6499ec79bf3b55817e4 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -3,15 +3,6 @@ use crate::runtime::*; use std::io::ErrorKind::WouldBlock; impl Connector { - pub fn new_simple( - proto_description: Arc, - connector_id: ConnectorId, - ) -> Self { - let logger = Box::new(DummyLogger); - // let logger = Box::new(DummyLogger); - let surplus_sockets = 2; - Self::new(logger, proto_description, connector_id, surplus_sockets) - } pub fn new( mut logger: Box, proto_description: Arc, @@ -61,45 +52,46 @@ impl Connector { } pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { use ConnectError::*; - let Self { unphased: up, phased } = self; + let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Communication { .. } => { - log!(up.logger, "Call to connecting in connected state"); + log!(cu.logger, "Call to connecting in connected state"); Err(AlreadyConnected) } ConnectorPhased::Setup { endpoint_setups, .. } => { - log!(up.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *up.logger, + &mut *cu.logger, endpoint_setups, - &mut up.port_info, + &mut cu.port_info, deadline, )?; log!( - up.logger, + cu.logger, "Successfully connected {} endpoints", endpoint_manager.endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( - up.id_manager.connector_id, - &mut *up.logger, + cu.id_manager.connector_id, + &mut *cu.logger, &mut endpoint_manager, deadline, )?; - log!(up.logger, "Successfully created neighborhood {:?}", &neighborhood); - log!(up.logger, "connect() finished. setup phase complete"); - // TODO session optimization goes here - self.phased = ConnectorPhased::Communication(ConnectorCommunication { + log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); + let mut comm = ConnectorCommunication { round_index: 0, endpoint_manager, neighborhood, mem_inbox: Default::default(), native_batches: vec![Default::default()], round_result: Ok(None), - }); + }; + session_optimize(cu, &mut comm, deadline)?; + log!(cu.logger, "connect() finished. setup phase complete"); + self.phased = ConnectorPhased::Communication(comm); Ok(()) } } @@ -297,8 +289,8 @@ fn init_neighborhood( em: &mut EndpointManager, deadline: Option, ) -> Result { - use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; //////////////////////////////// + use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; #[derive(Debug)] struct WaveState { parent: Option, @@ -416,10 +408,15 @@ fn init_neighborhood( } } } - S(YouAreMyParent) | S(MyPortInfo(_)) => unreachable!(), - comm_msg @ Msg::CommMsg { .. } => { - log!(logger, "delaying msg {:?} during election algorithm", comm_msg); - em.delayed_messages.push((recv_index, comm_msg)); + msg @ S(YouAreMyParent) | msg @ S(MyPortInfo(_)) => { + log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg); + return Err(SetupAlgMisbehavior); + } + msg @ S(SessionScatter { .. }) + | msg @ S(SessionGather { .. }) + | msg @ Msg::CommMsg { .. } => { + log!(logger, "delaying msg {:?} during election algorithm", msg); + em.delayed_messages.push((recv_index, msg)); } } } @@ -443,7 +440,6 @@ fn init_neighborhood( let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(LeaderWave { .. }) => { /* old message */ } S(LeaderAnnounce { .. }) => { // not a child log!( @@ -468,10 +464,14 @@ fn init_neighborhood( } children.push(recv_index); } - S(MyPortInfo(_)) => unreachable!(), - comm_msg @ Msg::CommMsg { .. } => { - log!(logger, "delaying msg {:?} during election algorithm", comm_msg); - em.delayed_messages.push((recv_index, comm_msg)); + msg @ S(MyPortInfo(_)) | msg @ S(LeaderWave { .. }) => { + log!(logger, "discarding old message {:?} during election", msg); + } + msg @ S(SessionScatter { .. }) + | msg @ S(SessionGather { .. }) + | msg @ Msg::CommMsg { .. } => { + log!(logger, "delaying msg {:?} during election", msg); + em.delayed_messages.push((recv_index, msg)); } } } @@ -481,3 +481,138 @@ fn init_neighborhood( log!(logger, "Neighborhood constructed {:?}", &neighborhood); Ok(neighborhood) } + +fn session_optimize( + cu: &mut ConnectorUnphased, + comm: &mut ConnectorCommunication, + deadline: Option, +) -> Result<(), ConnectError> { + //////////////////////////////////////// + use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + //////////////////////////////////////// + log!(cu.logger, "Beginning session optimization"); + // populate session_info_map from a message per child + let mut unoptimized_map: HashMap = Default::default(); + let mut awaiting: HashSet = comm.neighborhood.children.iter().copied().collect(); + comm.endpoint_manager.undelay_all(); + while !awaiting.is_empty() { + log!( + cu.logger, + "Session gather loop. awaiting info from children {:?}...", + awaiting.iter() + ); + let (recv_index, msg) = + comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; + log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + match msg { + S(SessionGather { unoptimized_map: child_unoptimized_map }) => { + if !awaiting.remove(&recv_index) { + log!( + cu.logger, + "Wasn't expecting session info from {:?}. Got {:?}", + recv_index, + &child_unoptimized_map + ); + return Err(SetupAlgMisbehavior); + } + unoptimized_map.extend(child_unoptimized_map.into_iter()); + } + msg @ S(YouAreMyParent) + | msg @ S(MyPortInfo(..)) + | msg @ S(LeaderAnnounce { .. }) + | msg @ S(LeaderWave { .. }) => { + log!(cu.logger, "discarding old message {:?} during election", msg); + } + msg @ S(SessionScatter { .. }) => { + log!( + cu.logger, + "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", + recv_index, + &msg + ); + return Err(SetupAlgMisbehavior); + } + msg @ Msg::CommMsg(..) => { + log!(cu.logger, "delaying msg {:?} during session optimization", msg); + comm.endpoint_manager.delayed_messages.push((recv_index, msg)); + } + } + } + log!( + cu.logger, + "Gathered all children's maps. ConnectorId set is... {:?}", + unoptimized_map.keys() + ); + let my_session_info = SessionInfo {}; + unoptimized_map.insert(cu.id_manager.connector_id, my_session_info); + log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); + + // acquire the optimized info... + let optimized_map = if let Some(parent) = comm.neighborhood.parent { + // ... as a message from my parent + log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); + let msg = S(SessionGather { unoptimized_map }); + comm.endpoint_manager.send_to_setup(parent, &msg)?; + 'scatter_loop: loop { + log!( + cu.logger, + "Session scatter recv loop. awaiting info from children {:?}...", + awaiting.iter() + ); + let (recv_index, msg) = + comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; + log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + match msg { + S(SessionScatter { optimized_map }) => { + if recv_index != parent { + log!(cu.logger, "I expected the scatter from my parent only!"); + return Err(SetupAlgMisbehavior); + } + break 'scatter_loop optimized_map; + } + msg @ Msg::CommMsg { .. } => { + log!(cu.logger, "delaying msg {:?} during scatter recv", msg); + comm.endpoint_manager.delayed_messages.push((recv_index, msg)); + } + msg @ S(SessionGather { .. }) + | msg @ S(YouAreMyParent) + | msg @ S(MyPortInfo(..)) + | msg @ S(LeaderAnnounce { .. }) + | msg @ S(LeaderWave { .. }) => { + log!(cu.logger, "discarding old message {:?} during election", msg); + } + } + } + } else { + // by computing it myself + log!(cu.logger, "I am the leader! I will optimize this session"); + leader_session_map_optimize(unoptimized_map)? + }; + log!( + cu.logger, + "Optimized info map is {:?}. Sending to children {:?}", + &optimized_map, + comm.neighborhood.children.iter() + ); + let optimized_info = + optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); + let msg = S(SessionScatter { optimized_map }); + for &child in comm.neighborhood.children.iter() { + comm.endpoint_manager.send_to_setup(child, &msg)?; + } + apply_optimizations(cu, comm, optimized_info)?; + log!(cu.logger, "Session optimization complete"); + Ok(()) +} +fn leader_session_map_optimize( + unoptimized_map: HashMap, +) -> Result, ConnectError> { + Ok(unoptimized_map) +} +fn apply_optimizations( + _cu: &mut ConnectorUnphased, + _comm: &mut ConnectorCommunication, + _session_info: SessionInfo, +) -> Result<(), ConnectError> { + Ok(()) +}