diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
index f6233c1b4a40ea2bfdd0b6239a8fb31401f46eb1..ae3f1d44608bc347e7930846945eda154243cccf 100644
--- a/src/runtime/setup.rs
+++ b/src/runtime/setup.rs
@@ -1,14 +1,15 @@
 use crate::common::*;
 use crate::runtime::*;
+use std::io::ErrorKind::WouldBlock;
 
 impl Connector {
     pub fn new_simple(
         proto_description: Arc<ProtocolDescription>,
         controller_id: ControllerId,
     ) -> Self {
-        let logger = Box::new(StringLogger::new(controller_id));
+        let logger = Box::new(DummyLogger);
         // let logger = Box::new(DummyLogger);
-        let surplus_sockets = 8;
+        let surplus_sockets = 2;
         Self::new(logger, proto_description, controller_id, surplus_sockets)
     }
     pub fn new(
@@ -52,15 +53,16 @@ impl Connector {
             ConnectorPhased::Communication { .. } => Err(()),
         }
     }
-    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
+    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
+        use ConnectError::*;
         match &mut self.phased {
             ConnectorPhased::Communication { .. } => {
                 log!(self.logger, "Call to connecting in connected state");
-                Err(())
+                Err(AlreadyConnected)
             }
             ConnectorPhased::Setup { endpoint_setups, .. } => {
                 log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
-                let deadline = Instant::now() + timeout;
+                let deadline = timeout.map(|to| Instant::now() + to);
                 // connect all endpoints in parallel; send and receive peer ids through ports
                 let mut endpoint_manager = new_endpoint_manager(
                     &mut *self.logger,
@@ -81,6 +83,7 @@ impl Connector {
                     deadline,
                 )?;
                 log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
+                log!(self.logger, "connect() finished. setup phase complete");
                 // TODO session optimization goes here
                 self.phased = ConnectorPhased::Communication {
                     round_index: 0,
@@ -100,9 +103,10 @@ fn new_endpoint_manager(
     logger: &mut dyn Logger,
     endpoint_setups: &[(PortId, EndpointSetup)],
     port_info: &mut PortInfo,
-    deadline: Instant,
-) -> Result<EndpointManager, ()> {
+    deadline: Option<Instant>,
+) -> Result<EndpointManager, ConnectError> {
     ////////////////////////////////////////////
+    use ConnectError::*;
     const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
     struct Todo {
         todo_endpoint: TodoEndpoint,
@@ -119,13 +123,15 @@ fn new_endpoint_manager(
         local_port: PortId,
         endpoint_setup: &EndpointSetup,
         poll: &mut Poll,
-    ) -> Result<Todo, ()> {
+    ) -> Result<Todo, ConnectError> {
         let todo_endpoint = if endpoint_setup.is_active {
-            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
+            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
+                .expect("mio::TcpStream connect should not fail!");
             poll.registry().register(&mut stream, token, BOTH).unwrap();
             TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
         } else {
-            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
+            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
+                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
             poll.registry().register(&mut listener, token, BOTH).unwrap();
             TodoEndpoint::Listener(listener)
         };
@@ -134,7 +140,7 @@ fn new_endpoint_manager(
 
     ////////////////////////////////////////////
     // 1. Start to construct EndpointManager
-    let mut poll = Poll::new().map_err(drop)?;
+    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
     let mut events = Events::with_capacity(64);
     let mut polled_undrained = IndexSet::<usize>::default();
     let mut delayed_messages = vec![];
@@ -146,7 +152,7 @@ fn new_endpoint_manager(
         .map(|(index, (local_port, endpoint_setup))| {
             init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
         })
-        .collect::<Result<Vec<Todo>, _>>()?;
+        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 
     // 3. Using poll to drive progress:
     //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
@@ -154,19 +160,33 @@ fn new_endpoint_manager(
     //    - for each endpoint, recv the peer's PortId, and
     let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
     while !setup_incomplete.is_empty() {
-        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
-        poll.poll(&mut events, Some(remaining)).map_err(drop)?;
+        let remaining = if let Some(deadline) = deadline {
+            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
+        } else {
+            None
+        };
+        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
         for event in events.iter() {
             let token = event.token();
             let Token(index) = token;
             let todo: &mut Todo = &mut todos[index];
             if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
-                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
-                poll.registry().deregister(listener).unwrap();
-                poll.registry().register(&mut stream, token, BOTH).unwrap();
-                log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr);
-                let endpoint = Endpoint { stream, inbox: vec![] };
-                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
+                match listener.accept() {
+                    Ok((mut stream, peer_addr)) => {
+                        poll.registry().deregister(listener).unwrap();
+                        poll.registry().register(&mut stream, token, BOTH).unwrap();
+                        log!(
+                            logger,
+                            "Endpoint[{}] accepted a connection from {:?}",
+                            index,
+                            peer_addr
+                        );
+                        let endpoint = Endpoint { stream, inbox: vec![] };
+                        todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
+                    }
+                    Err(e) if e.kind() == WouldBlock => {}
+                    Err(_) => return Err(AcceptFailed(listener.local_addr().unwrap())),
+                }
             }
             match todo {
                 Todo {
@@ -185,12 +205,19 @@ fn new_endpoint_manager(
                         polarity: local_polarity,
                         port: *local_port,
                     }));
-                    endpoint.send(&msg)?;
+                    endpoint
+                        .send(&msg)
+                        .map_err(|e| {
+                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
+                        })
+                        .unwrap();
                     log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
                     *sent_local_port = true;
                 }
                 if event.is_readable() && recv_peer_port.is_none() {
-                    let maybe_msg = endpoint.try_recv().map_err(drop)?;
+                    let maybe_msg = endpoint.try_recv().map_err(|e| {
+                        EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
+                    })?;
                     if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
                         polled_undrained.insert(index);
                     }
@@ -198,7 +225,11 @@ fn new_endpoint_manager(
                         None => {} // msg deserialization incomplete
                         Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
                             log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
-                            assert!(peer_info.polarity != local_polarity);
+                            if peer_info.polarity == local_polarity {
+                                return Err(ConnectError::PortPeerPolarityMismatch(
+                                    *local_port,
+                                ));
+                            }
                             *recv_peer_port = Some(peer_info.port);
                             // 1. finally learned the peer of this port!
                             port_info.peers.insert(*local_port, peer_info.port);
@@ -251,16 +282,16 @@ fn init_neighborhood(
     controller_id: ControllerId,
     logger: &mut dyn Logger,
     em: &mut EndpointManager,
-    deadline: Instant,
-) -> Result<Neighborhood, ()> {
+    deadline: Option<Instant>,
+) -> Result<Neighborhood, ConnectError> {
     use {Msg::SetupMsg as S, SetupMsg::*};
     log!(logger, "beginning neighborhood construction");
     // 1. broadcast my ID as the first echo. await reply from all neighbors
     let echo = S(LeaderEcho { maybe_leader: controller_id });
     let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
-    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
+    for index in 0..em.endpoint_exts.len() {
         log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
-        ee.endpoint.send(&echo)?;
+        em.send_to_setup(index, &echo)?;
         awaiting.insert(index);
     }
 
@@ -270,7 +301,7 @@ fn init_neighborhood(
     let mut my_leader = controller_id;
     em.undelay_all();
     'echo_loop: while !awaiting.is_empty() || parent.is_some() {
-        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
+        let (index, msg) = em.try_recv_any_setup(deadline)?;
         log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
         match msg {
             S(LeaderAnnounce { leader }) => {
@@ -290,7 +321,7 @@ fn init_neighborhood(
                 if awaiting.is_empty() {
                     if let Some(p) = parent {
                         // return the echo to my parent
-                        em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
+                        em.send_to_setup(p, &S(LeaderEcho { maybe_leader }))?;
                     } else {
                         // wave completed!
                         break 'echo_loop;
@@ -307,15 +338,15 @@ fn init_neighborhood(
                 if em.endpoint_exts.len() == 1 {
                     // immediately reply to parent
                     log!(logger, "replying echo to parent {:?} immediately", index);
-                    em.send_to(index, &echo)?;
+                    em.send_to_setup(index, &echo)?;
                 } else {
-                    for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
+                    for index2 in 0..em.endpoint_exts.len() {
                         if index2 == index {
                             // don't propagate echo to my parent
                             continue;
                         }
                         log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
-                        ee.endpoint.send(&echo)?;
+                        em.send_to_setup(index2, &echo)?;
                         awaiting.insert(index2);
                     }
                 }
@@ -346,7 +377,7 @@ fn init_neighborhood(
     // in this loop, every node sends 1 message to each neighbor
     // await 1 message from all non-parents.
     let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
-    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
+    for index in 0..em.endpoint_exts.len() {
         let msg = if Some(index) == parent {
             &S(YouAreMyParent)
         } else {
@@ -354,13 +385,13 @@ fn init_neighborhood(
             &msg_for_non_parents
         };
         log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
-        ee.endpoint.send(msg)?;
+        em.send_to_setup(index, msg)?;
     }
    let mut children = Vec::default();
     em.undelay_all();
     while !awaiting.is_empty() {
         log!(logger, "awaiting {:?}", &awaiting);
-        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
+        let (index, msg) = em.try_recv_any_setup(deadline)?;
         match msg {
             S(YouAreMyParent) => {
                 assert!(awaiting.remove(&index));
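
The patch threads a new ConnectError type through the whole setup path in place of the old Result<_, ()> / map_err(drop) plumbing, but the enum's definition lies outside these hunks. The sketch below reconstructs a plausible shape from the variants the new code actually constructs; the Debug derive and the PortId/EndpointError stand-ins are assumptions for illustration, not the crate's real definitions.

use std::net::SocketAddr;

// Stand-ins for crate-local types, assumed here only so the sketch compiles:
type PortId = u32;
type EndpointError = std::io::Error;

// Variants and payloads as constructed by the code in this diff:
#[derive(Debug)]
pub enum ConnectError {
    AlreadyConnected,                              // connect() called while already connected
    BindFailed(SocketAddr),                        // passive endpoint failed to bind its socket
    PollInitFailed,                                // mio::Poll::new() failed
    PollFailed,                                    // mio::Poll::poll() failed during setup
    Timeout,                                       // the optional deadline elapsed
    AcceptFailed(SocketAddr),                      // accept() failed with a non-WouldBlock error
    EndpointSetupError(SocketAddr, EndpointError), // send/recv failed during the port handshake
    PortPeerPolarityMismatch(PortId),              // both channel ends claimed the same polarity
}

fn main() {
    // Example: an elapsed setup deadline would surface to the caller as:
    println!("{:?}", ConnectError::Timeout);
}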
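connect() now takes timeout: Option<Duration> and converts it once into deadline: Option<Instant>, where None means "block indefinitely". Each pass of the poll loop re-derives the remaining budget from that deadline, failing with Timeout once it has passed; the resulting Option<Duration> is exactly what mio's Poll::poll accepts as its timeout argument. A minimal standalone illustration of the pattern (function and error names are hypothetical):

use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum SetupError {
    Timeout,
}

// None = no deadline, the caller may block forever; Some(deadline) = return
// the time still left, or fail once the deadline has already passed.
fn remaining_budget(deadline: Option<Instant>) -> Result<Option<Duration>, SetupError> {
    match deadline {
        None => Ok(None),
        Some(deadline) => deadline
            .checked_duration_since(Instant::now()) // None once the deadline is in the past
            .map(Some)
            .ok_or(SetupError::Timeout),
    }
}

fn main() {
    // A fresh half-second deadline still has budget left...
    let deadline = Some(Instant::now() + Duration::from_millis(500));
    assert!(matches!(remaining_budget(deadline), Ok(Some(_))));
    // ...no deadline means an unbounded budget...
    assert_eq!(remaining_budget(None), Ok(None));
    // ...and an already-expired deadline reports Timeout.
    let expired = Some(Instant::now() - Duration::from_millis(1));
    assert_eq!(remaining_budget(expired), Err(SetupError::Timeout));
}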
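The listener rewrite is the reason for the new use std::io::ErrorKind::WouldBlock import: mio sockets are nonblocking, and a readiness event does not guarantee that accept() will succeed, so WouldBlock must be treated as "try again on the next event" rather than as a failure. Here is the same pattern with std::net::TcpListener in nonblocking mode, a sketch that runs without the mio dependency:

use std::io::ErrorKind::WouldBlock;
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;
    match listener.accept() {
        // A peer was waiting: hand the stream off, as the diff does by
        // registering it with poll and turning the Todo into an Endpoint.
        Ok((stream, peer_addr)) => println!("accepted {:?} from {}", stream, peer_addr),
        // No pending connection right now: not an error, just wait for
        // the next readiness event.
        Err(e) if e.kind() == WouldBlock => println!("nothing to accept yet"),
        // Anything else is a genuine failure, like AcceptFailed in the diff.
        Err(e) => return Err(e),
    }
    Ok(())
}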