diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 6822e4b907b0cbdb2719e028e334fa5ceaef360b..6277bc5b3ce89fe2c354dbafe21d0d14c178c699 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -18,7 +18,25 @@ impl Connector { native_ports: Default::default(), port_info: Default::default(), }, - phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, + phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { + net_endpoint_setups: Default::default(), + udp_endpoint_setups: Default::default(), + surplus_sockets, + })), + } + } + pub fn new_udp_port( + &mut self, + local_addr: SocketAddr, + peer_addr: SocketAddr, + ) -> Result<[PortId; 2], WrongStateError> { + let Self { unphased: _up, phased } = self; + match phased { + ConnectorPhased::Communication(..) => Err(WrongStateError), + ConnectorPhased::Setup(_setup) => { + let _udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr }; + todo!() + } } } pub fn new_net_port( @@ -26,12 +44,12 @@ impl Connector { polarity: Polarity, sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, - ) -> Result { + ) -> Result { let Self { unphased: up, phased } = self; match phased { - ConnectorPhased::Communication { .. } => Err(NewNetPortError::AlreadyConnected), - ConnectorPhased::Setup { endpoint_setups, .. } => { - let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity }; + ConnectorPhased::Communication(..) => Err(WrongStateError), + ConnectorPhased::Setup(setup) => { + let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity }; let p = up.id_manager.new_port_id(); up.native_ports.insert(p); // {polarity, route} known. {peer} unknown. @@ -44,7 +62,7 @@ impl Connector { polarity, &endpoint_setup ); - endpoint_setups.push((p, endpoint_setup)); + setup.net_endpoint_setups.push((p, endpoint_setup)); Ok(p) } } @@ -52,25 +70,25 @@ impl Connector { pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { use ConnectError as Ce; let Self { unphased: cu, phased } = self; - match phased { + match &phased { ConnectorPhased::Communication { .. } => { log!(cu.logger, "Call to connecting in connected state"); Err(Ce::AlreadyConnected) } - ConnectorPhased::Setup { endpoint_setups, .. } => { + ConnectorPhased::Setup(setup) => { log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( &mut *cu.logger, - endpoint_setups, + &setup.net_endpoint_setups, &mut cu.port_info, deadline, )?; log!( cu.logger, "Successfully connected {} endpoints", - endpoint_manager.endpoint_exts.len() + endpoint_manager.net_endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( @@ -99,7 +117,7 @@ impl Connector { } fn new_endpoint_manager( logger: &mut dyn Logger, - endpoint_setups: &[(PortId, EndpointSetup)], + endpoint_setups: &[(PortId, NetEndpointSetup)], port_info: &mut PortInfo, deadline: Option, ) -> Result { @@ -109,26 +127,26 @@ fn new_endpoint_manager( const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, - endpoint_setup: EndpointSetup, + endpoint_setup: NetEndpointSetup, local_port: PortId, sent_local_port: bool, // true <-> I've sent my local port recv_peer_port: Option, // Some(..) <-> I've received my peer's port } enum TodoEndpoint { Accepting(TcpListener), - Endpoint(Endpoint), + NetEndpoint(NetEndpoint), } fn init_todo( token: Token, local_port: PortId, - endpoint_setup: &EndpointSetup, + endpoint_setup: &NetEndpointSetup, poll: &mut Poll, ) -> Result { let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity { let mut stream = TcpStream::connect(endpoint_setup.sock_addr) .expect("mio::TcpStream connect should not fail!"); poll.registry().register(&mut stream, token, BOTH).unwrap(); - TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] }) + TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] }) } else { let mut listener = TcpListener::bind(endpoint_setup.sock_addr) .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?; @@ -146,12 +164,13 @@ fn new_endpoint_manager( //////////////////////////////////////////// // 1. Start to construct EndpointManager - const WAKER_TOKEN: Token = Token(usize::MAX); const WAKER_PERIOD: Duration = Duration::from_millis(300); + struct WakerState { + continue_signal: AtomicBool, + waker: mio::Waker, + } - assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token - - let mut waker_continue_signal: Option> = None; + let mut waker_state: Option> = None; let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4); let mut polled_undrained = VecSet::default(); @@ -162,7 +181,12 @@ fn new_endpoint_manager( .iter() .enumerate() .map(|(index, (local_port, endpoint_setup))| { - init_todo(Token(index), *local_port, endpoint_setup, &mut poll) + init_todo( + TokenTarget::NetEndpoint { index }.into(), + *local_port, + endpoint_setup, + &mut poll, + ) }) .collect::, ConnectError>>()?; @@ -184,192 +208,227 @@ fn new_endpoint_manager( poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?; for event in events.iter() { let token = event.token(); - let Token(index) = token; - if token == WAKER_TOKEN { - log!( - logger, - "Notification from waker. connect_failed is {:?}", - connect_failed.iter() - ); - assert!(waker_continue_signal.is_some()); - for index in connect_failed.drain() { - let todo: &mut Todo = &mut todos[index]; + let token_target = TokenTarget::from(token); + match token_target { + TokenTarget::Waker => { log!( logger, - "Restarting connection with endpoint {:?} {:?}", - index, - todo.endpoint_setup.sock_addr + "Notification from waker. connect_failed is {:?}", + connect_failed.iter() ); - match &mut todo.todo_endpoint { - TodoEndpoint::Endpoint(endpoint) => { - let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr) - .expect("mio::TcpStream connect should not fail!"); - std::mem::swap(&mut endpoint.stream, &mut new_stream); - poll.registry() - .register(&mut endpoint.stream, Token(index), BOTH) - .unwrap(); + assert!(waker_state.is_some()); + for net_endpoint_index in connect_failed.drain() { + let todo: &mut Todo = &mut todos[net_endpoint_index]; + log!( + logger, + "Restarting connection with endpoint {:?} {:?}", + net_endpoint_index, + todo.endpoint_setup.sock_addr + ); + match &mut todo.todo_endpoint { + TodoEndpoint::NetEndpoint(endpoint) => { + let mut new_stream = + TcpStream::connect(todo.endpoint_setup.sock_addr) + .expect("mio::TcpStream connect should not fail!"); + std::mem::swap(&mut endpoint.stream, &mut new_stream); + let token = + TokenTarget::NetEndpoint { index: net_endpoint_index }.into(); + poll.registry() + .register(&mut endpoint.stream, token, BOTH) + .unwrap(); + } + _ => unreachable!(), } - _ => unreachable!(), } } - } else { - let todo: &mut Todo = &mut todos[index]; - // FIRST try convert this into an endpoint - if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint { - match listener.accept() { - Ok((mut stream, peer_addr)) => { - poll.registry().deregister(listener).unwrap(); - poll.registry().register(&mut stream, token, BOTH).unwrap(); - log!( - logger, - "Endpoint[{}] accepted a connection from {:?}", - index, - peer_addr - ); - let endpoint = Endpoint { stream, inbox: vec![] }; - todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); - } - Err(e) if would_block(&e) => { - log!(logger, "Spurious wakeup on listener {:?}", index) - } - Err(_) => { - log!(logger, "accept() failure on index {}", index); - return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); + TokenTarget::UdpEndpoint { index: _ } => unreachable!(), + TokenTarget::NetEndpoint { index } => { + let todo: &mut Todo = &mut todos[index]; + if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint { + // FIRST try complete this connection + match listener.accept() { + Err(e) if would_block(&e) => { + log!(logger, "Spurious wakeup on listener {:?}", index) + } + Err(_) => { + log!(logger, "accept() failure on index {}", index); + return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); + } + Ok((mut stream, peer_addr)) => { + // success! + poll.registry().deregister(listener).unwrap(); + // reusing original token as-is + poll.registry().register(&mut stream, token, BOTH).unwrap(); + log!( + logger, + "Endpoint[{}] accepted a connection from {:?}", + index, + peer_addr + ); + let net_endpoint = NetEndpoint { stream, inbox: vec![] }; + todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint); + } } } - } - if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint { - if event.is_error() { - if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { - // right now you cannot retry an acceptor. - return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap())); + if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut todo.todo_endpoint { + if event.is_error() { + if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { + // right now you cannot retry an acceptor. return failure + return Err(Ce::AcceptFailed( + net_endpoint.stream.local_addr().unwrap(), + )); + } + // this actively-connecting endpoint failed to connect! + if connect_failed.insert(index) { + log!( + logger, + "Connection failed for {:?}. List is {:?}", + index, + connect_failed.iter() + ); + poll.registry().deregister(&mut net_endpoint.stream).unwrap(); + } else { + // spurious wakeup. + continue; + } + if waker_state.is_none() { + log!(logger, "First connect failure. Starting waker thread"); + let arc = Arc::new(WakerState { + waker: mio::Waker::new( + poll.registry(), + TokenTarget::Waker.into(), + ) + .unwrap(), + continue_signal: true.into(), + }); + let moved_arc = arc.clone(); + waker_state = Some(arc); + std::thread::spawn(move || { + while moved_arc + .continue_signal + .load(std::sync::atomic::Ordering::SeqCst) + { + std::thread::sleep(WAKER_PERIOD); + let _ = moved_arc.waker.wake(); + } + }); + } + continue; } - if connect_failed.insert(index) { - log!( - logger, - "Connection failed for {:?}. List is {:?}", - index, - connect_failed.iter() - ); - poll.registry().deregister(&mut endpoint.stream).unwrap(); - } else { + // event wasn't ERROR + if connect_failed.contains(&index) { // spurious wakeup continue; } - - if waker_continue_signal.is_none() { - log!(logger, "First connect failure. Starting waker thread"); - let waker = - Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap()); - let wcs = Arc::new(AtomicBool::from(true)); - let wcs2 = wcs.clone(); - std::thread::spawn(move || { - while wcs2.load(std::sync::atomic::Ordering::SeqCst) { - std::thread::sleep(WAKER_PERIOD); - let _ = waker.wake(); - } - }); - waker_continue_signal = Some(wcs); + if !setup_incomplete.contains(&index) { + // spurious wakeup + continue; } - continue; - } - if connect_failed.contains(&index) { - // spurious wakeup - continue; - } - if !setup_incomplete.contains(&index) { - // spurious wakeup - continue; - } - let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap(); - if event.is_writable() && !todo.sent_local_port { - let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { - polarity: local_polarity, - port: todo.local_port, - })); - endpoint - .send(&msg) - .map_err(|e| { - Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) - }) - .unwrap(); - log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); - todo.sent_local_port = true; - } - if event.is_readable() && todo.recv_peer_port.is_none() { - let maybe_msg = endpoint.try_recv(logger).map_err(|e| { - Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) - })?; - if maybe_msg.is_some() && !endpoint.inbox.is_empty() { - polled_undrained.insert(index); + let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap(); + if event.is_writable() && !todo.sent_local_port { + // can write and didn't send setup msg yet? Do so! + let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { + polarity: local_polarity, + port: todo.local_port, + })); + net_endpoint + .send(&msg) + .map_err(|e| { + Ce::EndpointSetupError( + net_endpoint.stream.local_addr().unwrap(), + e, + ) + }) + .unwrap(); + log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); + todo.sent_local_port = true; } - match maybe_msg { - None => {} // msg deserialization incomplete - Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { - log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info); - if peer_info.polarity == local_polarity { - return Err(ConnectError::PortPeerPolarityMismatch( - todo.local_port, - )); + if event.is_readable() && todo.recv_peer_port.is_none() { + // can read and didn't recv setup msg yet? Do so! + let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| { + Ce::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), e) + })?; + if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() { + polled_undrained.insert(index); + } + match maybe_msg { + None => {} // msg deserialization incomplete + Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { + log!( + logger, + "endpoint[{}] got peer info {:?}", + index, + peer_info + ); + if peer_info.polarity == local_polarity { + return Err(ConnectError::PortPeerPolarityMismatch( + todo.local_port, + )); + } + todo.recv_peer_port = Some(peer_info.port); + // 1. finally learned the peer of this port! + port_info.peers.insert(todo.local_port, peer_info.port); + // 2. learned the info of this peer port + port_info.polarities.insert(peer_info.port, peer_info.polarity); + port_info.peers.insert(peer_info.port, todo.local_port); + if let Some(route) = port_info.routes.get(&peer_info.port) { + // check just for logging purposes + log!( + logger, + "Special case! Route to peer {:?} already known to be {:?}. Leave untouched", + peer_info.port, + route + ); + } + port_info + .routes + .entry(peer_info.port) + .or_insert(Route::Endpoint { index }); } - todo.recv_peer_port = Some(peer_info.port); - // 1. finally learned the peer of this port! - port_info.peers.insert(todo.local_port, peer_info.port); - // 2. learned the info of this peer port - port_info.polarities.insert(peer_info.port, peer_info.polarity); - port_info.peers.insert(peer_info.port, todo.local_port); - if let Some(route) = port_info.routes.get(&peer_info.port) { - // check just for logging purposes + Some(inappropriate_msg) => { log!( logger, - "Special case! Route to peer {:?} already known to be {:?}. Leave untouched", - peer_info.port, - route + "delaying msg {:?} during channel setup phase", + inappropriate_msg ); + delayed_messages.push((index, inappropriate_msg)); } - port_info - .routes - .entry(peer_info.port) - .or_insert(Route::Endpoint { index }); - } - Some(inappropriate_msg) => { - log!( - logger, - "delaying msg {:?} during channel setup phase", - inappropriate_msg - ); - delayed_messages.push((index, inappropriate_msg)); } } - } - if todo.sent_local_port && todo.recv_peer_port.is_some() { - setup_incomplete.remove(&index); - log!(logger, "endpoint[{}] is finished!", index); + // is the setup for this net_endpoint now complete? + if todo.sent_local_port && todo.recv_peer_port.is_some() { + // yes! connected, sent my info and received peer's info + setup_incomplete.remove(&index); + log!(logger, "endpoint[{}] is finished!", index); + } } } } } events.clear(); } - let endpoint_exts = todos + // all todos must be the NetEndpoint variants! unwrap and collect them + let net_endpoint_exts = todos .into_iter() .enumerate() - .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt { - endpoint: match todo_endpoint { - TodoEndpoint::Endpoint(mut endpoint) => { + .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt { + net_endpoint: match todo_endpoint { + TodoEndpoint::NetEndpoint(mut net_endpoint) => { + let token = TokenTarget::NetEndpoint { index }.into(); poll.registry() - .reregister(&mut endpoint.stream, Token(index), Interest::READABLE) + .reregister(&mut net_endpoint.stream, token, Interest::READABLE) .unwrap(); - endpoint + net_endpoint } _ => unreachable!(), }, getter_for_incoming: local_port, }) .collect(); - if let Some(wcs) = waker_continue_signal { + if let Some(arc) = waker_state { log!(logger, "Sending waker the stop signal"); - wcs.store(false, std::sync::atomic::Ordering::SeqCst); + arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst); + // TODO leave the waker registered? } Ok(EndpointManager { poll, @@ -377,7 +436,7 @@ fn new_endpoint_manager( polled_undrained, undelayed_messages: delayed_messages, // no longer delayed delayed_messages: Default::default(), - endpoint_exts, + net_endpoint_exts, }) } @@ -427,12 +486,12 @@ fn init_neighborhood( NOTE the distinction between PARENT and LEADER */ log!(logger, "beginning neighborhood construction"); - if em.num_endpoints() == 0 { + if em.num_net_endpoints() == 0 { log!(logger, "Edge case of no neighbors! No parent an no children!"); return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) }); } - log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_endpoints()); - let mut awaiting = HashSet::with_capacity(em.num_endpoints()); + log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_net_endpoints()); + let mut awaiting = HashSet::with_capacity(em.num_net_endpoints()); // 1+ neighbors. Leader can only be learned by receiving messages // loop ends when I know my sink tree parent (implies leader was elected) let election_result: WaveState = { @@ -650,7 +709,7 @@ fn session_optimize( serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm .endpoint_manager - .endpoint_exts + .net_endpoint_exts .iter() .map(|ee| ee.getter_for_incoming) .collect(), @@ -740,7 +799,7 @@ fn apply_optimizations( cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; for (ee, getter) in - comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) + comm.endpoint_manager.net_endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) { ee.getter_for_incoming = getter; }