diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index 8f099b79463cb95d2007601224e24f1779e6b0d1..6146f54d2d4f57e88d4692034a6cec1d8850ca5f 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -107,26 +107,28 @@ impl Connector { log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout); let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports - let (mut endpoint_exts, mut endpoint_poller) = init_endpoints( + let mut endpoint_manager = init_endpoints( &mut *self.logger, endpoint_setups, &mut self.inp_to_route, deadline, )?; - log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len()); + log!( + self.logger, + "Successfully connected {} endpoints", + endpoint_manager.endpoint_exts.len() + ); // leader election and tree construction let neighborhood = init_neighborhood( self.id_manager.controller_id, &mut *self.logger, - &mut endpoint_exts, - &mut endpoint_poller, + &mut endpoint_manager, deadline, )?; log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); // TODO session optimization goes here self.phased = ConnectorPhased::Communication { - endpoint_poller, - endpoint_exts, + endpoint_manager, neighborhood, mem_inbox: Default::default(), }; @@ -141,7 +143,7 @@ fn init_endpoints( endpoint_setups: &[(PortId, EndpointSetup)], inp_to_route: &mut HashMap, deadline: Instant, -) -> Result<(Vec, EndpointPoller), ()> { +) -> Result { const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -179,34 +181,35 @@ fn init_endpoints( }; //////////////////////// - let mut ep = EndpointPoller { + let mut em = EndpointManager { poll: Poll::new().map_err(drop)?, events: Events::with_capacity(64), undrained_endpoints: Default::default(), delayed_messages: Default::default(), undelayed_messages: Default::default(), + endpoint_exts: Vec::with_capacity(endpoint_setups.len()), }; let mut todos = endpoint_setups .iter() .enumerate() .map(|(index, (local_port, endpoint_setup))| { - init(Token(index), *local_port, endpoint_setup, &mut ep.poll) + init(Token(index), *local_port, endpoint_setup, &mut em.poll) }) .collect::, _>>()?; let mut unfinished: HashSet = (0..todos.len()).collect(); while !unfinished.is_empty() { let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?; - ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?; - for event in ep.events.iter() { + em.poll.poll(&mut em.events, Some(remaining)).map_err(drop)?; + for event in em.events.iter() { let token = event.token(); let Token(index) = token; let todo: &mut Todo = &mut todos[index]; if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint { let (mut stream, peer_addr) = listener.accept().map_err(drop)?; - ep.poll.registry().deregister(listener).unwrap(); - ep.poll.registry().register(&mut stream, token, BOTH).unwrap(); + em.poll.registry().deregister(listener).unwrap(); + em.poll.registry().register(&mut stream, token, BOTH).unwrap(); log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr); let endpoint = Endpoint { stream, inbox: vec![] }; todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); @@ -230,7 +233,7 @@ fn init_endpoints( *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - ep.undrained_endpoints.insert(index); + em.undrained_endpoints.insert(index); if let Some(peer_port_info) = endpoint.try_recv::().map_err(drop)? { @@ -250,26 +253,24 @@ fn init_endpoints( Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(), } } - ep.events.clear(); + em.events.clear(); } - let endpoint_exts = todos - .into_iter() - .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt { + em.endpoint_exts.extend(todos.into_iter().map( + |Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt { endpoint: match todo_endpoint { TodoEndpoint::Endpoint(endpoint) => endpoint, TodoEndpoint::Listener(..) => unreachable!(), }, inp_for_emerging_msgs: recv_peer_port.unwrap(), - }) - .collect(); - Ok((endpoint_exts, ep)) + }, + )); + Ok(em) } fn init_neighborhood( controller_id: ControllerId, logger: &mut dyn Logger, - endpoint_exts: &mut [EndpointExt], - ep: &mut EndpointPoller, + em: &mut EndpointManager, deadline: Instant, ) -> Result { log!(logger, "beginning neighborhood construction"); @@ -278,8 +279,8 @@ fn init_neighborhood( // 1. broadcast my ID as the first echo. await reply from all neighbors let echo = S(LeaderEcho { maybe_leader: controller_id }); - let mut awaiting = HashSet::with_capacity(endpoint_exts.len()); - for (index, ee) in endpoint_exts.iter_mut().enumerate() { + let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len()); + for (index, ee) in em.endpoint_exts.iter_mut().enumerate() { log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo); ee.endpoint.send(&echo)?; awaiting.insert(index); @@ -289,9 +290,9 @@ fn init_neighborhood( // adopt it as leader, sender as parent, and reset the await set. let mut parent: Option = None; let mut my_leader = controller_id; - ep.undelay_all(); + em.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); match msg { S(LeaderAnnounce { leader }) => { @@ -311,7 +312,7 @@ fn init_neighborhood( if awaiting.is_empty() { if let Some(p) = parent { // return the echo to my parent - endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?; + em.send_to(p, &S(LeaderEcho { maybe_leader }))?; } else { // DECIDE! break 'echo_loop; @@ -325,12 +326,12 @@ fn init_neighborhood( my_leader = maybe_leader; let echo = S(LeaderEcho { maybe_leader: my_leader }); awaiting.clear(); - if endpoint_exts.len() == 1 { + if em.endpoint_exts.len() == 1 { // immediately reply to parent log!(logger, "replying echo to parent {:?} immediately", index); - endpoint_exts[index].endpoint.send(&echo)?; + em.send_to(index, &echo)?; } else { - for (index2, ee) in endpoint_exts.iter_mut().enumerate() { + for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() { if index2 == index { continue; } @@ -342,7 +343,7 @@ fn init_neighborhood( } } } - inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)), + inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)), } } match parent { @@ -364,7 +365,7 @@ fn init_neighborhood( // in this loop, every node sends 1 message to each neighbor // await 1 message from all non-parents let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for (index, ee) in endpoint_exts.iter_mut().enumerate() { + for (index, ee) in em.endpoint_exts.iter_mut().enumerate() { let msg = if Some(index) == parent { &S(YouAreMyParent) } else { @@ -375,9 +376,9 @@ fn init_neighborhood( ee.endpoint.send(msg)?; } let mut children = Vec::default(); - ep.undelay_all(); + em.undelay_all(); while !awaiting.is_empty() { - let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&index)); @@ -389,7 +390,7 @@ fn init_neighborhood( assert!(Some(index) != parent); // they wouldn't send me this if they considered me their parent } - inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)), + inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)), } } children.sort();