diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index fcd4c87a48d131ae8c230ce6de899cf5f03ad353..8f099b79463cb95d2007601224e24f1779e6b0d1 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -115,8 +115,13 @@ impl Connector { )?; log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len()); // leader election and tree construction - let neighborhood = - init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?; + let neighborhood = init_neighborhood( + self.id_manager.controller_id, + &mut *self.logger, + &mut endpoint_exts, + &mut endpoint_poller, + deadline, + )?; log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); // TODO session optimization goes here self.phased = ConnectorPhased::Communication { @@ -137,10 +142,6 @@ fn init_endpoints( inp_to_route: &mut HashMap, deadline: Instant, ) -> Result<(Vec, EndpointPoller), ()> { - use mio07::{ - net::{TcpListener, TcpStream}, - Events, Interest, Poll, Token, - }; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -182,7 +183,8 @@ fn init_endpoints( poll: Poll::new().map_err(drop)?, events: Events::with_capacity(64), undrained_endpoints: Default::default(), - delayed_inp_messages: Default::default(), + delayed_messages: Default::default(), + undelayed_messages: Default::default(), }; let mut todos = endpoint_setups @@ -264,12 +266,133 @@ fn init_endpoints( } fn init_neighborhood( + controller_id: ControllerId, logger: &mut dyn Logger, endpoint_exts: &mut [EndpointExt], - endpoint_poller: &mut EndpointPoller, + ep: &mut EndpointPoller, + deadline: Instant, ) -> Result { - log!(logger, "Time to construct my neighborhood"); - let parent = None; - let children = Default::default(); + log!(logger, "beginning neighborhood construction"); + use Msg::SetupMsg as S; + use SetupMsg::*; + + // 1. broadcast my ID as the first echo. await reply from all neighbors + let echo = S(LeaderEcho { maybe_leader: controller_id }); + let mut awaiting = HashSet::with_capacity(endpoint_exts.len()); + for (index, ee) in endpoint_exts.iter_mut().enumerate() { + log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo); + ee.endpoint.send(&echo)?; + awaiting.insert(index); + } + + // 2. Receive incoming replies. whenever a higher-id echo arrives, + // adopt it as leader, sender as parent, and reset the await set. + let mut parent: Option = None; + let mut my_leader = controller_id; + ep.undelay_all(); + 'echo_loop: while !awaiting.is_empty() || parent.is_some() { + let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?; + log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); + match msg { + S(LeaderAnnounce { leader }) => { + // someone else completed the echo and became leader first! + // the sender is my parent + parent = Some(index); + my_leader = leader; + awaiting.clear(); + break 'echo_loop; + } + S(LeaderEcho { maybe_leader }) => { + use Ordering::*; + match maybe_leader.cmp(&my_leader) { + Less => { /* ignore */ } + Equal => { + awaiting.remove(&index); + if awaiting.is_empty() { + if let Some(p) = parent { + // return the echo to my parent + endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?; + } else { + // DECIDE! + break 'echo_loop; + } + } + } + Greater => { + // join new echo + log!(logger, "Setting leader to index {:?}", index); + parent = Some(index); + my_leader = maybe_leader; + let echo = S(LeaderEcho { maybe_leader: my_leader }); + awaiting.clear(); + if endpoint_exts.len() == 1 { + // immediately reply to parent + log!(logger, "replying echo to parent {:?} immediately", index); + endpoint_exts[index].endpoint.send(&echo)?; + } else { + for (index2, ee) in endpoint_exts.iter_mut().enumerate() { + if index2 == index { + continue; + } + log!(logger, "repeating echo {:?} to {:?}", &echo, index2); + ee.endpoint.send(&echo)?; + awaiting.insert(index2); + } + } + } + } + } + inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)), + } + } + match parent { + None => assert_eq!( + my_leader, controller_id, + "I've got no parent, but I consider {:?} the leader?", + my_leader + ), + Some(parent) => assert_ne!( + my_leader, controller_id, + "I have {:?} as parent, but I consider myself ({:?}) the leader?", + parent, controller_id + ), + } + + log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader); + + // 3. broadcast leader announcement (except to parent: confirm they are your parent) + // in this loop, every node sends 1 message to each neighbor + // await 1 message from all non-parents + let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); + for (index, ee) in endpoint_exts.iter_mut().enumerate() { + let msg = if Some(index) == parent { + &S(YouAreMyParent) + } else { + awaiting.insert(index); + &msg_for_non_parents + }; + log!(logger, "ANNOUNCING to {:?} {:?}", index, msg); + ee.endpoint.send(msg)?; + } + let mut children = Vec::default(); + ep.undelay_all(); + while !awaiting.is_empty() { + let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?; + match msg { + S(YouAreMyParent) => { + assert!(awaiting.remove(&index)); + children.push(index); + } + S(SetupMsg::LeaderAnnounce { leader }) => { + assert!(awaiting.remove(&index)); + assert!(leader == my_leader); + assert!(Some(index) != parent); + // they wouldn't send me this if they considered me their parent + } + inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)), + } + } + children.sort(); + children.dedup(); Ok(Neighborhood { parent, children }) }