diff --git a/src/runtime/retired/setup.rs b/src/runtime/retired/setup.rs deleted file mode 100644 index 7e9d70da2fd3abcbb45dfdeac03859ae25d1b645..0000000000000000000000000000000000000000 --- a/src/runtime/retired/setup.rs +++ /dev/null @@ -1,484 +0,0 @@ -use crate::common::*; -use crate::runtime::{ - actors::{MonoN, MonoP}, - endpoint::*, - errors::*, - *, -}; - -#[derive(Debug)] -enum EndpointExtTodo { - Finished(EndpointExt), - ActiveConnecting { addr: SocketAddr, polarity: Polarity, stream: TcpStream }, - ActiveRecving { addr: SocketAddr, polarity: Polarity, endpoint: Endpoint }, - PassiveAccepting { addr: SocketAddr, info: EndpointInfo, listener: TcpListener }, - PassiveConnecting { addr: SocketAddr, info: EndpointInfo, stream: TcpStream }, -} - -///////////////////// IMPL ///////////////////// -impl Controller { - // Given port bindings and a protocol config, create a connector with 1 native node - pub fn connect( - major: ControllerId, - main_component: &[u8], - protocol_description: Arc, - bound_proto_interface: &[(PortBinding, Polarity)], - logger: &mut String, - deadline: Instant, - ) -> Result<(Self, Vec<(PortId, Polarity)>), ConnectErr> { - use ConnectErr::*; - - log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); - - let mut channel_id_stream = ChannelIdStream::new(major); - let mut endpoint_ext_todos = Arena::default(); - - let mut ports_native = vec![]; - let mut ports_proto = vec![]; - let mut ports_network = vec![]; - - let mut native_interface = vec![]; - - /* - 1. - allocate an EndpointExtTodo for every native and interface port - - store all the resulting ports in two portlists for the interfaces of the native and proto components - native: [a, c, f] - | | | - | | | - proto: [b, d, e, g] - ^todo - arena: - */ - for &(binding, polarity) in bound_proto_interface.iter() { - match binding { - PortBinding::Native => { - let channel_id = channel_id_stream.next(); - let ([port_native, port_proto], native_polarity) = { - let [p, g] = Endpoint::new_memory_pair(); - let mut endpoint_to_port = |endpoint, polarity| { - endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt { - endpoint, - info: EndpointInfo { polarity, channel_id }, - })) - }; - let pport = endpoint_to_port(p, Putter); - let gport = endpoint_to_port(g, Getter); - let port_pair = match polarity { - Putter => [gport, pport], - Getter => [pport, gport], - }; - (port_pair, !polarity) - }; - native_interface.push((port_native, native_polarity)); - ports_native.push(port_native); - ports_proto.push(port_proto); - } - PortBinding::Passive(addr) => { - let channel_id = channel_id_stream.next(); - let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting { - addr, - info: EndpointInfo { polarity, channel_id }, - listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?, - }); - ports_network.push(port_proto); - ports_proto.push(port_proto); - } - PortBinding::Active(addr) => { - let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting { - addr, - polarity, - stream: TcpStream::connect(&addr).unwrap(), - }); - ports_network.push(port_proto); - ports_proto.push(port_proto); - } - } - } - log!(logger, "{:03?} setup todos...", major); - - // 2. convert the arena to Arena and return the - let (mut messenger_state, mut endpoint_exts) = - Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?; - - let n_mono = MonoN { ports: ports_native.into_iter().collect(), result: None }; - let p_monos = vec![MonoP { - state: protocol_description.new_main_component(main_component, &ports_proto), - ports: ports_proto.into_iter().collect(), - }]; - - // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} - let family = Self::setup_sink_tree_family( - major, - logger, - &mut endpoint_exts, - &mut messenger_state, - ports_network, - deadline, - )?; - - log!(logger, "CONNECT PHASE END! ~"); - let inner = ControllerInner { - family, - messenger_state, - channel_id_stream, - endpoint_exts, - mono_ps: p_monos, - mono_n: n_mono, - round_index: 0, - logger: { - let mut l = String::default(); - std::mem::swap(&mut l, logger); - l - }, - }; - let controller = Self { - protocol_description, - inner, - ephemeral: Default::default(), - // round_histories: vec![], - unrecoverable_error: None, - }; - Ok((controller, native_interface)) - } - - // with mio v0.6 attempting to read bytes into a nonempty buffer appears to - // be the only reliably platform-independent means of testing the connectivity of - // a mio::TcpStream (see Self::connection_testing_read). - // as this unavoidably MAY read some crucial payload bytes, we have to be careful - // to pass these potentially populated buffers into the Endpoint, or bytes may be lost. - // This is done with Endpoint::from_fresh_stream_and_inbox. - fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec) -> std::io::Result<()> { - inbox.clear(); - use std::io::Read; - match stream.read_to_end(inbox) { - Ok(0) => unreachable!("Ok(0) on read should return Err instead!"), - Ok(_) => Ok(()), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()), - Err(e) => Err(e), - } - } - - // inserts - fn finish_endpoint_ext_todos( - major: ControllerId, - logger: &mut String, - mut endpoint_ext_todos: Arena, - deadline: Instant, - ) -> Result<(MessengerState, Arena), ConnectErr> { - use {ConnectErr::*, EndpointExtTodo::*}; - - // 1. define and setup a poller and event loop - let edge = PollOpt::edge(); - let [ready_r, ready_w] = [Ready::readable(), Ready::writable()]; - let mut ms = MessengerState { - poll: Poll::new().map_err(|_| PollInitFailed)?, - events: Events::with_capacity(endpoint_ext_todos.len()), - delayed: vec![], - undelayed: vec![], - polled_undrained: Default::default(), - }; - - // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener} - // 3. store the portset of EndpointExtTodos which are not Finished in `to_finish`. - let mut to_finish = HashSet::<_>::default(); - log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); - for (port, t) in endpoint_ext_todos.iter() { - let token = port.to_token(); - match t { - ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(), - Finished(EndpointExt { endpoint, .. }) => { - ms.poll.register(endpoint, token, ready_r, edge) - } - ActiveConnecting { stream, .. } => { - to_finish.insert(port); - ms.poll.register(stream, token, ready_w, edge) - } - PassiveAccepting { listener, .. } => { - to_finish.insert(port); - ms.poll.register(listener, token, ready_r, edge) - } - } - .expect("register first"); - } - // invariant: every EndpointExtTodo has one thing registered with mio - - // 4. until all in endpoint_ext_todos are Finished variant, handle events - let mut polled_undrained_later = IndexSet::<_>::default(); - let mut backoff_millis = 10; - // see Self::connection_testing_read for why we populate Endpoint inboxes here. - let mut next_inbox = vec![]; - while !to_finish.is_empty() { - ms.poll_events(deadline).map_err(|e| { - log!(logger, "{:03?} timing out", major); - e - })?; - for event in ms.events.iter() { - log!(logger, "event {:#?}", event); - let token = event.token(); - let port = PortId::from_token(token); - let entry = endpoint_ext_todos.get_mut(port).unwrap(); - match entry { - Finished(_) => { - polled_undrained_later.insert(port); - } - PassiveAccepting { addr, listener, .. } => { - log!(logger, "{:03?} start PassiveAccepting...", major); - assert!(event.readiness().is_readable()); - let (stream, _peer_addr) = - listener.accept().map_err(|_| AcceptFailed(*addr))?; - ms.poll.deregister(listener).expect("wer"); - ms.poll.register(&stream, token, ready_w, edge).expect("3y5"); - take_mut::take(entry, |e| { - assert_let![PassiveAccepting { addr, info, .. } = e => { - PassiveConnecting { addr, info, stream } - }] - }); - log!(logger, "{:03?} ... end PassiveAccepting", major); - } - PassiveConnecting { addr, stream, .. } => { - log!(logger, "{:03?} start PassiveConnecting...", major); - assert!(event.readiness().is_writable()); - if Self::connection_testing_read(stream, &mut next_inbox).is_err() { - return Err(PassiveConnectFailed(*addr)); - } - ms.poll.reregister(stream, token, ready_r, edge).expect("52"); - let mut res = Ok(()); - take_mut::take(entry, |e| { - let mut inbox = vec![]; - std::mem::swap(&mut inbox, &mut next_inbox); - assert_let![PassiveConnecting { info, stream, .. } = e => { - let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); - let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); - res = endpoint.send(msg); - Finished(EndpointExt { info, endpoint }) - }] - }); - res?; - log!(logger, "{:03?} ... end PassiveConnecting", major); - assert!(to_finish.remove(&port)); - } - ActiveConnecting { addr, stream, .. } => { - log!(logger, "{:03?} start ActiveConnecting...", major); - assert!(event.readiness().is_writable()); - if Self::connection_testing_read(stream, &mut next_inbox).is_ok() { - // connect successful - log!(logger, "Connectivity test passed"); - ms.poll.reregister(stream, token, ready_r, edge).expect("52"); - take_mut::take(entry, |e| { - let mut inbox = vec![]; - std::mem::swap(&mut inbox, &mut next_inbox); - assert_let![ActiveConnecting { stream, polarity, addr } = e => { - let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); - ActiveRecving { endpoint, polarity, addr } - }] - }); - } else { - // connect failure. retry! - log!(logger, "CONNECT FAIL"); - ms.poll.deregister(stream).expect("wt"); - std::thread::sleep(Duration::from_millis(backoff_millis)); - backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; - let mut new_stream = TcpStream::connect(addr).unwrap(); - ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3"); - std::mem::swap(stream, &mut new_stream); - } - log!(logger, "{:03?} ... end ActiveConnecting", major); - } - ActiveRecving { addr, polarity, endpoint } => { - log!(logger, "{:03?} start ActiveRecving...", major); - assert!(event.readiness().is_readable()); - 'recv_loop: while let Some(msg) = endpoint.recv()? { - if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { - if info.polarity == *polarity { - return Err(PolarityMatched(*addr)); - } - take_mut::take(entry, |e| { - assert_let![ActiveRecving { polarity, endpoint, .. } = e => { - let info = EndpointInfo { polarity, channel_id: info.channel_id }; - Finished(EndpointExt { info, endpoint }) - }] - }); - ms.polled_undrained.insert(port); - assert!(to_finish.remove(&port)); - break 'recv_loop; - } else { - ms.delayed.push(ReceivedMsg { recipient: port, msg }); - } - } - log!(logger, "{:03?} ... end ActiveRecving", major); - } - } - } - } - for port in polled_undrained_later { - ms.polled_undrained.insert(port); - } - let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo { - Finished(endpoint_ext) => endpoint_ext, - _ => unreachable!(), - }); - Ok((ms, endpoint_exts)) - } - - fn setup_sink_tree_family( - major: ControllerId, - logger: &mut String, - endpoint_exts: &mut Arena, - messenger_state: &mut MessengerState, - neighbors: Vec, - deadline: Instant, - ) -> Result { - use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; - - log!(logger, "neighbors {:?}", &neighbors); - - let mut messenger = (messenger_state, endpoint_exts); - impl Messengerlike for (&mut MessengerState, &mut Arena) { - fn get_state_mut(&mut self) -> &mut MessengerState { - self.0 - } - fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint { - &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint - } - } - - // 1. broadcast my ID as the first echo. await reply from all in net_portlist - let echo = S(LeaderEcho { maybe_leader: major }); - let mut awaiting = IndexSet::with_capacity(neighbors.len()); - for &n in neighbors.iter() { - log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo); - messenger.send(n, echo.clone())?; - awaiting.insert(n); - } - - // 2. Receive incoming replies. whenever a higher-id echo arrives, - // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; - let mut my_leader = major; - messenger.undelay_all(); - 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?; - log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg); - match msg { - S(LeaderAnnounce { leader }) => { - // someone else completed the echo and became leader first! - // the sender is my parent - parent = Some(recipient); - my_leader = leader; - awaiting.clear(); - break 'echo_loop; - } - S(LeaderEcho { maybe_leader }) => { - use Ordering::*; - match maybe_leader.cmp(&my_leader) { - Less => { /* ignore */ } - Equal => { - awaiting.remove(&recipient); - if awaiting.is_empty() { - if let Some(p) = parent { - // return the echo to my parent - messenger.send(p, S(LeaderEcho { maybe_leader }))?; - } else { - // DECIDE! - break 'echo_loop; - } - } - } - Greater => { - // join new echo - log!(logger, "{:?} setting leader to {:?}", major, recipient); - parent = Some(recipient); - my_leader = maybe_leader; - let echo = S(LeaderEcho { maybe_leader: my_leader }); - awaiting.clear(); - if neighbors.len() == 1 { - // immediately reply to parent - log!( - logger, - "{:?} replying echo to parent {:?} immediately", - major, - recipient - ); - messenger.send(recipient, echo.clone())?; - } else { - for &n in neighbors.iter() { - if n != recipient { - log!( - logger, - "{:?} repeating echo {:?} to {:?}", - major, - &echo, - n - ); - messenger.send(n, echo.clone())?; - awaiting.insert(n); - } - } - } - } - } - } - msg => messenger.delay(ReceivedMsg { recipient, msg }), - } - } - match parent { - None => assert_eq!( - my_leader, major, - "I've got no parent, but I consider {:?} the leader?", - my_leader - ), - Some(parent) => assert_ne!( - my_leader, major, - "I have {:?} as parent, but I consider myself ({:?}) the leader?", - parent, major - ), - } - - log!(logger, "{:?} DONE WITH ECHO! Leader has cid={:?}", major, my_leader); - - // 3. broadcast leader announcement (except to parent: confirm they are your parent) - // in this loop, every node sends 1 message to each neighbor - let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for &k in neighbors.iter() { - let msg = - if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; - log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg); - messenger.send(k, msg)?; - } - - // await 1 message from all non-parents - for &n in neighbors.iter() { - if Some(n) != parent { - awaiting.insert(n); - } - } - let mut children = Vec::default(); - messenger.undelay_all(); - while !awaiting.is_empty() { - let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?; - match msg { - S(YouAreMyParent) => { - assert!(awaiting.remove(&recipient)); - children.push(recipient); - } - S(SetupMsg::LeaderAnnounce { leader }) => { - assert!(awaiting.remove(&recipient)); - assert!(leader == my_leader); - assert!(Some(recipient) != parent); - // they wouldn't send me this if they considered me their parent - } - _ => messenger.delay(ReceivedMsg { recipient, msg }), - } - } - Ok(ControllerFamily { parent_port: parent, children_ports: children }) - } -} - -impl Messengerlike for Controller { - fn get_state_mut(&mut self) -> &mut MessengerState { - &mut self.inner.messenger_state - } - fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint { - &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint - } -}