diff --git a/src/runtime/retired/setup.rs b/src/runtime/retired/setup.rs
new file mode 100644
index 0000000000000000000000000000000000000000..7e9d70da2fd3abcbb45dfdeac03859ae25d1b645
--- /dev/null
+++ b/src/runtime/retired/setup.rs
@@ -0,0 +1,484 @@
+use crate::common::*;
+use crate::runtime::{
+    actors::{MonoN, MonoP},
+    endpoint::*,
+    errors::*,
+    *,
+};
+
+#[derive(Debug)]
+enum EndpointExtTodo {
+    Finished(EndpointExt),
+    ActiveConnecting { addr: SocketAddr, polarity: Polarity, stream: TcpStream },
+    ActiveRecving { addr: SocketAddr, polarity: Polarity, endpoint: Endpoint },
+    PassiveAccepting { addr: SocketAddr, info: EndpointInfo, listener: TcpListener },
+    PassiveConnecting { addr: SocketAddr, info: EndpointInfo, stream: TcpStream },
+}
+
+///////////////////// IMPL /////////////////////
+impl Controller {
+    // Given port bindings and a protocol config, create a connector with 1 native node.
+    pub fn connect(
+        major: ControllerId,
+        main_component: &[u8],
+        protocol_description: Arc<ProtocolDescription>,
+        bound_proto_interface: &[(PortBinding, Polarity)],
+        logger: &mut String,
+        deadline: Instant,
+    ) -> Result<(Self, Vec<(PortId, Polarity)>), ConnectErr> {
+        use ConnectErr::*;
+
+        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
+
+        let mut channel_id_stream = ChannelIdStream::new(major);
+        let mut endpoint_ext_todos = Arena::default();
+
+        let mut ports_native = vec![];
+        let mut ports_proto = vec![];
+        let mut ports_network = vec![];
+
+        let mut native_interface = vec![];
+
+        /*
+        1. - allocate an EndpointExtTodo for every native and interface port
+           - store all the resulting ports in two portlists for the interfaces
+             of the native and proto components
+           native: [a, c, f]
+                    |  |  |
+                    |  |  |
+           proto:  [b, d, e, g]
+                          ^todo
+           arena:
+        */
+        for &(binding, polarity) in bound_proto_interface.iter() {
+            match binding {
+                PortBinding::Native => {
+                    let channel_id = channel_id_stream.next();
+                    let ([port_native, port_proto], native_polarity) = {
+                        let [p, g] = Endpoint::new_memory_pair();
+                        let mut endpoint_to_port = |endpoint, polarity| {
+                            endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt {
+                                endpoint,
+                                info: EndpointInfo { polarity, channel_id },
+                            }))
+                        };
+                        let pport = endpoint_to_port(p, Putter);
+                        let gport = endpoint_to_port(g, Getter);
+                        // the native end takes the polarity OPPOSITE to the proto end
+                        let port_pair = match polarity {
+                            Putter => [gport, pport],
+                            Getter => [pport, gport],
+                        };
+                        (port_pair, !polarity)
+                    };
+                    native_interface.push((port_native, native_polarity));
+                    ports_native.push(port_native);
+                    ports_proto.push(port_proto);
+                }
+                PortBinding::Passive(addr) => {
+                    let channel_id = channel_id_stream.next();
+                    let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting {
+                        addr,
+                        info: EndpointInfo { polarity, channel_id },
+                        listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?,
+                    });
+                    ports_network.push(port_proto);
+                    ports_proto.push(port_proto);
+                }
+                PortBinding::Active(addr) => {
+                    let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
+                        addr,
+                        polarity,
+                        stream: TcpStream::connect(&addr).unwrap(),
+                    });
+                    ports_network.push(port_proto);
+                    ports_proto.push(port_proto);
+                }
+            }
+        }
+        log!(logger, "{:03?} setup todos...", major);
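+
+        // (Editorial, illustrative example; not from the original source.)
+        // For a hypothetical interface
+        //   [(Native, Putter), (Passive(a1), Getter), (Active(a2), Putter)]
+        // the loop above yields:
+        //   ports_native  = [p0]          // native halves of memory channels
+        //   ports_proto   = [p1, p2, p3]  // one port per interface entry, in order
+        //   ports_network = [p2, p3]      // TCP-backed ports that still need setup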
+
+        // 2. convert the Arena<EndpointExtTodo> into an Arena<EndpointExt> and
+        //    return it alongside the initialized MessengerState
+        let (mut messenger_state, mut endpoint_exts) =
+            Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?;
+
+        let n_mono = MonoN { ports: ports_native.into_iter().collect(), result: None };
+        let p_monos = vec![MonoP {
+            state: protocol_description.new_main_component(main_component, &ports_proto),
+            ports: ports_proto.into_iter().collect(),
+        }];
+
+        // 3. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
+        let family = Self::setup_sink_tree_family(
+            major,
+            logger,
+            &mut endpoint_exts,
+            &mut messenger_state,
+            ports_network,
+            deadline,
+        )?;
+
+        log!(logger, "CONNECT PHASE END! ~");
+        let inner = ControllerInner {
+            family,
+            messenger_state,
+            channel_id_stream,
+            endpoint_exts,
+            mono_ps: p_monos,
+            mono_n: n_mono,
+            round_index: 0,
+            logger: {
+                let mut l = String::default();
+                std::mem::swap(&mut l, logger);
+                l
+            },
+        };
+        let controller = Self {
+            protocol_description,
+            inner,
+            ephemeral: Default::default(),
+            // round_histories: vec![],
+            unrecoverable_error: None,
+        };
+        Ok((controller, native_interface))
+    }
+
+    // With mio v0.6, attempting to read bytes into a nonempty buffer appears to
+    // be the only reliably platform-independent means of testing the connectivity
+    // of a mio::TcpStream (see Self::connection_testing_read).
+    // As this MAY unavoidably read some crucial payload bytes, we have to be
+    // careful to pass these potentially populated buffers into the Endpoint, or
+    // bytes may be lost. This is done with Endpoint::from_fresh_stream_and_inbox.
+    fn connection_testing_read(
+        stream: &mut TcpStream,
+        inbox: &mut Vec<u8>,
+    ) -> std::io::Result<()> {
+        inbox.clear();
+        use std::io::Read;
+        match stream.read_to_end(inbox) {
+            Ok(0) => unreachable!("Ok(0) on read should return Err instead!"),
+            Ok(_) => Ok(()),
+            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
+
+    // Drives every EndpointExtTodo to the Finished variant, returning the
+    // finished endpoints together with the MessengerState that drives them.
+    fn finish_endpoint_ext_todos(
+        major: ControllerId,
+        logger: &mut String,
+        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
+        deadline: Instant,
+    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
+        use {ConnectErr::*, EndpointExtTodo::*};
+
+        // 1. define and set up a poller and event loop
+        let edge = PollOpt::edge();
+        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
+        let mut ms = MessengerState {
+            poll: Poll::new().map_err(|_| PollInitFailed)?,
+            events: Events::with_capacity(endpoint_ext_todos.len()),
+            delayed: vec![],
+            undelayed: vec![],
+            polled_undrained: Default::default(),
+        };
+
+        // 2. register every EndpointExtTodo with ms.poll; each owns one of
+        //    {Endpoint, TcpStream, TcpListener}
+        // 3. store the port of every EndpointExtTodo that is not yet Finished
+        //    in `to_finish`
+        let mut to_finish = HashSet::<_>::default();
+        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
+        for (port, t) in endpoint_ext_todos.iter() {
+            let token = port.to_token();
+            match t {
+                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
+                Finished(EndpointExt { endpoint, .. }) => {
+                    ms.poll.register(endpoint, token, ready_r, edge)
+                }
+                ActiveConnecting { stream, .. } => {
+                    to_finish.insert(port);
+                    ms.poll.register(stream, token, ready_w, edge)
+                }
+                PassiveAccepting { listener, .. } => {
+                    to_finish.insert(port);
+                    ms.poll.register(listener, token, ready_r, edge)
+                }
+            }
+            .expect("first registration with poller failed");
+        }
+        // invariant: every EndpointExtTodo has exactly one thing registered with mio
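+
+        // (Editorial note.) With edge-triggered registration (PollOpt::edge()),
+        // mio only reports a readiness *transition*: an endpoint that still has
+        // buffered bytes after a partial read raises no further events. That is
+        // why ports get recorded in `polled_undrained` (to be drained again
+        // later) and why out-of-phase messages are stashed in `delayed` rather
+        // than dropped.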
+
+        // 4. until every entry in endpoint_ext_todos is the Finished variant, handle events
+        let mut polled_undrained_later = IndexSet::<_>::default();
+        let mut backoff_millis = 10;
+        // see Self::connection_testing_read for why we populate Endpoint inboxes here
+        let mut next_inbox = vec![];
+        while !to_finish.is_empty() {
+            ms.poll_events(deadline).map_err(|e| {
+                log!(logger, "{:03?} timing out", major);
+                e
+            })?;
+            for event in ms.events.iter() {
+                log!(logger, "event {:#?}", event);
+                let token = event.token();
+                let port = PortId::from_token(token);
+                let entry = endpoint_ext_todos.get_mut(port).unwrap();
+                match entry {
+                    Finished(_) => {
+                        polled_undrained_later.insert(port);
+                    }
+                    PassiveAccepting { addr, listener, .. } => {
+                        log!(logger, "{:03?} start PassiveAccepting...", major);
+                        assert!(event.readiness().is_readable());
+                        let (stream, _peer_addr) =
+                            listener.accept().map_err(|_| AcceptFailed(*addr))?;
+                        ms.poll.deregister(listener).expect("listener deregister failed");
+                        ms.poll.register(&stream, token, ready_w, edge).expect("stream register failed");
+                        take_mut::take(entry, |e| {
+                            assert_let![PassiveAccepting { addr, info, .. } = e => {
+                                PassiveConnecting { addr, info, stream }
+                            }]
+                        });
+                        log!(logger, "{:03?} ... end PassiveAccepting", major);
+                    }
+                    PassiveConnecting { addr, stream, .. } => {
+                        log!(logger, "{:03?} start PassiveConnecting...", major);
+                        assert!(event.readiness().is_writable());
+                        if Self::connection_testing_read(stream, &mut next_inbox).is_err() {
+                            return Err(PassiveConnectFailed(*addr));
+                        }
+                        ms.poll.reregister(stream, token, ready_r, edge).expect("stream reregister failed");
+                        let mut res = Ok(());
+                        take_mut::take(entry, |e| {
+                            let mut inbox = vec![];
+                            std::mem::swap(&mut inbox, &mut next_inbox);
+                            assert_let![PassiveConnecting { info, stream, .. } = e => {
+                                let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
+                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
+                                res = endpoint.send(msg);
+                                Finished(EndpointExt { info, endpoint })
+                            }]
+                        });
+                        res?;
+                        log!(logger, "{:03?} ... end PassiveConnecting", major);
+                        assert!(to_finish.remove(&port));
+                    }
+                    ActiveConnecting { addr, stream, .. } => {
+                        log!(logger, "{:03?} start ActiveConnecting...", major);
+                        assert!(event.readiness().is_writable());
+                        if Self::connection_testing_read(stream, &mut next_inbox).is_ok() {
+                            // connect successful
+                            log!(logger, "Connectivity test passed");
+                            ms.poll.reregister(stream, token, ready_r, edge).expect("stream reregister failed");
+                            take_mut::take(entry, |e| {
+                                let mut inbox = vec![];
+                                std::mem::swap(&mut inbox, &mut next_inbox);
+                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
+                                    let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox);
+                                    ActiveRecving { endpoint, polarity, addr }
+                                }]
+                            });
+                        } else {
+                            // connect failure: back off, then retry with a fresh stream
+                            log!(logger, "CONNECT FAIL");
+                            ms.poll.deregister(stream).expect("stream deregister failed");
+                            std::thread::sleep(Duration::from_millis(backoff_millis));
+                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
+                            let mut new_stream = TcpStream::connect(addr).unwrap();
+                            ms.poll.register(&new_stream, token, ready_w, edge).expect("stream register failed");
+                            std::mem::swap(stream, &mut new_stream);
+                        }
+                        log!(logger, "{:03?} ... end ActiveConnecting", major);
+                    }
+                    ActiveRecving { addr, polarity, endpoint } => {
+                        log!(logger, "{:03?} start ActiveRecving...", major);
+                        assert!(event.readiness().is_readable());
+                        'recv_loop: while let Some(msg) = endpoint.recv()? {
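+                            // (editorial note) the passive peer sends ChannelSetup
+                            // first; anything else that arrives early is delayed
+                            // and replayed once setup completes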
+                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
+                                // the two ends of a channel must have OPPOSITE polarities
+                                if info.polarity == *polarity {
+                                    return Err(PolarityMatched(*addr));
+                                }
+                                take_mut::take(entry, |e| {
+                                    assert_let![ActiveRecving { polarity, endpoint, .. } = e => {
+                                        let info = EndpointInfo { polarity, channel_id: info.channel_id };
+                                        Finished(EndpointExt { info, endpoint })
+                                    }]
+                                });
+                                ms.polled_undrained.insert(port);
+                                assert!(to_finish.remove(&port));
+                                break 'recv_loop;
+                            } else {
+                                ms.delayed.push(ReceivedMsg { recipient: port, msg });
+                            }
+                        }
+                        log!(logger, "{:03?} ... end ActiveRecving", major);
+                    }
+                }
+            }
+        }
+        for port in polled_undrained_later {
+            ms.polled_undrained.insert(port);
+        }
+        let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo {
+            Finished(endpoint_ext) => endpoint_ext,
+            _ => unreachable!(),
+        });
+        Ok((ms, endpoint_exts))
+    }
+
+    fn setup_sink_tree_family(
+        major: ControllerId,
+        logger: &mut String,
+        endpoint_exts: &mut Arena<EndpointExt>,
+        messenger_state: &mut MessengerState,
+        neighbors: Vec<PortId>,
+        deadline: Instant,
+    ) -> Result<ControllerFamily, ConnectErr> {
+        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
+
+        log!(logger, "neighbors {:?}", &neighbors);
+
+        let mut messenger = (messenger_state, endpoint_exts);
+        impl Messengerlike for (&mut MessengerState, &mut Arena<EndpointExt>) {
+            fn get_state_mut(&mut self) -> &mut MessengerState {
+                self.0
+            }
+            fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint {
+                &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint
+            }
+        }
+
+        // 1. broadcast my ID as the first echo; await a reply from everyone in `neighbors`
+        let echo = S(LeaderEcho { maybe_leader: major });
+        let mut awaiting = IndexSet::with_capacity(neighbors.len());
+        for &n in neighbors.iter() {
+            log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
+            messenger.send(n, echo.clone())?;
+            awaiting.insert(n);
+        }
+
+        // 2. receive incoming replies; whenever a higher-id echo arrives, adopt
+        //    it as leader, adopt the sender as parent, and reset the await set
+        let mut parent: Option<PortId> = None;
+        let mut my_leader = major;
+        messenger.undelay_all();
+        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
+            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
+            log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg);
+            match msg {
+                S(LeaderAnnounce { leader }) => {
+                    // someone else completed the echo and became leader first!
+                    // the sender is my parent
+                    parent = Some(recipient);
+                    my_leader = leader;
+                    awaiting.clear();
+                    break 'echo_loop;
+                }
+                S(LeaderEcho { maybe_leader }) => {
+                    use Ordering::*;
+                    match maybe_leader.cmp(&my_leader) {
+                        Less => { /* ignore */ }
+                        Equal => {
+                            awaiting.remove(&recipient);
+                            if awaiting.is_empty() {
+                                if let Some(p) = parent {
+                                    // return the echo to my parent
+                                    messenger.send(p, S(LeaderEcho { maybe_leader }))?;
+                                } else {
+                                    // DECIDE!
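+                                    // (editorial) every echo returned and no parent
+                                    // was ever adopted: my_leader == major, so this
+                                    // controller is the root of the sink tree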
+                                    break 'echo_loop;
+                                }
+                            }
+                        }
+                        Greater => {
+                            // join the new echo wave
+                            log!(logger, "{:?} adopting leader {:?} with parent {:?}", major, maybe_leader, recipient);
+                            parent = Some(recipient);
+                            my_leader = maybe_leader;
+                            let echo = S(LeaderEcho { maybe_leader: my_leader });
+                            awaiting.clear();
+                            if neighbors.len() == 1 {
+                                // my parent is my only neighbor; reply immediately
+                                log!(
+                                    logger,
+                                    "{:?} replying echo to parent {:?} immediately",
+                                    major,
+                                    recipient
+                                );
+                                messenger.send(recipient, echo.clone())?;
+                            } else {
+                                for &n in neighbors.iter() {
+                                    if n != recipient {
+                                        log!(
+                                            logger,
+                                            "{:?} repeating echo {:?} to {:?}",
+                                            major,
+                                            &echo,
+                                            n
+                                        );
+                                        messenger.send(n, echo.clone())?;
+                                        awaiting.insert(n);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+                msg => messenger.delay(ReceivedMsg { recipient, msg }),
+            }
+        }
+        match parent {
+            None => assert_eq!(
+                my_leader, major,
+                "I've got no parent, but I consider {:?} the leader?",
+                my_leader
+            ),
+            Some(parent) => assert_ne!(
+                my_leader, major,
+                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
+                parent, major
+            ),
+        }
+
+        log!(logger, "{:?} DONE WITH ECHO! Leader has cid={:?}", major, my_leader);
+
+        // 3. broadcast the leader announcement (except to the parent, which
+        //    instead gets confirmation that it is my parent); in this loop,
+        //    every node sends one message to each neighbor
+        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
+        for &k in neighbors.iter() {
+            let msg =
+                if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
+            log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
+            messenger.send(k, msg)?;
+        }
+
+        // await one message from every non-parent
+        for &n in neighbors.iter() {
+            if Some(n) != parent {
+                awaiting.insert(n);
+            }
+        }
+        let mut children = Vec::default();
+        messenger.undelay_all();
+        while !awaiting.is_empty() {
+            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
+            match msg {
+                S(YouAreMyParent) => {
+                    assert!(awaiting.remove(&recipient));
+                    children.push(recipient);
+                }
+                S(LeaderAnnounce { leader }) => {
+                    assert!(awaiting.remove(&recipient));
+                    assert!(leader == my_leader);
+                    assert!(Some(recipient) != parent);
+                    // they wouldn't send me this if they considered me their parent
+                }
+                _ => messenger.delay(ReceivedMsg { recipient, msg }),
+            }
+        }
+        Ok(ControllerFamily { parent_port: parent, children_ports: children })
+    }
+}
+
+impl Messengerlike for Controller {
+    fn get_state_mut(&mut self) -> &mut MessengerState {
+        &mut self.inner.messenger_state
+    }
+    fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint {
+        &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint
+    }
+}