diff --git a/Cargo.toml b/Cargo.toml
index a57be74c90133a042ec03459b3942a8d23c2227c..e88eb3ff5a529bd8b4454f1424f3529fdb99c0fa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,7 @@ getrandom = "0.1.14" # tiny crate. used to guess controller-id
 # network
 # integer-encoding = "1.1.5"
 # byteorder = "1.3.4"
-mio = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
+mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
 # protocol
 backtrace = "0.3"
diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs
index 5b8ce5558b6dfef3374b4b715c6f36bc60e8befd..013986dd2a6ce619d5902ec3e0553b56837905ed 100644
--- a/src/runtime/communication.rs
+++ b/src/runtime/communication.rs
@@ -74,12 +74,11 @@ impl Connector {
             },
         }
     }
-    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
+    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
         // returns index of new batch
-        use NextBatchError as Nbe;
         let Self { phased, .. } = self;
         match phased {
-            ConnectorPhased::Setup { .. } => Err(Nbe::NotConnected),
+            ConnectorPhased::Setup { .. } => Err(WrongStateError),
             ConnectorPhased::Communication(comm) => {
                 comm.native_batches.push(Default::default());
                 Ok(comm.native_batches.len() - 1)
@@ -568,8 +567,8 @@ impl Connector {
         };
         match comm_msg_contents {
             CommMsgContents::SendPayload(send_payload_msg) => {
-                let getter =
-                    comm.endpoint_manager.endpoint_exts[endpoint_index].getter_for_incoming;
+                let getter = comm.endpoint_manager.net_endpoint_exts[endpoint_index]
+                    .getter_for_incoming;
                 assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
                 log!(
                     cu.logger,
diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs
index 0fd9398b3bc6f8fa76658af6e9a49fc1f9b450d2..a105ff1777b3e656ab79ee71268f1f204ff707e0 100644
--- a/src/runtime/endpoints.rs
+++ b/src/runtime/endpoints.rs
@@ -11,7 +11,7 @@ enum TryRecyAnyError {
     EndpointError { error: EndpointError, index: usize },
 }
 /////////////////////
-impl Endpoint {
+impl NetEndpoint {
     fn bincode_opts() -> impl bincode::config::Options {
         bincode::config::DefaultOptions::default()
     }
@@ -70,10 +70,10 @@ impl Endpoint {
 impl EndpointManager {
     pub(super) fn index_iter(&self) -> Range<usize> {
-        0..self.num_endpoints()
+        0..self.num_net_endpoints()
     }
-    pub(super) fn num_endpoints(&self) -> usize {
-        self.endpoint_exts.len()
+    pub(super) fn num_net_endpoints(&self) -> usize {
+        self.net_endpoint_exts.len()
     }
     pub(super) fn send_to_comms(
         &mut self,
         index: usize,
@@ -81,13 +81,13 @@ impl EndpointManager {
         msg: &Msg,
     ) -> Result<(), UnrecoverableSyncError> {
         use UnrecoverableSyncError as Use;
-        let endpoint = &mut self.endpoint_exts[index].endpoint;
-        endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
+        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
+        net_endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
     }
     pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
-        let endpoint = &mut self.endpoint_exts[index].endpoint;
-        endpoint.send(msg).map_err(|err| {
-            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
+        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
+        net_endpoint.send(msg).map_err(|err| {
+            ConnectError::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
         })
     }
     pub(super) fn try_recv_any_comms(
@@ -113,7 +113,7 @@ impl EndpointManager {
                     Trae::Timeout => Ce::Timeout,
                     Trae::PollFailed => Ce::PollFailed,
                     Trae::EndpointError { error, index } => Ce::EndpointSetupError(
-                        self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(),
+                        self.net_endpoint_exts[index].net_endpoint.stream.local_addr().unwrap(),
                         error,
                     ),
                 })
@@ -132,13 +132,13 @@ impl EndpointManager {
         loop {
             // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
             while let Some(index) = self.polled_undrained.pop() {
-                let endpoint = &mut self.endpoint_exts[index].endpoint;
-                if let Some(msg) = endpoint
+                let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
+                if let Some(msg) = net_endpoint
                     .try_recv(logger)
                     .map_err(|error| Trea::EndpointError { error, index })?
                 {
                     endptlog!(logger, "RECV polled_undrained {:?}", &msg);
-                    if !endpoint.inbox.is_empty() {
+                    if !net_endpoint.inbox.is_empty() {
                         // there may be another message waiting!
                         self.polled_undrained.insert(index);
                     }
@@ -176,7 +176,7 @@ impl EndpointManager {
         self.undelayed_messages.extend(self.delayed_messages.drain(..));
     }
 }
-impl Debug for Endpoint {
+impl Debug for NetEndpoint {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
     }
diff --git a/src/runtime/error.rs b/src/runtime/error.rs
index c226b4368b276d20e7fa161f8300b96a7db46a35..e51e4020b14139b3b0c154b175767b928cb75190 100644
--- a/src/runtime/error.rs
+++ b/src/runtime/error.rs
@@ -61,14 +61,7 @@ pub enum GottenError {
     PreviousSyncFailed,
 }
 #[derive(Debug, Eq, PartialEq)]
-pub enum NextBatchError {
-    NotConnected,
-}
-
-#[derive(Debug, Eq, PartialEq)]
-pub enum NewNetPortError {
-    AlreadyConnected,
-}
+pub struct WrongStateError;
 /////////////////////
 impl From<UnrecoverableSyncError> for SyncError {
     fn from(e: UnrecoverableSyncError) -> Self {
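Aside on the error.rs change: the two single-variant enums collapse into one zero-sized struct, so every "called in the wrong phase" failure now shares a type. A standalone sketch of the resulting pattern (`Phase` and `Demo` are hypothetical stand-ins for the connector's phases, not types from this patch):

```rust
// Zero-sized error type, mirroring error.rs above.
#[derive(Debug, Eq, PartialEq)]
pub struct WrongStateError;

enum Phase { Setup, Communication }
struct Demo { phase: Phase, batches: usize }

impl Demo {
    // Mirrors Connector::next_batch: only legal once communicating.
    fn next_batch(&mut self) -> Result<usize, WrongStateError> {
        match self.phase {
            Phase::Setup => Err(WrongStateError),
            Phase::Communication => {
                self.batches += 1;
                Ok(self.batches - 1)
            }
        }
    }
}

fn main() {
    let mut d = Demo { phase: Phase::Setup, batches: 0 };
    assert_eq!(d.next_batch(), Err(WrongStateError));
    d.phase = Phase::Communication;
    assert_eq!(d.next_batch(), Ok(0));
}
```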
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 12cc34285eb35ebb65f10d2336aec215e407aab5..87b4702dfc5aa477113523e94ecac1bfc55dc765 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -16,6 +16,7 @@ mod tests;
 
 use crate::common::*;
 use error::*;
+use mio::net::UdpSocket;
 
 #[derive(Debug)]
 pub struct Connector {
@@ -132,7 +133,7 @@ enum AllMapperResult {
     New(Predicate),
     Nonexistant,
 }
-struct Endpoint {
+struct NetEndpoint {
     inbox: Vec<u8>,
     stream: TcpStream,
 }
@@ -142,13 +143,19 @@ struct ProtoComponent {
     ports: HashSet<PortId>,
 }
 #[derive(Debug, Clone)]
-struct EndpointSetup {
+struct NetEndpointSetup {
     sock_addr: SocketAddr,
     endpoint_polarity: EndpointPolarity,
 }
+
+#[derive(Debug, Clone)]
+struct UdpEndpointSetup {
+    local_addr: SocketAddr,
+    peer_addr: SocketAddr,
+}
 #[derive(Debug)]
-struct EndpointExt {
-    endpoint: Endpoint,
+struct NetEndpointExt {
+    net_endpoint: NetEndpoint,
     getter_for_incoming: PortId,
 }
 #[derive(Debug)]
@@ -177,7 +184,13 @@ struct EndpointManager {
     polled_undrained: VecSet<usize>,
     delayed_messages: Vec<(usize, Msg)>,
     undelayed_messages: Vec<(usize, Msg)>,
-    endpoint_exts: Vec<EndpointExt>,
+    net_endpoint_exts: Vec<NetEndpointExt>,
+}
+struct UdpEndpointExt {
+    sock: UdpSocket,
+    getter_for_incoming: PortId,
+    outgoing_buffer: HashMap,
+    incoming_buffer: Vec<u8>,
 }
 #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 struct PortInfo {
@@ -203,8 +216,14 @@ struct ConnectorUnphased {
     port_info: PortInfo,
 }
 #[derive(Debug)]
+struct ConnectorSetup {
+    net_endpoint_setups: Vec<(PortId, NetEndpointSetup)>,
+    udp_endpoint_setups: Vec<(PortId, UdpEndpointSetup)>,
+    surplus_sockets: u16,
+}
+#[derive(Debug)]
 enum ConnectorPhased {
-    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
+    Setup(Box<ConnectorSetup>),
     Communication(Box<ConnectorCommunication>),
 }
 #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
@@ -217,10 +236,40 @@ struct NativeBatch {
     to_put: HashMap<PortId, Payload>,
     to_get: HashSet<PortId>,
 }
+enum TokenTarget {
+    NetEndpoint { index: usize },
+    UdpEndpoint { index: usize },
+    Waker,
+}
 ////////////////
-pub fn would_block(err: &std::io::Error) -> bool {
+fn would_block(err: &std::io::Error) -> bool {
     err.kind() == std::io::ErrorKind::WouldBlock
 }
+impl TokenTarget {
+    const HALFWAY_INDEX: usize = usize::MAX / 2;
+    const MAX_INDEX: usize = usize::MAX;
+    const WAKER_TOKEN: usize = Self::MAX_INDEX;
+}
+impl From<Token> for TokenTarget {
+    fn from(Token(index): Token) -> Self {
+        if index == Self::MAX_INDEX {
+            TokenTarget::Waker
+        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
+            TokenTarget::UdpEndpoint { index: shifted }
+        } else {
+            TokenTarget::NetEndpoint { index }
+        }
+    }
+}
+impl Into<Token> for TokenTarget {
+    fn into(self) -> Token {
+        match self {
+            TokenTarget::Waker => Token(Self::MAX_INDEX),
+            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
+            TokenTarget::NetEndpoint { index } => Token(index),
+        }
+    }
+}
 impl<T: std::cmp::Ord> VecSet<T> {
     fn new(mut vec: Vec<T>) -> Self {
         vec.sort();
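Aside on TokenTarget above: it splits mio's token space in half, so one poll instance can multiplex all sources. Net endpoints take tokens in [0, usize::MAX/2), udp endpoints take [usize::MAX/2, usize::MAX), and usize::MAX itself is reserved for the waker. A dependency-free sketch of the decoding (Token is redeclared locally as a stand-in for mio::Token):

```rust
#[derive(Copy, Clone, Debug, PartialEq)]
struct Token(usize); // stand-in for mio::Token

#[derive(Debug, PartialEq)]
enum TokenTarget {
    NetEndpoint { index: usize },
    UdpEndpoint { index: usize },
    Waker,
}

const HALFWAY_INDEX: usize = usize::MAX / 2;
const MAX_INDEX: usize = usize::MAX;

impl From<Token> for TokenTarget {
    fn from(Token(index): Token) -> Self {
        if index == MAX_INDEX {
            // the single reserved waker token
            TokenTarget::Waker
        } else if let Some(shifted) = index.checked_sub(HALFWAY_INDEX) {
            // tokens at or above the halfway point belong to udp endpoints
            TokenTarget::UdpEndpoint { index: shifted }
        } else {
            TokenTarget::NetEndpoint { index }
        }
    }
}

fn main() {
    assert_eq!(TokenTarget::from(Token(5)), TokenTarget::NetEndpoint { index: 5 });
    assert_eq!(
        TokenTarget::from(Token(HALFWAY_INDEX + 5)),
        TokenTarget::UdpEndpoint { index: 5 }
    );
    assert_eq!(TokenTarget::from(Token(MAX_INDEX)), TokenTarget::Waker);
}
```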
diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
index 6822e4b907b0cbdb2719e028e334fa5ceaef360b..6277bc5b3ce89fe2c354dbafe21d0d14c178c699 100644
--- a/src/runtime/setup.rs
+++ b/src/runtime/setup.rs
@@ -18,7 +18,25 @@ impl Connector {
                 native_ports: Default::default(),
                 port_info: Default::default(),
             },
-            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
+            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
+                net_endpoint_setups: Default::default(),
+                udp_endpoint_setups: Default::default(),
+                surplus_sockets,
+            })),
         }
     }
+    pub fn new_udp_port(
+        &mut self,
+        local_addr: SocketAddr,
+        peer_addr: SocketAddr,
+    ) -> Result<[PortId; 2], WrongStateError> {
+        let Self { unphased: _up, phased } = self;
+        match phased {
+            ConnectorPhased::Communication(..) => Err(WrongStateError),
+            ConnectorPhased::Setup(_setup) => {
+                let _udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
+                todo!()
+            }
+        }
+    }
     pub fn new_net_port(
@@ -26,12 +44,12 @@ impl Connector {
         &mut self,
         polarity: Polarity,
         sock_addr: SocketAddr,
         endpoint_polarity: EndpointPolarity,
-    ) -> Result<PortId, NewNetPortError> {
+    ) -> Result<PortId, WrongStateError> {
         let Self { unphased: up, phased } = self;
         match phased {
-            ConnectorPhased::Communication { .. } => Err(NewNetPortError::AlreadyConnected),
-            ConnectorPhased::Setup { endpoint_setups, .. } => {
-                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
+            ConnectorPhased::Communication(..) => Err(WrongStateError),
+            ConnectorPhased::Setup(setup) => {
+                let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity };
                 let p = up.id_manager.new_port_id();
                 up.native_ports.insert(p);
                 // {polarity, route} known. {peer} unknown.
@@ -44,7 +62,7 @@ impl Connector {
                     polarity,
                     &endpoint_setup
                 );
-                endpoint_setups.push((p, endpoint_setup));
+                setup.net_endpoint_setups.push((p, endpoint_setup));
                 Ok(p)
             }
         }
@@ -52,25 +70,25 @@ impl Connector {
     pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
         use ConnectError as Ce;
         let Self { unphased: cu, phased } = self;
-        match phased {
+        match &phased {
             ConnectorPhased::Communication { .. } => {
                 log!(cu.logger, "Call to connecting in connected state");
                 Err(Ce::AlreadyConnected)
             }
-            ConnectorPhased::Setup { endpoint_setups, .. } => {
+            ConnectorPhased::Setup(setup) => {
                 log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
                 let deadline = timeout.map(|to| Instant::now() + to);
                 // connect all endpoints in parallel; send and receive peer ids through ports
                 let mut endpoint_manager = new_endpoint_manager(
                     &mut *cu.logger,
-                    endpoint_setups,
+                    &setup.net_endpoint_setups,
                     &mut cu.port_info,
                     deadline,
                 )?;
                 log!(
                     cu.logger,
                     "Successfully connected {} endpoints",
-                    endpoint_manager.endpoint_exts.len()
+                    endpoint_manager.net_endpoint_exts.len()
                 );
                 // leader election and tree construction
                 let neighborhood = init_neighborhood(
@@ -99,7 +117,7 @@ impl Connector {
 }
 fn new_endpoint_manager(
     logger: &mut dyn Logger,
-    endpoint_setups: &[(PortId, EndpointSetup)],
+    endpoint_setups: &[(PortId, NetEndpointSetup)],
     port_info: &mut PortInfo,
     deadline: Option<Instant>,
 ) -> Result<EndpointManager, ConnectError> {
@@ -109,26 +127,26 @@ fn new_endpoint_manager(
     const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
     struct Todo {
         todo_endpoint: TodoEndpoint,
-        endpoint_setup: EndpointSetup,
+        endpoint_setup: NetEndpointSetup,
         local_port: PortId,
         sent_local_port: bool,          // true <-> I've sent my local port
         recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
     }
     enum TodoEndpoint {
         Accepting(TcpListener),
-        Endpoint(Endpoint),
+        NetEndpoint(NetEndpoint),
     }
     fn init_todo(
         token: Token,
         local_port: PortId,
-        endpoint_setup: &EndpointSetup,
+        endpoint_setup: &NetEndpointSetup,
         poll: &mut Poll,
     ) -> Result<Todo, ConnectError> {
         let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
             let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
                 .expect("mio::TcpStream connect should not fail!");
             poll.registry().register(&mut stream, token, BOTH).unwrap();
-            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
+            TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
         } else {
             let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
                 .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
@@ -146,12 +164,13 @@ fn new_endpoint_manager(
     ////////////////////////////////////////////
 
     // 1. Start to construct EndpointManager
-    const WAKER_TOKEN: Token = Token(usize::MAX);
     const WAKER_PERIOD: Duration = Duration::from_millis(300);
+    struct WakerState {
+        continue_signal: AtomicBool,
+        waker: mio::Waker,
+    }
 
-    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
-
-    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
+    let mut waker_state: Option<Arc<WakerState>> = None;
     let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
     let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
     let mut polled_undrained = VecSet::default();
@@ -162,7 +181,12 @@ fn new_endpoint_manager(
         .iter()
         .enumerate()
         .map(|(index, (local_port, endpoint_setup))| {
-            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
+            init_todo(
+                TokenTarget::NetEndpoint { index }.into(),
+                *local_port,
+                endpoint_setup,
+                &mut poll,
+            )
         })
         .collect::<Result<Vec<Todo>, ConnectError>>()?;
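Aside on the WakerState introduced above, which the following hunk puts to work: once any active connect fails, a helper thread periodically wakes the poll via the reserved waker token so failed endpoints can be retried, and the AtomicBool lets the main thread stop that helper. A minimal self-contained sketch of the same pattern against mio 0.7 (not the connector code itself):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

struct WakerState {
    continue_signal: AtomicBool,
    waker: mio::Waker,
}

fn main() -> std::io::Result<()> {
    const WAKER_TOKEN: mio::Token = mio::Token(usize::MAX);
    let mut poll = mio::Poll::new()?;
    let mut events = mio::Events::with_capacity(4);
    let arc = Arc::new(WakerState {
        waker: mio::Waker::new(poll.registry(), WAKER_TOKEN)?,
        continue_signal: AtomicBool::new(true),
    });
    let moved_arc = arc.clone();
    // Helper thread: wake the poll every 300ms until signalled to stop.
    std::thread::spawn(move || {
        while moved_arc.continue_signal.load(Ordering::SeqCst) {
            std::thread::sleep(Duration::from_millis(300));
            let _ = moved_arc.waker.wake();
        }
    });
    // The main thread blocks here until the helper wakes it.
    poll.poll(&mut events, None)?;
    assert!(events.iter().any(|e| e.token() == WAKER_TOKEN));
    // Stop signal: the helper exits after its next sleep.
    arc.continue_signal.store(false, Ordering::SeqCst);
    Ok(())
}
```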
connect_failed is {:?}", - connect_failed.iter() - ); - assert!(waker_continue_signal.is_some()); - for index in connect_failed.drain() { - let todo: &mut Todo = &mut todos[index]; + let token_target = TokenTarget::from(token); + match token_target { + TokenTarget::Waker => { log!( logger, - "Restarting connection with endpoint {:?} {:?}", - index, - todo.endpoint_setup.sock_addr + "Notification from waker. connect_failed is {:?}", + connect_failed.iter() ); - match &mut todo.todo_endpoint { - TodoEndpoint::Endpoint(endpoint) => { - let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr) - .expect("mio::TcpStream connect should not fail!"); - std::mem::swap(&mut endpoint.stream, &mut new_stream); - poll.registry() - .register(&mut endpoint.stream, Token(index), BOTH) - .unwrap(); + assert!(waker_state.is_some()); + for net_endpoint_index in connect_failed.drain() { + let todo: &mut Todo = &mut todos[net_endpoint_index]; + log!( + logger, + "Restarting connection with endpoint {:?} {:?}", + net_endpoint_index, + todo.endpoint_setup.sock_addr + ); + match &mut todo.todo_endpoint { + TodoEndpoint::NetEndpoint(endpoint) => { + let mut new_stream = + TcpStream::connect(todo.endpoint_setup.sock_addr) + .expect("mio::TcpStream connect should not fail!"); + std::mem::swap(&mut endpoint.stream, &mut new_stream); + let token = + TokenTarget::NetEndpoint { index: net_endpoint_index }.into(); + poll.registry() + .register(&mut endpoint.stream, token, BOTH) + .unwrap(); + } + _ => unreachable!(), } - _ => unreachable!(), } } - } else { - let todo: &mut Todo = &mut todos[index]; - // FIRST try convert this into an endpoint - if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint { - match listener.accept() { - Ok((mut stream, peer_addr)) => { - poll.registry().deregister(listener).unwrap(); - poll.registry().register(&mut stream, token, BOTH).unwrap(); - log!( - logger, - "Endpoint[{}] accepted a connection from {:?}", - index, - peer_addr - ); - let endpoint = Endpoint { stream, inbox: vec![] }; - todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); - } - Err(e) if would_block(&e) => { - log!(logger, "Spurious wakeup on listener {:?}", index) - } - Err(_) => { - log!(logger, "accept() failure on index {}", index); - return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); + TokenTarget::UdpEndpoint { index: _ } => unreachable!(), + TokenTarget::NetEndpoint { index } => { + let todo: &mut Todo = &mut todos[index]; + if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint { + // FIRST try complete this connection + match listener.accept() { + Err(e) if would_block(&e) => { + log!(logger, "Spurious wakeup on listener {:?}", index) + } + Err(_) => { + log!(logger, "accept() failure on index {}", index); + return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); + } + Ok((mut stream, peer_addr)) => { + // success! + poll.registry().deregister(listener).unwrap(); + // reusing original token as-is + poll.registry().register(&mut stream, token, BOTH).unwrap(); + log!( + logger, + "Endpoint[{}] accepted a connection from {:?}", + index, + peer_addr + ); + let net_endpoint = NetEndpoint { stream, inbox: vec![] }; + todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint); + } } } - } - if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint { - if event.is_error() { - if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { - // right now you cannot retry an acceptor. 
- return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap())); + if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut todo.todo_endpoint { + if event.is_error() { + if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { + // right now you cannot retry an acceptor. return failure + return Err(Ce::AcceptFailed( + net_endpoint.stream.local_addr().unwrap(), + )); + } + // this actively-connecting endpoint failed to connect! + if connect_failed.insert(index) { + log!( + logger, + "Connection failed for {:?}. List is {:?}", + index, + connect_failed.iter() + ); + poll.registry().deregister(&mut net_endpoint.stream).unwrap(); + } else { + // spurious wakeup. + continue; + } + if waker_state.is_none() { + log!(logger, "First connect failure. Starting waker thread"); + let arc = Arc::new(WakerState { + waker: mio::Waker::new( + poll.registry(), + TokenTarget::Waker.into(), + ) + .unwrap(), + continue_signal: true.into(), + }); + let moved_arc = arc.clone(); + waker_state = Some(arc); + std::thread::spawn(move || { + while moved_arc + .continue_signal + .load(std::sync::atomic::Ordering::SeqCst) + { + std::thread::sleep(WAKER_PERIOD); + let _ = moved_arc.waker.wake(); + } + }); + } + continue; } - if connect_failed.insert(index) { - log!( - logger, - "Connection failed for {:?}. List is {:?}", - index, - connect_failed.iter() - ); - poll.registry().deregister(&mut endpoint.stream).unwrap(); - } else { + // event wasn't ERROR + if connect_failed.contains(&index) { // spurious wakeup continue; } - - if waker_continue_signal.is_none() { - log!(logger, "First connect failure. Starting waker thread"); - let waker = - Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap()); - let wcs = Arc::new(AtomicBool::from(true)); - let wcs2 = wcs.clone(); - std::thread::spawn(move || { - while wcs2.load(std::sync::atomic::Ordering::SeqCst) { - std::thread::sleep(WAKER_PERIOD); - let _ = waker.wake(); - } - }); - waker_continue_signal = Some(wcs); + if !setup_incomplete.contains(&index) { + // spurious wakeup + continue; } - continue; - } - if connect_failed.contains(&index) { - // spurious wakeup - continue; - } - if !setup_incomplete.contains(&index) { - // spurious wakeup - continue; - } - let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap(); - if event.is_writable() && !todo.sent_local_port { - let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { - polarity: local_polarity, - port: todo.local_port, - })); - endpoint - .send(&msg) - .map_err(|e| { - Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) - }) - .unwrap(); - log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); - todo.sent_local_port = true; - } - if event.is_readable() && todo.recv_peer_port.is_none() { - let maybe_msg = endpoint.try_recv(logger).map_err(|e| { - Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) - })?; - if maybe_msg.is_some() && !endpoint.inbox.is_empty() { - polled_undrained.insert(index); + let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap(); + if event.is_writable() && !todo.sent_local_port { + // can write and didn't send setup msg yet? Do so! 
+ let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { + polarity: local_polarity, + port: todo.local_port, + })); + net_endpoint + .send(&msg) + .map_err(|e| { + Ce::EndpointSetupError( + net_endpoint.stream.local_addr().unwrap(), + e, + ) + }) + .unwrap(); + log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); + todo.sent_local_port = true; } - match maybe_msg { - None => {} // msg deserialization incomplete - Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { - log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info); - if peer_info.polarity == local_polarity { - return Err(ConnectError::PortPeerPolarityMismatch( - todo.local_port, - )); + if event.is_readable() && todo.recv_peer_port.is_none() { + // can read and didn't recv setup msg yet? Do so! + let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| { + Ce::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), e) + })?; + if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() { + polled_undrained.insert(index); + } + match maybe_msg { + None => {} // msg deserialization incomplete + Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { + log!( + logger, + "endpoint[{}] got peer info {:?}", + index, + peer_info + ); + if peer_info.polarity == local_polarity { + return Err(ConnectError::PortPeerPolarityMismatch( + todo.local_port, + )); + } + todo.recv_peer_port = Some(peer_info.port); + // 1. finally learned the peer of this port! + port_info.peers.insert(todo.local_port, peer_info.port); + // 2. learned the info of this peer port + port_info.polarities.insert(peer_info.port, peer_info.polarity); + port_info.peers.insert(peer_info.port, todo.local_port); + if let Some(route) = port_info.routes.get(&peer_info.port) { + // check just for logging purposes + log!( + logger, + "Special case! Route to peer {:?} already known to be {:?}. Leave untouched", + peer_info.port, + route + ); + } + port_info + .routes + .entry(peer_info.port) + .or_insert(Route::Endpoint { index }); } - todo.recv_peer_port = Some(peer_info.port); - // 1. finally learned the peer of this port! - port_info.peers.insert(todo.local_port, peer_info.port); - // 2. learned the info of this peer port - port_info.polarities.insert(peer_info.port, peer_info.polarity); - port_info.peers.insert(peer_info.port, todo.local_port); - if let Some(route) = port_info.routes.get(&peer_info.port) { - // check just for logging purposes + Some(inappropriate_msg) => { log!( logger, - "Special case! Route to peer {:?} already known to be {:?}. Leave untouched", - peer_info.port, - route + "delaying msg {:?} during channel setup phase", + inappropriate_msg ); + delayed_messages.push((index, inappropriate_msg)); } - port_info - .routes - .entry(peer_info.port) - .or_insert(Route::Endpoint { index }); - } - Some(inappropriate_msg) => { - log!( - logger, - "delaying msg {:?} during channel setup phase", - inappropriate_msg - ); - delayed_messages.push((index, inappropriate_msg)); } } - } - if todo.sent_local_port && todo.recv_peer_port.is_some() { - setup_incomplete.remove(&index); - log!(logger, "endpoint[{}] is finished!", index); + // is the setup for this net_endpoint now complete? + if todo.sent_local_port && todo.recv_peer_port.is_some() { + // yes! connected, sent my info and received peer's info + setup_incomplete.remove(&index); + log!(logger, "endpoint[{}] is finished!", index); + } } } } } events.clear(); } - let endpoint_exts = todos + // all todos must be the NetEndpoint variants! 
unwrap and collect them + let net_endpoint_exts = todos .into_iter() .enumerate() - .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt { - endpoint: match todo_endpoint { - TodoEndpoint::Endpoint(mut endpoint) => { + .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt { + net_endpoint: match todo_endpoint { + TodoEndpoint::NetEndpoint(mut net_endpoint) => { + let token = TokenTarget::NetEndpoint { index }.into(); poll.registry() - .reregister(&mut endpoint.stream, Token(index), Interest::READABLE) + .reregister(&mut net_endpoint.stream, token, Interest::READABLE) .unwrap(); - endpoint + net_endpoint } _ => unreachable!(), }, getter_for_incoming: local_port, }) .collect(); - if let Some(wcs) = waker_continue_signal { + if let Some(arc) = waker_state { log!(logger, "Sending waker the stop signal"); - wcs.store(false, std::sync::atomic::Ordering::SeqCst); + arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst); + // TODO leave the waker registered? } Ok(EndpointManager { poll, @@ -377,7 +436,7 @@ fn new_endpoint_manager( polled_undrained, undelayed_messages: delayed_messages, // no longer delayed delayed_messages: Default::default(), - endpoint_exts, + net_endpoint_exts, }) } @@ -427,12 +486,12 @@ fn init_neighborhood( NOTE the distinction between PARENT and LEADER */ log!(logger, "beginning neighborhood construction"); - if em.num_endpoints() == 0 { + if em.num_net_endpoints() == 0 { log!(logger, "Edge case of no neighbors! No parent an no children!"); return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) }); } - log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_endpoints()); - let mut awaiting = HashSet::with_capacity(em.num_endpoints()); + log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_net_endpoints()); + let mut awaiting = HashSet::with_capacity(em.num_net_endpoints()); // 1+ neighbors. Leader can only be learned by receiving messages // loop ends when I know my sink tree parent (implies leader was elected) let election_result: WaveState = { @@ -650,7 +709,7 @@ fn session_optimize( serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm .endpoint_manager - .endpoint_exts + .net_endpoint_exts .iter() .map(|ee| ee.getter_for_incoming) .collect(), @@ -740,7 +799,7 @@ fn apply_optimizations( cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; for (ee, getter) in - comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) + comm.endpoint_manager.net_endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) { ee.getter_for_incoming = getter; }
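Postscript: new_udp_port is still a todo!() stub, but UdpEndpointSetup already fixes the shape of a UDP endpoint as a (local_addr, peer_addr) pair. A sketch of the socket setup that pair suggests, using the mio::net::UdpSocket import added above (an assumption about where the stub is headed, not code from this patch):

```rust
use mio::net::UdpSocket;
use std::net::SocketAddr;

fn main() -> std::io::Result<()> {
    // local_addr / peer_addr play the roles of UdpEndpointSetup's fields.
    let local_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let peer_addr: SocketAddr = "127.0.0.1:8000".parse().unwrap();
    let socket = UdpSocket::bind(local_addr)?;
    // Restrict send/recv to the configured peer, so the endpoint behaves
    // like the connection-oriented net endpoints elsewhere in the runtime.
    socket.connect(peer_addr)?;
    Ok(())
}
```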