diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
new file mode 100644
index 0000000000000000000000000000000000000000..f6233c1b4a40ea2bfdd0b6239a8fb31401f46eb1
--- /dev/null
+++ b/src/runtime/setup.rs
@@ -0,0 +1,384 @@
+use crate::common::*;
+use crate::runtime::*;
+
+impl Connector {
+    pub fn new_simple(
+        proto_description: Arc<ProtocolDescription>,
+        controller_id: ControllerId,
+    ) -> Self {
+        let logger = Box::new(StringLogger::new(controller_id));
+        // let logger = Box::new(DummyLogger);
+        let surplus_sockets = 8;
+        Self::new(logger, proto_description, controller_id, surplus_sockets)
+    }
+    pub fn new(
+        logger: Box<dyn Logger>,
+        proto_description: Arc<ProtocolDescription>,
+        controller_id: ControllerId,
+        surplus_sockets: u16,
+    ) -> Self {
+        Self {
+            proto_description,
+            proto_components: Default::default(),
+            logger,
+            id_manager: IdManager::new(controller_id),
+            native_ports: Default::default(),
+            port_info: Default::default(),
+            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
+        }
+    }
+    pub fn new_net_port(
+        &mut self,
+        polarity: Polarity,
+        endpoint_setup: EndpointSetup,
+    ) -> Result<PortId, ()> {
+        match &mut self.phased {
+            ConnectorPhased::Setup { endpoint_setups, .. } => {
+                let p = self.id_manager.new_port_id();
+                self.native_ports.insert(p);
+                // {polarity, route} known. {peer} unknown.
+                self.port_info.polarities.insert(p, polarity);
+                self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
+                log!(
+                    self.logger,
+                    "Added net port {:?} with polarity {:?} and endpoint setup {:?}",
+                    p,
+                    polarity,
+                    &endpoint_setup
+                );
+                endpoint_setups.push((p, endpoint_setup));
+                Ok(p)
+            }
+            ConnectorPhased::Communication { .. } => Err(()),
+        }
+    }
+    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
+        match &mut self.phased {
+            ConnectorPhased::Communication { .. } => {
+                log!(self.logger, "Call to connect in connected state");
+                Err(())
+            }
+            ConnectorPhased::Setup { endpoint_setups, .. } => {
+                log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout);
+                let deadline = Instant::now() + timeout;
+                // connect all endpoints in parallel; send and receive peer ids through ports
+                let mut endpoint_manager = new_endpoint_manager(
+                    &mut *self.logger,
+                    endpoint_setups,
+                    &mut self.port_info,
+                    deadline,
+                )?;
+                log!(
+                    self.logger,
+                    "Successfully connected {} endpoints",
+                    endpoint_manager.endpoint_exts.len()
+                );
+                // leader election and tree construction
+                let neighborhood = init_neighborhood(
+                    self.id_manager.controller_id,
+                    &mut *self.logger,
+                    &mut endpoint_manager,
+                    deadline,
+                )?;
+                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
+                // TODO session optimization goes here
+                self.phased = ConnectorPhased::Communication {
+                    round_index: 0,
+                    endpoint_manager,
+                    neighborhood,
+                    mem_inbox: Default::default(),
+                    native_batches: vec![Default::default()],
+                    round_result: Ok(None),
+                };
+                Ok(())
+            }
+        }
+    }
+}
+
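+// A minimal usage sketch of the setup phase above (illustrative only, not part
+// of this patch; it assumes a `ProtocolDescription` `pd`, a socket address
+// `addr`, public `EndpointSetup` fields, and a `Polarity::Putter` variant):
+//
+//   let mut c = Connector::new_simple(pd, 0);
+//   let port = c.new_net_port(
+//       Polarity::Putter,
+//       EndpointSetup { sock_addr: addr, is_active: true },
+//   )?;
+//   c.connect(Duration::from_secs(1))?;
+//
+// Once `connect` returns Ok, `phased` is `ConnectorPhased::Communication`.
+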
+fn new_endpoint_manager(
+    logger: &mut dyn Logger,
+    endpoint_setups: &[(PortId, EndpointSetup)],
+    port_info: &mut PortInfo,
+    deadline: Instant,
+) -> Result<EndpointManager, ()> {
+    ////////////////////////////////////////////
+    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
+    struct Todo {
+        todo_endpoint: TodoEndpoint,
+        local_port: PortId,
+        sent_local_port: bool,          // true <-> I've sent my local port
+        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
+    }
+    enum TodoEndpoint {
+        Listener(TcpListener),
+        Endpoint(Endpoint),
+    }
+    fn init_todo(
+        token: Token,
+        local_port: PortId,
+        endpoint_setup: &EndpointSetup,
+        poll: &mut Poll,
+    ) -> Result<Todo, ()> {
+        let todo_endpoint = if endpoint_setup.is_active {
+            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
+            poll.registry().register(&mut stream, token, BOTH).unwrap();
+            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
+        } else {
+            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
+            poll.registry().register(&mut listener, token, BOTH).unwrap();
+            TodoEndpoint::Listener(listener)
+        };
+        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
+    }
+    ////////////////////////////////////////////
+
+    // 1. Start to construct EndpointManager
+    let mut poll = Poll::new().map_err(drop)?;
+    let mut events = Events::with_capacity(64);
+    let mut polled_undrained = IndexSet::<usize>::default();
+    let mut delayed_messages = vec![];
+
+    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
+    let mut todos = endpoint_setups
+        .iter()
+        .enumerate()
+        .map(|(index, (local_port, endpoint_setup))| {
+            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
+        })
+        .collect::<Result<Vec<Todo>, _>>()?;
+
+    // 3. Using poll to drive progress:
+    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
+    //    - for each endpoint, send the local PortId
+    //    - for each endpoint, recv the peer's PortId, and store it
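+    //
+    // Sketch of the exchange on one channel (illustrative; the ids and the
+    // Putter/Getter polarity names are assumptions, not taken from this patch):
+    //   active  --> passive: SetupMsg::MyPortInfo(MyPortInfo { polarity: Putter, port: p0 })
+    //   passive --> active:  SetupMsg::MyPortInfo(MyPortInfo { polarity: Getter, port: p1 })
+    // after which each side knows its peer's port, that port's polarity, and
+    // the route to it via this endpoint's index.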
+    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
+    while !setup_incomplete.is_empty() {
+        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
+        poll.poll(&mut events, Some(remaining)).map_err(drop)?;
+        for event in events.iter() {
+            let token = event.token();
+            let Token(index) = token;
+            let todo: &mut Todo = &mut todos[index];
+            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
+                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
+                poll.registry().deregister(listener).unwrap();
+                poll.registry().register(&mut stream, token, BOTH).unwrap();
+                log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr);
+                let endpoint = Endpoint { stream, inbox: vec![] };
+                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
+            }
+            match todo {
+                Todo {
+                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
+                    local_port,
+                    sent_local_port,
+                    recv_peer_port,
+                    ..
+                } => {
+                    if !setup_incomplete.contains(&index) {
+                        continue;
+                    }
+                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
+                    if event.is_writable() && !*sent_local_port {
+                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
+                            polarity: local_polarity,
+                            port: *local_port,
+                        }));
+                        endpoint.send(&msg)?;
+                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
+                        *sent_local_port = true;
+                    }
+                    if event.is_readable() && recv_peer_port.is_none() {
+                        let maybe_msg = endpoint.try_recv().map_err(drop)?;
+                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
+                            polled_undrained.insert(index);
+                        }
+                        match maybe_msg {
+                            None => {} // msg deserialization incomplete
+                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
+                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
+                                assert!(peer_info.polarity != local_polarity);
+                                *recv_peer_port = Some(peer_info.port);
+                                // 1. finally learned the peer of this port!
+                                port_info.peers.insert(*local_port, peer_info.port);
+                                // 2. learned the info of this peer port
+                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
+                                port_info.peers.insert(peer_info.port, *local_port);
+                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
+                            }
+                            Some(inappropriate_msg) => {
+                                log!(
+                                    logger,
+                                    "delaying msg {:?} during channel setup phase",
+                                    inappropriate_msg
+                                );
+                                delayed_messages.push((index, inappropriate_msg));
+                            }
+                        }
+                    }
+                    if *sent_local_port && recv_peer_port.is_some() {
+                        setup_incomplete.remove(&index);
+                        log!(logger, "endpoint[{}] is finished!", index);
+                    }
+                }
+                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
+            }
+        }
+        events.clear();
+    }
+    let endpoint_exts = todos
+        .into_iter()
+        .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt {
+            endpoint: match todo_endpoint {
+                TodoEndpoint::Endpoint(endpoint) => endpoint,
+                TodoEndpoint::Listener(..) => unreachable!(),
+            },
+            getter_for_incoming: local_port,
+        })
+        .collect();
+    Ok(EndpointManager {
+        poll,
+        events,
+        polled_undrained,
+        undelayed_messages: delayed_messages, // no longer delayed
+        delayed_messages: Default::default(),
+        endpoint_exts,
+    })
+}
+
+fn init_neighborhood(
+    controller_id: ControllerId,
+    logger: &mut dyn Logger,
+    em: &mut EndpointManager,
+    deadline: Instant,
+) -> Result<Neighborhood, ()> {
+    use {Msg::SetupMsg as S, SetupMsg::*};
+    log!(logger, "beginning neighborhood construction");
+    // 1. broadcast my ID as the first echo. await reply from all neighbors
+    let echo = S(LeaderEcho { maybe_leader: controller_id });
+    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
+    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
+        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
+        ee.endpoint.send(&echo)?;
+        awaiting.insert(index);
+    }
+
+    // 2. Receive incoming replies. whenever a higher-id echo arrives,
+    //    adopt it as leader, sender as parent, and reset the await set.
+    let mut parent: Option<usize> = None;
+    let mut my_leader = controller_id;
+    em.undelay_all();
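+    // Illustrative trace of the echo wave below (controller ids are made up):
+    // with controllers {3, 7} joined by a single channel, both first echo
+    // their own id. 3 sees 7 > 3, so it adopts 7 as leader, marks the sender
+    // as parent, and replies LeaderEcho { maybe_leader: 7 }. When 7 receives
+    // that echo, its await set empties with no parent set, so 7 exits the
+    // loop as the elected leader.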
+    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
+        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
+        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
+        match msg {
+            S(LeaderAnnounce { leader }) => {
+                // someone else completed the echo and became leader first!
+                // the sender is my parent
+                parent = Some(index);
+                my_leader = leader;
+                awaiting.clear();
+                break 'echo_loop;
+            }
+            S(LeaderEcho { maybe_leader }) => {
+                use Ordering::*;
+                match maybe_leader.cmp(&my_leader) {
+                    Less => { /* ignore this wave */ }
+                    Equal => {
+                        awaiting.remove(&index);
+                        if awaiting.is_empty() {
+                            if let Some(p) = parent {
+                                // return the echo to my parent
+                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
+                            } else {
+                                // wave completed!
+                                break 'echo_loop;
+                            }
+                        }
+                    }
+                    Greater => {
+                        // join new echo
+                        log!(logger, "Setting leader to index {:?}", index);
+                        parent = Some(index);
+                        my_leader = maybe_leader;
+                        let echo = S(LeaderEcho { maybe_leader: my_leader });
+                        awaiting.clear();
+                        if em.endpoint_exts.len() == 1 {
+                            // immediately reply to parent
+                            log!(logger, "replying echo to parent {:?} immediately", index);
+                            em.send_to(index, &echo)?;
+                        } else {
+                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
+                                if index2 == index {
+                                    // don't propagate echo to my parent
+                                    continue;
+                                }
+                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
+                                ee.endpoint.send(&echo)?;
+                                awaiting.insert(index2);
+                            }
+                        }
+                    }
+                }
+            }
+            inappropriate_msg => {
+                log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg);
+                em.delayed_messages.push((index, inappropriate_msg))
+            }
+        }
+    }
+    match parent {
+        None => assert_eq!(
+            my_leader, controller_id,
+            "I've got no parent, but I consider {:?} the leader?",
+            my_leader
+        ),
+        Some(parent) => assert_ne!(
+            my_leader, controller_id,
+            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
+            parent, controller_id
+        ),
+    }
+    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
+
+    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
+    //    in this loop, every node sends 1 message to each neighbor
+    //    await 1 message from all non-parents.
+    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
+    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
+        let msg = if Some(index) == parent {
+            &S(YouAreMyParent)
+        } else {
+            awaiting.insert(index);
+            &msg_for_non_parents
+        };
+        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
+        ee.endpoint.send(msg)?;
+    }
+    let mut children = Vec::default();
+    em.undelay_all();
+    while !awaiting.is_empty() {
+        log!(logger, "awaiting {:?}", &awaiting);
+        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
+        match msg {
+            S(YouAreMyParent) => {
+                assert!(awaiting.remove(&index));
+                children.push(index);
+            }
+            S(LeaderAnnounce { leader }) => {
+                assert!(awaiting.remove(&index));
+                assert!(leader == my_leader);
+                assert!(Some(index) != parent);
+                // they wouldn't send me this if they considered me their parent
+            }
+            inappropriate_msg => {
+                log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg);
+                em.delayed_messages.push((index, inappropriate_msg));
+            }
+        }
+    }
+    children.sort();
+    children.dedup();
+    Ok(Neighborhood { parent, children })
+}