diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index ce8aa92e5852bc1acee49cd198dde65996f59e88..ab12b429d02be2e6aa815d43612319c5744d0efe 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -50,12 +50,12 @@ impl Connector { } } pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { - use ConnectError::*; + use ConnectError as Ce; let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Communication { .. } => { log!(cu.logger, "Call to connecting in connected state"); - Err(AlreadyConnected) + Err(Ce::AlreadyConnected) } ConnectorPhased::Setup { endpoint_setups, .. } => { log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); @@ -103,7 +103,7 @@ fn new_endpoint_manager( ) -> Result { //////////////////////////////////////////// use std::sync::atomic::AtomicBool; - use ConnectError::*; + use ConnectError as Ce; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -129,7 +129,7 @@ fn new_endpoint_manager( TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] }) } else { let mut listener = TcpListener::bind(endpoint_setup.sock_addr) - .map_err(|_| BindFailed(endpoint_setup.sock_addr))?; + .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?; poll.registry().register(&mut listener, token, BOTH).unwrap(); TodoEndpoint::Accepting(listener) }; @@ -150,7 +150,7 @@ fn new_endpoint_manager( assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token let mut waker_continue_signal: Option> = None; - let mut poll = Poll::new().map_err(|_| PollInitFailed)?; + let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4); let mut polled_undrained = VecSet::default(); let mut delayed_messages = vec![]; @@ -175,11 +175,11 @@ fn new_endpoint_manager( let mut setup_incomplete: HashSet = (0..todos.len()).collect(); while !setup_incomplete.is_empty() { let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?) } else { None }; - poll.poll(&mut events, remaining).map_err(|_| PollFailed)?; + poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?; for event in events.iter() { let token = event.token(); let Token(index) = token; @@ -232,7 +232,7 @@ fn new_endpoint_manager( } Err(_) => { log!(logger, "accept() failure on index {}", index); - return Err(AcceptFailed(listener.local_addr().unwrap())); + return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); } } } @@ -240,7 +240,7 @@ fn new_endpoint_manager( if event.is_error() { if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { // right now you cannot retry an acceptor. - return Err(AcceptFailed(endpoint.stream.local_addr().unwrap())); + return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap())); } if connect_failed.insert(index) { log!( @@ -288,7 +288,7 @@ fn new_endpoint_manager( endpoint .send(&msg) .map_err(|e| { - EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) + Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) }) .unwrap(); log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); @@ -296,7 +296,7 @@ fn new_endpoint_manager( } if event.is_readable() && todo.recv_peer_port.is_none() { let maybe_msg = endpoint.try_recv(logger).map_err(|e| { - EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) + Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) })?; if maybe_msg.is_some() && !endpoint.inbox.is_empty() { polled_undrained.insert(index); @@ -386,7 +386,7 @@ fn init_neighborhood( deadline: Option, ) -> Result { //////////////////////////////// - use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; #[derive(Debug)] struct WaveState { parent: Option, @@ -398,7 +398,7 @@ fn init_neighborhood( ws: &WaveState, ) -> Result<(), ConnectError> { awaiting.clear(); - let msg = S(LeaderWave { wave_leader: ws.leader }); + let msg = S(Sm::LeaderWave { wave_leader: ws.leader }); for index in em.index_iter() { if Some(index) != ws.parent { em.send_to_setup(index, &msg)?; @@ -445,7 +445,7 @@ fn init_neighborhood( let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(LeaderAnnounce { tree_leader }) => { + S(Sm::LeaderAnnounce { tree_leader }) => { let election_result = WaveState { leader: tree_leader, parent: Some(recv_index) }; log!(logger, "Election lost! Result {:?}", &election_result); @@ -453,7 +453,7 @@ fn init_neighborhood( assert_ne!(election_result.leader, connector_id); break 'election election_result; } - S(LeaderWave { wave_leader }) => { + S(Sm::LeaderWave { wave_leader }) => { use Ordering as O; match wave_leader.cmp(&best_wave.leader) { O::Less => log!( @@ -504,12 +504,12 @@ fn init_neighborhood( } } } - msg @ S(YouAreMyParent) | msg @ S(MyPortInfo(_)) => { + msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => { log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } - msg @ S(SessionScatter { .. }) - | msg @ S(SessionGather { .. }) + msg @ S(Sm::SessionScatter { .. }) + | msg @ S(Sm::SessionGather { .. }) | msg @ Msg::CommMsg { .. } => { log!(logger, "delaying msg {:?} during election algorithm", msg); em.delayed_messages.push((recv_index, msg)); @@ -523,10 +523,13 @@ fn init_neighborhood( awaiting.clear(); for index in em.index_iter() { if Some(index) == election_result.parent { - em.send_to_setup(index, &S(YouAreMyParent))?; + em.send_to_setup(index, &S(Sm::YouAreMyParent))?; } else { awaiting.insert(index); - em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?; + em.send_to_setup( + index, + &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }), + )?; } } let mut children = vec![]; @@ -536,7 +539,7 @@ fn init_neighborhood( let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(LeaderAnnounce { .. }) => { + S(Sm::LeaderAnnounce { .. }) => { // not a child log!( logger, @@ -545,10 +548,10 @@ fn init_neighborhood( children.iter() ); if !awaiting.remove(&recv_index) { - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } } - S(YouAreMyParent) => { + S(Sm::YouAreMyParent) => { if !awaiting.remove(&recv_index) { log!( logger, @@ -556,15 +559,15 @@ fn init_neighborhood( recv_index, children.iter() ); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } children.push(recv_index); } - msg @ S(MyPortInfo(_)) | msg @ S(LeaderWave { .. }) => { + msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => { log!(logger, "discarding old message {:?} during election", msg); } - msg @ S(SessionScatter { .. }) - | msg @ S(SessionGather { .. }) + msg @ S(Sm::SessionScatter { .. }) + | msg @ S(Sm::SessionGather { .. }) | msg @ Msg::CommMsg { .. } => { log!(logger, "delaying msg {:?} during election", msg); em.delayed_messages.push((recv_index, msg)); @@ -584,7 +587,7 @@ fn session_optimize( deadline: Option, ) -> Result<(), ConnectError> { //////////////////////////////////////// - use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; //////////////////////////////////////// log!(cu.logger, "Beginning session optimization"); // populate session_info_map from a message per child @@ -601,7 +604,7 @@ fn session_optimize( comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(SessionGather { unoptimized_map: child_unoptimized_map }) => { + S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { if !awaiting.remove(&recv_index) { log!( cu.logger, @@ -609,24 +612,24 @@ fn session_optimize( recv_index, &child_unoptimized_map ); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } unoptimized_map.extend(child_unoptimized_map.into_iter()); } - msg @ S(YouAreMyParent) - | msg @ S(MyPortInfo(..)) - | msg @ S(LeaderAnnounce { .. }) - | msg @ S(LeaderWave { .. }) => { + msg @ S(Sm::YouAreMyParent) + | msg @ S(Sm::MyPortInfo(..)) + | msg @ S(Sm::LeaderAnnounce { .. }) + | msg @ S(Sm::LeaderWave { .. }) => { log!(cu.logger, "discarding old message {:?} during election", msg); } - msg @ S(SessionScatter { .. }) => { + msg @ S(Sm::SessionScatter { .. }) => { log!( cu.logger, "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", recv_index, &msg ); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } msg @ Msg::CommMsg(..) => { log!(cu.logger, "delaying msg {:?} during session optimization", msg); @@ -643,7 +646,7 @@ fn session_optimize( port_info: cu.port_info.clone(), proto_components: cu.proto_components.clone(), serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), - getter_for_incoming: comm + endpoint_incoming_to_getter: comm .endpoint_manager .endpoint_exts .iter() @@ -657,7 +660,7 @@ fn session_optimize( let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); - let msg = S(SessionGather { unoptimized_map }); + let msg = S(Sm::SessionGather { unoptimized_map }); comm.endpoint_manager.send_to_setup(parent, &msg)?; 'scatter_loop: loop { log!( @@ -669,10 +672,10 @@ fn session_optimize( comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(SessionScatter { optimized_map }) => { + S(Sm::SessionScatter { optimized_map }) => { if recv_index != parent { log!(cu.logger, "I expected the scatter from my parent only!"); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } break 'scatter_loop optimized_map; } @@ -680,11 +683,11 @@ fn session_optimize( log!(cu.logger, "delaying msg {:?} during scatter recv", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } - msg @ S(SessionGather { .. }) - | msg @ S(YouAreMyParent) - | msg @ S(MyPortInfo(..)) - | msg @ S(LeaderAnnounce { .. }) - | msg @ S(LeaderWave { .. }) => { + msg @ S(Sm::SessionGather { .. }) + | msg @ S(Sm::YouAreMyParent) + | msg @ S(Sm::MyPortInfo(..)) + | msg @ S(Sm::LeaderAnnounce { .. }) + | msg @ S(Sm::LeaderWave { .. }) => { log!(cu.logger, "discarding old message {:?} during election", msg); } } @@ -703,7 +706,7 @@ fn session_optimize( log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); - let msg = S(SessionScatter { optimized_map }); + let msg = S(Sm::SessionScatter { optimized_map }); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_setup(child, &msg)?; } @@ -724,12 +727,19 @@ fn apply_optimizations( comm: &mut ConnectorCommunication, session_info: SessionInfo, ) -> Result<(), ConnectError> { - let SessionInfo { proto_components, port_info, serde_proto_description, getter_for_incoming } = - session_info; + let SessionInfo { + proto_components, + port_info, + serde_proto_description, + endpoint_incoming_to_getter, + } = session_info; + // TODO some info which should be read-only can be mutated with the current scheme cu.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; - for (ee, getter) in comm.endpoint_manager.endpoint_exts.iter_mut().zip(getter_for_incoming) { + for (ee, getter) in + comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) + { ee.getter_for_incoming = getter; } Ok(())