diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index ae3f1d44608bc347e7930846945eda154243cccf..e5a6c01f91e6d422233f5ff0e4ba21eb171dce01 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -5,24 +5,25 @@ use std::io::ErrorKind::WouldBlock; impl Connector { pub fn new_simple( proto_description: Arc, - controller_id: ControllerId, + connector_id: ConnectorId, ) -> Self { let logger = Box::new(DummyLogger); // let logger = Box::new(DummyLogger); let surplus_sockets = 2; - Self::new(logger, proto_description, controller_id, surplus_sockets) + Self::new(logger, proto_description, connector_id, surplus_sockets) } pub fn new( - logger: Box, + mut logger: Box, proto_description: Arc, - controller_id: ControllerId, + connector_id: ConnectorId, surplus_sockets: u16, ) -> Self { + log!(&mut *logger, "Created with connector_id {:?}", connector_id); Self { proto_description, proto_components: Default::default(), logger, - id_manager: IdManager::new(controller_id), + id_manager: IdManager::new(connector_id), native_ports: Default::default(), port_info: Default::default(), phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, @@ -77,7 +78,7 @@ impl Connector { ); // leader election and tree construction let neighborhood = init_neighborhood( - self.id_manager.controller_id, + self.id_manager.connector_id, &mut *self.logger, &mut endpoint_manager, deadline, @@ -215,7 +216,7 @@ fn new_endpoint_manager( *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - let maybe_msg = endpoint.try_recv().map_err(|e| { + let maybe_msg = endpoint.try_recv(logger).map_err(|e| { EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) })?; if maybe_msg.is_some() && !endpoint.inbox.is_empty() { @@ -260,9 +261,15 @@ fn new_endpoint_manager( } let endpoint_exts = todos .into_iter() - .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt { + .enumerate() + .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt { endpoint: match todo_endpoint { - TodoEndpoint::Endpoint(endpoint) => endpoint, + TodoEndpoint::Endpoint(mut endpoint) => { + poll.registry() + .reregister(&mut endpoint.stream, Token(index), Interest::READABLE) + .unwrap(); + endpoint + } TodoEndpoint::Listener(..) => unreachable!(), }, getter_for_incoming: local_port, @@ -279,137 +286,192 @@ fn new_endpoint_manager( } fn init_neighborhood( - controller_id: ControllerId, + connector_id: ConnectorId, logger: &mut dyn Logger, em: &mut EndpointManager, deadline: Option, ) -> Result { - use {Msg::SetupMsg as S, SetupMsg::*}; - log!(logger, "beginning neighborhood construction"); - // 1. broadcast my ID as the first echo. await reply from all neighbors - let echo = S(LeaderEcho { maybe_leader: controller_id }); - let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len()); - for index in 0..em.endpoint_exts.len() { - log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo); - em.send_to_setup(index, &echo)?; - awaiting.insert(index); + use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + //////////////////////////////// + #[derive(Debug)] + struct WaveState { + parent: Option, + leader: ConnectorId, } - - // 2. Receive incoming replies. whenever a higher-id echo arrives, - // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; - let mut my_leader = controller_id; - em.undelay_all(); - 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = em.try_recv_any_setup(deadline)?; - log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); - match msg { - S(LeaderAnnounce { leader }) => { - // someone else completed the echo and became leader first! - // the sender is my parent - parent = Some(index); - my_leader = leader; - awaiting.clear(); - break 'echo_loop; + fn do_wave( + em: &mut EndpointManager, + awaiting: &mut HashSet, + ws: &WaveState, + ) -> Result<(), ConnectError> { + awaiting.clear(); + let msg = S(LeaderWave { wave_leader: ws.leader }); + for index in em.index_iter() { + if Some(index) != ws.parent { + em.send_to_setup(index, &msg)?; + awaiting.insert(index); } - S(LeaderEcho { maybe_leader }) => { - use Ordering::*; - match maybe_leader.cmp(&my_leader) { - Less => { /* ignore this wave */ } - Equal => { - awaiting.remove(&index); - if awaiting.is_empty() { - if let Some(p) = parent { - // return the echo to my parent - em.send_to_setup(p, &S(LeaderEcho { maybe_leader }))?; - } else { - // wave completed! - break 'echo_loop; + } + Ok(()) + } + /////////////////////// + /* + Conceptually, we have two distinct disstributed algorithms back-to-back + 1. Leader election using echo algorithm with extinction. + - Each connector initiates a wave tagged with their ID + - Connectors participate in waves of GREATER ID, abandoning previous waves + - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader + 2. Tree construction + - The leader broadcasts their leadership with msg A + - Upon receiving their first announcement, connectors reply B, and send A to all peers + - A controller exits once they have received A or B from each neighbor + + The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1, + so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun. + + NOTE the distinction between PARENT and LEADER + */ + log!(logger, "beginning neighborhood construction"); + if em.num_endpoints() == 0 { + log!(logger, "Edge case of no neighbors! No parent an no children!"); + return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) }); + } + log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_endpoints()); + let mut awaiting = HashSet::with_capacity(em.num_endpoints()); + // 1+ neighbors. Leader can only be learned by receiving messages + // loop ends when I know my sink tree parent (implies leader was elected) + let election_result: WaveState = { + // initially: No parent, I'm the best leader. + let mut best_wave = WaveState { parent: None, leader: connector_id }; + // start a wave for this initial state + do_wave(em, &mut awaiting, &best_wave)?; + // with 1+ neighbors, progress is only made in response to incoming messages + em.undelay_all(); + 'election: loop { + log!(logger, "Election loop. awaiting {:?}...", awaiting.iter()); + let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; + log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + match msg { + S(LeaderAnnounce { tree_leader }) => { + let election_result = + WaveState { leader: tree_leader, parent: Some(recv_index) }; + log!(logger, "Election lost! Result {:?}", &election_result); + assert!(election_result.leader >= best_wave.leader); + assert_ne!(election_result.leader, connector_id); + break 'election election_result; + } + S(LeaderWave { wave_leader }) => { + use Ordering as O; + match wave_leader.cmp(&best_wave.leader) { + O::Less => log!( + logger, + "Ignoring wave with Id {:?}<{:?}", + wave_leader, + best_wave.leader + ), + O::Greater => { + log!( + logger, + "Joining wave with Id {:?}>{:?}", + wave_leader, + best_wave.leader + ); + best_wave = WaveState { leader: wave_leader, parent: Some(recv_index) }; + log!(logger, "New wave state {:?}", &best_wave); + do_wave(em, &mut awaiting, &best_wave)?; + if awaiting.is_empty() { + log!(logger, "Special case! Only neighbor is parent. Replying to {:?} msg {:?}", recv_index, &msg); + em.send_to_setup(recv_index, &msg)?; } } - } - Greater => { - // join new echo - log!(logger, "Setting leader to index {:?}", index); - parent = Some(index); - my_leader = maybe_leader; - let echo = S(LeaderEcho { maybe_leader: my_leader }); - awaiting.clear(); - if em.endpoint_exts.len() == 1 { - // immediately reply to parent - log!(logger, "replying echo to parent {:?} immediately", index); - em.send_to_setup(index, &echo)?; - } else { - for index2 in 0..em.endpoint_exts.len() { - if index2 == index { - // don't propagate echo to my parent - continue; + O::Equal => { + assert!(awaiting.remove(&recv_index)); + log!( + logger, + "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies", + recv_index, + best_wave.leader, + awaiting.len() + ); + if awaiting.is_empty() { + if let Some(parent) = best_wave.parent { + log!( + logger, + "Sub-wave done! replying to parent {:?} msg {:?}", + parent, + &msg + ); + em.send_to_setup(parent, &msg)?; + } else { + let election_result: WaveState = best_wave; + log!(logger, "Election won! Result {:?}", &election_result); + break 'election election_result; } - log!(logger, "repeating echo {:?} to {:?}", &echo, index2); - em.send_to_setup(index2, &echo)?; - awaiting.insert(index2); } } } } - } - inappropriate_msg => { - log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg); - em.delayed_messages.push((index, inappropriate_msg)) + S(YouAreMyParent) | S(MyPortInfo(_)) => unreachable!(), + comm_msg @ Msg::CommMsg { .. } => { + log!(logger, "delaying msg {:?} during election algorithm", comm_msg); + em.delayed_messages.push((recv_index, comm_msg)); + } } } - } - match parent { - None => assert_eq!( - my_leader, controller_id, - "I've got no parent, but I consider {:?} the leader?", - my_leader - ), - Some(parent) => assert_ne!( - my_leader, controller_id, - "I have {:?} as parent, but I consider myself ({:?}) the leader?", - parent, controller_id - ), - } - log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader); + }; - // 3. broadcast leader announcement (except to parent: confirm they are your parent) - // in this loop, every node sends 1 message to each neighbor - // await 1 message from all non-parents. - let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for index in 0..em.endpoint_exts.len() { - let msg = if Some(index) == parent { - &S(YouAreMyParent) + // starting algorithm 2. Send a message to every neighbor + log!(logger, "Starting tree construction. Step 1: send one msg per neighbor"); + awaiting.clear(); + for index in em.index_iter() { + if Some(index) == election_result.parent { + em.send_to_setup(index, &S(YouAreMyParent))?; } else { awaiting.insert(index); - &msg_for_non_parents - }; - log!(logger, "ANNOUNCING to {:?} {:?}", index, msg); - em.send_to_setup(index, msg)?; + em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?; + } } - let mut children = Vec::default(); + let mut children = vec![]; em.undelay_all(); while !awaiting.is_empty() { - log!(logger, "awaiting {:?}", &awaiting); - let (index, msg) = em.try_recv_any_setup(deadline)?; + log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter()); + let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; + log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(YouAreMyParent) => { - assert!(awaiting.remove(&index)); - children.push(index); + S(LeaderWave { .. }) => { /* old message */ } + S(LeaderAnnounce { .. }) => { + // not a child + log!( + logger, + "Got reply from non-child index {:?}. Children: {:?}", + recv_index, + children.iter() + ); + if !awaiting.remove(&recv_index) { + return Err(SetupAlgMisbehavior); + } } - S(LeaderAnnounce { leader }) => { - assert!(awaiting.remove(&index)); - assert!(leader == my_leader); - assert!(Some(index) != parent); - // they wouldn't send me this if they considered me their parent + S(YouAreMyParent) => { + if !awaiting.remove(&recv_index) { + log!( + logger, + "Got reply from child index {:?}. Children before... {:?}", + recv_index, + children.iter() + ); + return Err(SetupAlgMisbehavior); + } + children.push(recv_index); } - inappropriate_msg => { - log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg); - em.delayed_messages.push((index, inappropriate_msg)); + S(MyPortInfo(_)) => unreachable!(), + comm_msg @ Msg::CommMsg { .. } => { + log!(logger, "delaying msg {:?} during election algorithm", comm_msg); + em.delayed_messages.push((recv_index, comm_msg)); } } } - children.sort(); - children.dedup(); - Ok(Neighborhood { parent, children }) + children.shrink_to_fit(); + let neighborhood = + Neighborhood { parent: election_result.parent, children: VecSet::new(children) }; + log!(logger, "Neighborhood constructed {:?}", &neighborhood); + Ok(neighborhood) }