From 48b0400c877fbbf7ab22759679e3f9d38b4f564e 2020-06-20 15:29:23 From: Christopher Esterhuyse Date: 2020-06-20 15:29:23 Subject: [PATCH] clearer input output wiring --- diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index 6146f54d2d4f57e88d4692034a6cec1d8850ca5f..bc747db7960b1b734461fd8007b59628e2468f98 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -1,6 +1,13 @@ use crate::common::*; use crate::runtime::*; +struct LogicalChannelInfo { + local_port: PortId, + peer_port: PortId, + local_polarity: Polarity, + endpoint_index: usize, +} +/////////////// impl Connector { pub fn new_simple( proto_description: Arc, @@ -42,6 +49,9 @@ impl Connector { ConnectorPhased::Setup { endpoint_setups, .. } => { let p = self.id_manager.next_port(); self.native_ports.insert(p); + if endpoint_setup.polarity == Getter { + self.inp_to_route.insert(p, InpRoute::NativeComponent); + } log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup); endpoint_setups.push((p, endpoint_setup)); Ok(p) @@ -107,12 +117,24 @@ impl Connector { log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout); let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports - let mut endpoint_manager = init_endpoints( - &mut *self.logger, - endpoint_setups, - &mut self.inp_to_route, - deadline, - )?; + let mut endpoint_manager = { + let Self { outp_to_inp, inp_to_route, logger, .. } = self; + let logical_channel_callback = |lci: LogicalChannelInfo| { + if let Putter = lci.local_polarity { + outp_to_inp.insert(lci.local_port, lci.peer_port); + inp_to_route.insert( + lci.peer_port, + InpRoute::Endpoint { index: lci.endpoint_index }, + ); + } + }; + new_endpoint_manager( + &mut **logger, + endpoint_setups, + logical_channel_callback, + deadline, + )? + }; log!( self.logger, "Successfully connected {} endpoints", @@ -138,12 +160,13 @@ impl Connector { } } -fn init_endpoints( +fn new_endpoint_manager( logger: &mut dyn Logger, endpoint_setups: &[(PortId, EndpointSetup)], - inp_to_route: &mut HashMap, + mut logical_channel_callback: impl FnMut(LogicalChannelInfo), deadline: Instant, ) -> Result { + //////////////////////////////////////////// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -156,7 +179,7 @@ fn init_endpoints( Listener(TcpListener), Endpoint(Endpoint), } - fn init( + fn init_todo( token: Token, local_port: PortId, endpoint_setup: &EndpointSetup, @@ -179,37 +202,38 @@ fn init_endpoints( recv_peer_port: None, }) }; - //////////////////////// + //////////////////////////////////////////// - let mut em = EndpointManager { - poll: Poll::new().map_err(drop)?, - events: Events::with_capacity(64), - undrained_endpoints: Default::default(), - delayed_messages: Default::default(), - undelayed_messages: Default::default(), - endpoint_exts: Vec::with_capacity(endpoint_setups.len()), - }; + // 1. Start to construct EndpointManager + let mut poll = Poll::new().map_err(drop)?; + let mut events = Events::with_capacity(64); + let mut undrained_endpoints = IndexSet::::default(); + // 2. create a registered (TcpListener/Endpoint) for passive / active respectively let mut todos = endpoint_setups .iter() .enumerate() .map(|(index, (local_port, endpoint_setup))| { - init(Token(index), *local_port, endpoint_setup, &mut em.poll) + init_todo(Token(index), *local_port, endpoint_setup, &mut poll) }) .collect::, _>>()?; - let mut unfinished: HashSet = (0..todos.len()).collect(); - while !unfinished.is_empty() { + // 3. Using poll to drive progress: + // - accept an incoming connection for each TcpListener (turning them into endpoints too) + // - for each endpoint, send the local PortId + // - for each endpoint, recv the peer's PortId, and + let mut setup_incomplete: HashSet = (0..todos.len()).collect(); + while !setup_incomplete.is_empty() { let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?; - em.poll.poll(&mut em.events, Some(remaining)).map_err(drop)?; - for event in em.events.iter() { + poll.poll(&mut events, Some(remaining)).map_err(drop)?; + for event in events.iter() { let token = event.token(); let Token(index) = token; let todo: &mut Todo = &mut todos[index]; if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint { let (mut stream, peer_addr) = listener.accept().map_err(drop)?; - em.poll.registry().deregister(listener).unwrap(); - em.poll.registry().register(&mut stream, token, BOTH).unwrap(); + poll.registry().deregister(listener).unwrap(); + poll.registry().register(&mut stream, token, BOTH).unwrap(); log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr); let endpoint = Endpoint { stream, inbox: vec![] }; todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); @@ -222,7 +246,7 @@ fn init_endpoints( sent_local_port, recv_peer_port, } => { - if !unfinished.contains(&index) { + if !setup_incomplete.contains(&index) { continue; } if event.is_writable() && !*sent_local_port { @@ -233,38 +257,50 @@ fn init_endpoints( *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - em.undrained_endpoints.insert(index); + undrained_endpoints.insert(index); if let Some(peer_port_info) = endpoint.try_recv::().map_err(drop)? { log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info); assert!(peer_port_info.polarity != endpoint_setup.polarity); - if let Putter = endpoint_setup.polarity { - inp_to_route.insert(*local_port, InpRoute::Endpoint { index }); - } *recv_peer_port = Some(peer_port_info.port); + let lci = LogicalChannelInfo { + local_port: *local_port, + peer_port: peer_port_info.port, + local_polarity: endpoint_setup.polarity, + endpoint_index: index, + }; + logical_channel_callback(lci); } } if *sent_local_port && recv_peer_port.is_some() { - unfinished.remove(&index); + setup_incomplete.remove(&index); log!(logger, "endpoint[{}] is finished!", index); } } Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(), } } - em.events.clear(); + events.clear(); } - em.endpoint_exts.extend(todos.into_iter().map( - |Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt { + let endpoint_exts = todos + .into_iter() + .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt { endpoint: match todo_endpoint { TodoEndpoint::Endpoint(endpoint) => endpoint, TodoEndpoint::Listener(..) => unreachable!(), }, inp_for_emerging_msgs: recv_peer_port.unwrap(), - }, - )); - Ok(em) + }) + .collect(); + Ok(EndpointManager { + poll, + events, + undrained_endpoints, + delayed_messages: Default::default(), + undelayed_messages: Default::default(), + endpoint_exts, + }) } fn init_neighborhood( @@ -273,10 +309,12 @@ fn init_neighborhood( em: &mut EndpointManager, deadline: Instant, ) -> Result { - log!(logger, "beginning neighborhood construction"); + //////////////////////////////////////////// use Msg::SetupMsg as S; use SetupMsg::*; + //////////////////////////////////////////// + log!(logger, "beginning neighborhood construction"); // 1. broadcast my ID as the first echo. await reply from all neighbors let echo = S(LeaderEcho { maybe_leader: controller_id }); let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len()); @@ -358,12 +396,11 @@ fn init_neighborhood( parent, controller_id ), } - log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader); // 3. broadcast leader announcement (except to parent: confirm they are your parent) // in this loop, every node sends 1 message to each neighbor - // await 1 message from all non-parents + // await 1 message from all non-parents. let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); for (index, ee) in em.endpoint_exts.iter_mut().enumerate() { let msg = if Some(index) == parent {