From 1bacc6467d196761ac274f138012fe1cd2b6952b 2020-06-19 19:16:14 From: Christopher Esterhuyse Date: 2020-06-19 19:16:14 Subject: [PATCH] rebuilding setup phase. logging is vastly improved --- diff --git a/Cargo.toml b/Cargo.toml index ed868539539bbcd2b9428c4396b049087982cb50..c28bcfae3d822844be409bf9c946258b4ff18698 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,9 @@ indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal # network integer-encoding = "1.0.7" byteorder = "1.3.2" -mio = "0.6.21" +# mio = "0.6.21" mio-extras = "2.0.6" -mio07 = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] } +mio = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] } # protocol # id-arena = "2.2.1" diff --git a/src/common.rs b/src/common.rs index 633372524b6c50ed005f0ddf0263f79bf8b427cd..9b96d11052a9f93b869515d8e3a79e6a83a8af15 100644 --- a/src/common.rs +++ b/src/common.rs @@ -14,7 +14,7 @@ pub use indexmap::{IndexMap, IndexSet}; pub use maplit::{hashmap, hashset}; pub use mio::{ net::{TcpListener, TcpStream}, - Event, Evented, Events, Poll, PollOpt, Ready, Token, + Events, Interest, Poll, Token, }; pub use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 3999f7a07d4287caa3eaccb111afff0a6a0286e3..786bd0de9a8c09a602fa0be0bb4d1cc9abfe6b9c 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -62,7 +62,7 @@ pub(crate) enum CommonSatResult { } pub struct Endpoint { inbox: Vec, - stream: mio07::net::TcpStream, + stream: TcpStream, } #[derive(Debug, Default)] pub struct IntStream { @@ -111,10 +111,11 @@ pub struct MemInMsg { } #[derive(Debug)] pub struct EndpointPoller { - poll: mio07::Poll, - events: mio07::Events, - undrained_endpoints: HashSet, - delayed_inp_messages: Vec<(PortId, Msg)>, + poll: Poll, + events: Events, + undrained_endpoints: IndexSet, + delayed_messages: Vec<(usize, Msg)>, + undelayed_messages: Vec<(usize, Msg)>, } #[derive(Debug)] pub struct Connector { @@ -165,7 +166,54 @@ pub struct SyncContext<'a> { pub struct NonsyncContext<'a> { connector: &'a mut Connector, } +enum TryRecyAnyError { + Timeout, + PollFailed, + EndpointRecvErr { error: EndpointRecvErr, index: usize }, + BrokenEndpoint(usize), +} //////////////// +impl EndpointPoller { + fn try_recv_any( + &mut self, + endpoint_exts: &mut [EndpointExt], + deadline: Instant, + ) -> Result<(usize, Msg), TryRecyAnyError> { + use TryRecyAnyError::*; + // 1. try messages already buffered + if let Some(x) = self.undelayed_messages.pop() { + return Ok(x); + } + // 2. try read from sockets nonblocking + while let Some(index) = self.undrained_endpoints.pop() { + if let Some(msg) = endpoint_exts[index] + .endpoint + .try_recv() + .map_err(|error| EndpointRecvErr { error, index })? + { + return Ok((index, msg)); + } + } + // 3. poll for progress + loop { + let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; + self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; + for event in self.events.iter() { + let Token(index) = event.token(); + if let Some(msg) = endpoint_exts[index] + .endpoint + .try_recv() + .map_err(|error| EndpointRecvErr { error, index })? + { + return Ok((index, msg)); + } + } + } + } + fn undelay_all(&mut self) { + self.undelayed_messages.extend(self.delayed_messages.drain(..)); + } +} impl Debug for Endpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() @@ -291,9 +339,21 @@ impl Endpoint { } } impl Connector { - fn get_logger(&self) -> &dyn Logger { + pub fn get_logger(&self) -> &dyn Logger { &*self.logger } + pub fn print_state(&self) { + let stdout = std::io::stdout(); + let mut lock = stdout.lock(); + writeln!( + lock, + "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n", + self.id_manager.controller_id + ) + .unwrap(); + self.get_logger().dump_log(&mut lock); + writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); + } } // #[derive(Debug)] diff --git a/src/runtime/my_tests.rs b/src/runtime/my_tests.rs index 56d9f89c5429c586800e84ff96dcc5824bd3cab2..998243c8139cec175f1f91415ebdc6cff08e4975 100644 --- a/src/runtime/my_tests.rs +++ b/src/runtime/my_tests.rs @@ -1,5 +1,6 @@ use crate as reowolf; -use reowolf::Polarity::*; +use crossbeam_utils::thread::scope; +use reowolf::{Connector, EndpointSetup, Polarity::*, ProtocolDescription}; use std::net::SocketAddr; use std::{sync::Arc, time::Duration}; @@ -14,19 +15,19 @@ fn next_test_addr() -> SocketAddr { } lazy_static::lazy_static! { - static ref MINIMAL_PROTO: Arc = + static ref MINIMAL_PROTO: Arc = { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) }; } #[test] fn simple_connector() { - let c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); println!("{:#?}", c); } #[test] fn add_port_pair() { - let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, _] = c.add_port_pair(); let [_, _] = c.add_port_pair(); println!("{:#?}", c); @@ -34,7 +35,7 @@ fn add_port_pair() { #[test] fn add_sync() { - let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, i] = c.add_port_pair(); c.add_component(b"sync", &[i, o]).unwrap(); println!("{:#?}", c); @@ -42,35 +43,51 @@ fn add_sync() { #[test] fn add_net_port() { - let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let sock_addr = next_test_addr(); - let _ = c - .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false }) - .unwrap(); - let _ = c - .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true }) - .unwrap(); + let _ = + c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap(); + let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap(); println!("{:#?}", c); } #[test] fn trivial_connect() { - let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); c.connect(Duration::from_secs(1)).unwrap(); println!("{:#?}", c); } #[test] fn single_node_connect() { - let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); let sock_addr = next_test_addr(); - let _ = c - .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false }) - .unwrap(); - let _ = c - .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true }) - .unwrap(); + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let _ = + c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap(); + let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); println!("{:#?}", c); c.get_logger().dump_log(&mut std::io::stdout().lock()); } + +#[test] +fn multithreaded_connect() { + let sock_addr = next_test_addr(); + scope(|s| { + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let es = EndpointSetup { polarity: Getter, sock_addr, is_active: true }; + let _ = c.add_net_port(es).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.print_state(); + }); + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let es = EndpointSetup { polarity: Putter, sock_addr, is_active: false }; + let _ = c.add_net_port(es).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.print_state(); + }); + }) + .unwrap(); +} diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index fcd4c87a48d131ae8c230ce6de899cf5f03ad353..8f099b79463cb95d2007601224e24f1779e6b0d1 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -115,8 +115,13 @@ impl Connector { )?; log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len()); // leader election and tree construction - let neighborhood = - init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?; + let neighborhood = init_neighborhood( + self.id_manager.controller_id, + &mut *self.logger, + &mut endpoint_exts, + &mut endpoint_poller, + deadline, + )?; log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); // TODO session optimization goes here self.phased = ConnectorPhased::Communication { @@ -137,10 +142,6 @@ fn init_endpoints( inp_to_route: &mut HashMap, deadline: Instant, ) -> Result<(Vec, EndpointPoller), ()> { - use mio07::{ - net::{TcpListener, TcpStream}, - Events, Interest, Poll, Token, - }; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -182,7 +183,8 @@ fn init_endpoints( poll: Poll::new().map_err(drop)?, events: Events::with_capacity(64), undrained_endpoints: Default::default(), - delayed_inp_messages: Default::default(), + delayed_messages: Default::default(), + undelayed_messages: Default::default(), }; let mut todos = endpoint_setups @@ -264,12 +266,133 @@ fn init_endpoints( } fn init_neighborhood( + controller_id: ControllerId, logger: &mut dyn Logger, endpoint_exts: &mut [EndpointExt], - endpoint_poller: &mut EndpointPoller, + ep: &mut EndpointPoller, + deadline: Instant, ) -> Result { - log!(logger, "Time to construct my neighborhood"); - let parent = None; - let children = Default::default(); + log!(logger, "beginning neighborhood construction"); + use Msg::SetupMsg as S; + use SetupMsg::*; + + // 1. broadcast my ID as the first echo. await reply from all neighbors + let echo = S(LeaderEcho { maybe_leader: controller_id }); + let mut awaiting = HashSet::with_capacity(endpoint_exts.len()); + for (index, ee) in endpoint_exts.iter_mut().enumerate() { + log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo); + ee.endpoint.send(&echo)?; + awaiting.insert(index); + } + + // 2. Receive incoming replies. whenever a higher-id echo arrives, + // adopt it as leader, sender as parent, and reset the await set. + let mut parent: Option = None; + let mut my_leader = controller_id; + ep.undelay_all(); + 'echo_loop: while !awaiting.is_empty() || parent.is_some() { + let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?; + log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); + match msg { + S(LeaderAnnounce { leader }) => { + // someone else completed the echo and became leader first! + // the sender is my parent + parent = Some(index); + my_leader = leader; + awaiting.clear(); + break 'echo_loop; + } + S(LeaderEcho { maybe_leader }) => { + use Ordering::*; + match maybe_leader.cmp(&my_leader) { + Less => { /* ignore */ } + Equal => { + awaiting.remove(&index); + if awaiting.is_empty() { + if let Some(p) = parent { + // return the echo to my parent + endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?; + } else { + // DECIDE! + break 'echo_loop; + } + } + } + Greater => { + // join new echo + log!(logger, "Setting leader to index {:?}", index); + parent = Some(index); + my_leader = maybe_leader; + let echo = S(LeaderEcho { maybe_leader: my_leader }); + awaiting.clear(); + if endpoint_exts.len() == 1 { + // immediately reply to parent + log!(logger, "replying echo to parent {:?} immediately", index); + endpoint_exts[index].endpoint.send(&echo)?; + } else { + for (index2, ee) in endpoint_exts.iter_mut().enumerate() { + if index2 == index { + continue; + } + log!(logger, "repeating echo {:?} to {:?}", &echo, index2); + ee.endpoint.send(&echo)?; + awaiting.insert(index2); + } + } + } + } + } + inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)), + } + } + match parent { + None => assert_eq!( + my_leader, controller_id, + "I've got no parent, but I consider {:?} the leader?", + my_leader + ), + Some(parent) => assert_ne!( + my_leader, controller_id, + "I have {:?} as parent, but I consider myself ({:?}) the leader?", + parent, controller_id + ), + } + + log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader); + + // 3. broadcast leader announcement (except to parent: confirm they are your parent) + // in this loop, every node sends 1 message to each neighbor + // await 1 message from all non-parents + let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); + for (index, ee) in endpoint_exts.iter_mut().enumerate() { + let msg = if Some(index) == parent { + &S(YouAreMyParent) + } else { + awaiting.insert(index); + &msg_for_non_parents + }; + log!(logger, "ANNOUNCING to {:?} {:?}", index, msg); + ee.endpoint.send(msg)?; + } + let mut children = Vec::default(); + ep.undelay_all(); + while !awaiting.is_empty() { + let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?; + match msg { + S(YouAreMyParent) => { + assert!(awaiting.remove(&index)); + children.push(index); + } + S(SetupMsg::LeaderAnnounce { leader }) => { + assert!(awaiting.remove(&index)); + assert!(leader == my_leader); + assert!(Some(index) != parent); + // they wouldn't send me this if they considered me their parent + } + inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)), + } + } + children.sort(); + children.dedup(); Ok(Neighborhood { parent, children }) }