From 65390fb1cdbc2cd36e4da3caa8ef8f9e3bfcff44 2020-06-24 12:12:27 From: Christopher Esterhuyse Date: 2020-06-24 12:12:27 Subject: [PATCH] bugfixing and rewrote dist algs to be more readable --- diff --git a/src/common.rs b/src/common.rs index b6186c5b499b84d41fc7a2ece31ec2618c9ba200..45ca68a770a7b7f52b58a95913cd7dc2ca76023c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -28,14 +28,14 @@ pub use Polarity::*; ///////////////////// DEFS ///////////////////// -pub type ControllerId = u32; +pub type ConnectorId = u32; pub type PortSuffix = u32; #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] pub struct Id { - pub(crate) controller_id: ControllerId, + pub(crate) connector_id: ConnectorId, pub(crate) u32_suffix: PortSuffix, } @@ -179,17 +179,17 @@ impl From> for Payload { } impl Debug for PortId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "PID<{},{}>", self.0.controller_id, self.0.u32_suffix) + write!(f, "PID<{},{}>", self.0.connector_id, self.0.u32_suffix) } } impl Debug for FiringVar { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "VID<{},{}>", (self.0).0.controller_id, (self.0).0.u32_suffix) + write!(f, "VID<{},{}>", (self.0).0.connector_id, (self.0).0.u32_suffix) } } impl Debug for ProtoComponentId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "ProtoComponentId({},{})", self.0.controller_id, self.0.u32_suffix) + write!(f, "ProtoComponentId({},{})", self.0.connector_id, self.0.u32_suffix) } } impl std::ops::Not for Polarity { diff --git a/src/lib.rs b/src/lib.rs index 6a544ba2b12d81fd45a96dcadb43feef1823fc9f..59acba88f8ee82541258ec55af625e45c6f22931 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ mod runtime; // #[cfg(test)] // mod test; -pub use common::{ControllerId, Polarity, PortId}; +pub use common::{ConnectorId, Polarity, PortId}; pub use protocol::ProtocolDescription; pub use runtime::{error, Connector, EndpointSetup, FileLogger, VecLogger}; diff --git a/src/macros.rs b/src/macros.rs index bf66dde8d0c64dfd5e7024a99ae3f82e939afe35..1180dc6c40848074665aed5d726b4547eb6ca067 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,33 +1,12 @@ +macro_rules! endptlog { + ($logger:expr, $($arg:tt)*) => {{ + // let w = $logger.line_writer(); + // let _ = write!(w, "[ENDPT]"); + // let _ = writeln!(w, $($arg)*); + }}; +} macro_rules! log { ($logger:expr, $($arg:tt)*) => {{ - let _ = write!($logger.line_writer(), $($arg)*).unwrap(); + let _ = writeln!($logger.line_writer(), $($arg)*); }}; } -// macro_rules! assert_let { -// ($pat:pat = $expr:expr => $work:expr) => { -// if let $pat = $expr { -// $work -// } else { -// panic!("assert_let failed"); -// } -// }; -// } - -// #[test] -// fn assert_let() { -// let x = Some(5); -// let z = assert_let![Some(y) = x => { -// println!("{:?}", y); -// 3 -// }]; -// println!("{:?}", z); -// } - -// #[test] -// #[should_panic] -// fn must_let_panic() { -// let x: Option = None; -// assert_let![Some(y) = x => { -// println!("{:?}", y); -// }]; -// } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 688dae6221ee07a62d7455a4e9c1ee904d357887..b38ef2db7bb7e121b63361fafcced1793eacac0c 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -348,7 +348,7 @@ impl Connector { log!(logger, "No decision yet. Let's recv an endpoint msg..."); { let (endpoint_index, msg) = loop { - match endpoint_manager.try_recv_any_comms(deadline)? { + match endpoint_manager.try_recv_any_comms(logger, deadline)? { None => { log!( logger, @@ -422,7 +422,7 @@ impl Connector { } CommMsgContents::Suggest { suggestion } => { // only accept this control msg through a child endpoint - if neighborhood.children.binary_search(&endpoint_index).is_ok() { + if neighborhood.children.contains(&endpoint_index) { match suggestion { Decision::Success(predicate) => { // child solution contributes to local solution @@ -492,7 +492,7 @@ impl Connector { } *round_result = match decision { - Decision::Failure => Err(DistributedTimeout), + Decision::Failure => Err(RoundFailure), Decision::Success(predicate) => { // commit changes to component states self.proto_components.clear(); diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 907d7d3813ed2ddba23ed09dba2ff0848904bb78..57e0a05e44819dcbd232f59958c116c54704a2c9 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -13,13 +13,21 @@ enum TryRecyAnyError { ///////////////////// +fn would_block(err: &std::io::Error) -> bool { + err.kind() == std::io::ErrorKind::WouldBlock +} impl Endpoint { - pub fn try_recv(&mut self) -> Result, EndpointError> { + pub fn try_recv( + &mut self, + logger: &mut dyn Logger, + ) -> Result, EndpointError> { use EndpointError::*; // populate inbox as much as possible 'read_loop: loop { - match self.stream.read_to_end(&mut self.inbox) { - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, + let res = self.stream.read_to_end(&mut self.inbox); + endptlog!(logger, "Stream read to end {:?}", &res); + match res { + Err(e) if would_block(&e) => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), Err(_e) => return Err(BrokenEndpoint), @@ -48,6 +56,12 @@ impl Endpoint { } impl EndpointManager { + pub fn index_iter(&self) -> Range { + 0..self.num_endpoints() + } + pub fn num_endpoints(&self) -> usize { + self.endpoint_exts.len() + } pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { let endpoint = &mut self.endpoint_exts[index].endpoint; endpoint.send(msg).map_err(|err| { @@ -59,22 +73,24 @@ impl EndpointManager { } pub fn try_recv_any_comms( &mut self, + logger: &mut dyn Logger, deadline: Option, ) -> Result, SyncError> { use {SyncError as Se, TryRecyAnyError as Trae}; - match self.try_recv_any(deadline) { + match self.try_recv_any(logger, deadline) { Ok(tup) => Ok(Some(tup)), Err(Trae::Timeout) => Ok(None), Err(Trae::PollFailed) => Err(Se::PollFailed), - Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)), + Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)), } } pub fn try_recv_any_setup( &mut self, + logger: &mut dyn Logger, deadline: Option, ) -> Result<(usize, Msg), ConnectError> { use {ConnectError as Ce, TryRecyAnyError as Trae}; - self.try_recv_any(deadline).map_err(|err| match err { + self.try_recv_any(logger, deadline).map_err(|err| match err { Trae::Timeout => Ce::Timeout, Trae::PollFailed => Ce::PollFailed, Trae::EndpointError { error, index } => Ce::EndpointSetupError( @@ -83,36 +99,56 @@ impl EndpointManager { ), }) } - fn try_recv_any(&mut self, deadline: Option) -> Result<(usize, Msg), TryRecyAnyError> { - use TryRecyAnyError::*; + fn try_recv_any( + &mut self, + logger: &mut dyn Logger, + deadline: Option, + ) -> Result<(usize, Msg), TryRecyAnyError> { + use TryRecyAnyError as Trea; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { + endptlog!(logger, "RECV undelayed_msg {:?}", &x); return Ok(x); } loop { // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained while let Some(index) = self.polled_undrained.pop() { let endpoint = &mut self.endpoint_exts[index].endpoint; - if let Some(msg) = - endpoint.try_recv().map_err(|error| EndpointError { error, index })? + if let Some(msg) = endpoint + .try_recv(logger) + .map_err(|error| Trea::EndpointError { error, index })? { - if !endpoint.inbox.is_empty() { - // there may be another message waiting! - self.polled_undrained.insert(index); - } + endptlog!(logger, "RECV polled_undrained {:?}", &msg); + // if !endpoint.inbox.is_empty() { + // there may be another message waiting! + self.polled_undrained.insert(index); + // } return Ok((index, msg)); } } // 3. No message yet. Do we have enough time to poll? let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?) } else { None }; - self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?; + self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?; for event in self.events.iter() { let Token(index) = event.token(); self.polled_undrained.insert(index); + endptlog!( + logger, + "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}", + &event, + index, + self.polled_undrained.iter() + ); + if event.is_error() { + return Err(Trea::EndpointError { + error: EndpointError::BrokenEndpoint, + index, + }); + } } self.events.clear(); } diff --git a/src/runtime/error.rs b/src/runtime/error.rs index d6621b4db3a53907076ed481ed8a74f085a88f1d..d2d42f160c290a934c13dad8e6181de94c24f920 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -1,20 +1,32 @@ use crate::common::*; -#[derive(Debug, Clone)] -pub enum EndpointError { - MalformedMessage, - BrokenEndpoint, +#[derive(Debug)] +pub enum ConnectError { + BindFailed(SocketAddr), + PollInitFailed, + Timeout, + PollFailed, + AcceptFailed(SocketAddr), + AlreadyConnected, + PortPeerPolarityMismatch(PortId), + EndpointSetupError(SocketAddr, EndpointError), + SetupAlgMisbehavior, } +//////////////////////// #[derive(Debug, Clone)] pub enum SyncError { - Timeout, NotConnected, InconsistentProtoComponent(ProtoComponentId), IndistinguishableBatches([usize; 2]), - DistributedTimeout, + RoundFailure, PollFailed, BrokenEndpoint(usize), } +#[derive(Debug, Clone)] +pub enum EndpointError { + MalformedMessage, + BrokenEndpoint, +} #[derive(Debug)] pub enum PortOpError { WrongPolarity, @@ -28,19 +40,7 @@ pub enum GottenError { PortDidntGet, PreviousSyncFailed, } - #[derive(Debug, Eq, PartialEq)] pub enum NextBatchError { NotConnected, } -#[derive(Debug)] -pub enum ConnectError { - BindFailed(SocketAddr), - PollInitFailed, - Timeout, - PollFailed, - AcceptFailed(SocketAddr), - AlreadyConnected, - PortPeerPolarityMismatch(PortId), - EndpointSetupError(SocketAddr, EndpointError), -} diff --git a/src/runtime/logging.rs b/src/runtime/logging.rs new file mode 100644 index 0000000000000000000000000000000000000000..8d55670c7b96d21fbb5b9c127d51594dbd7f5a60 --- /dev/null +++ b/src/runtime/logging.rs @@ -0,0 +1,56 @@ +use super::*; + +impl FileLogger { + pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self { + Self(connector_id, file) + } +} +impl VecLogger { + pub fn new(connector_id: ConnectorId) -> Self { + Self(connector_id, Default::default()) + } +} +///////////////// +impl Logger for DummyLogger { + fn line_writer(&mut self) -> &mut dyn std::io::Write { + self + } +} +impl Logger for VecLogger { + fn line_writer(&mut self) -> &mut dyn std::io::Write { + let _ = write!(&mut self.1, "CID({}): ", self.0); + self + } +} +impl Logger for FileLogger { + fn line_writer(&mut self) -> &mut dyn std::io::Write { + let _ = write!(&mut self.1, "CID({}): ", self.0); + &mut self.1 + } +} +/////////////////// +impl Drop for VecLogger { + fn drop(&mut self) { + let stdout = std::io::stderr(); + let mut lock = stdout.lock(); + writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); + let _ = std::io::Write::write(&mut lock, self.1.as_slice()); + } +} +impl std::io::Write for VecLogger { + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) + } + fn write(&mut self, data: &[u8]) -> Result { + self.1.extend_from_slice(data); + Ok(data.len()) + } +} +impl std::io::Write for DummyLogger { + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) + } + fn write(&mut self, bytes: &[u8]) -> Result { + Ok(bytes.len()) + } +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index ae8e0ba48fab1c5c3dd2d80e17e4664a42a5b37b..aba2a21d157df5e53f4bf63d3782347ddac4d5db 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,6 +1,7 @@ mod communication; mod endpoints; pub mod error; +mod logging; mod setup; #[cfg(test)] @@ -9,6 +10,11 @@ mod tests; use crate::common::*; use error::*; +#[derive(Debug)] +pub struct VecSet { + // invariant: ordered, deduplicated + vec: Vec, +} #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum LocalComponentId { Native, @@ -37,8 +43,8 @@ pub enum Msg { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum SetupMsg { MyPortInfo(MyPortInfo), - LeaderEcho { maybe_leader: ControllerId }, - LeaderAnnounce { leader: ControllerId }, + LeaderWave { wave_leader: ConnectorId }, + LeaderAnnounce { tree_leader: ConnectorId }, YouAreMyParent, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -78,11 +84,11 @@ pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::io::Write; } #[derive(Debug)] -pub struct VecLogger(ControllerId, Vec); +pub struct VecLogger(ConnectorId, Vec); #[derive(Debug)] pub struct DummyLogger; #[derive(Debug)] -pub struct FileLogger(ControllerId, std::fs::File); +pub struct FileLogger(ConnectorId, std::fs::File); #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, @@ -96,7 +102,7 @@ pub struct EndpointExt { #[derive(Debug)] pub struct Neighborhood { parent: Option, - children: Vec, // ordered, deduplicated + children: VecSet, } #[derive(Debug)] pub struct MemInMsg { @@ -105,7 +111,7 @@ pub struct MemInMsg { } #[derive(Debug)] pub struct IdManager { - controller_id: ControllerId, + connector_id: ConnectorId, port_suffix_stream: U32Stream, proto_component_suffix_stream: U32Stream, } @@ -177,6 +183,19 @@ pub struct SyncProtoContext<'a> { inbox: &'a HashMap, } //////////////// +impl VecSet { + fn iter(&self) -> impl Iterator { + self.vec.iter() + } + fn contains(&self, element: &T) -> bool { + self.vec.binary_search(element).is_ok() + } + fn new(mut vec: Vec) -> Self { + vec.sort(); + vec.dedup(); + Self { vec } + } +} impl PortInfo { fn firing_var_for(&self, port: PortId) -> FiringVar { FiringVar(match self.polarities.get(&port).unwrap() { @@ -186,19 +205,19 @@ impl PortInfo { } } impl IdManager { - fn new(controller_id: ControllerId) -> Self { + fn new(connector_id: ConnectorId) -> Self { Self { - controller_id, + connector_id, port_suffix_stream: Default::default(), proto_component_suffix_stream: Default::default(), } } fn new_port_id(&mut self) -> PortId { - Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into() + Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() } fn new_proto_component_id(&mut self) -> ProtoComponentId { Id { - controller_id: self.controller_id, + connector_id: self.connector_id, u32_suffix: self.proto_component_suffix_stream.next(), } .into() @@ -272,58 +291,6 @@ impl Connector { Ok(()) } } -impl Logger for DummyLogger { - fn line_writer(&mut self) -> &mut dyn std::io::Write { - impl std::io::Write for DummyLogger { - fn flush(&mut self) -> Result<(), std::io::Error> { - Ok(()) - } - fn write(&mut self, bytes: &[u8]) -> Result { - Ok(bytes.len()) - } - } - self - } -} -impl VecLogger { - pub fn new(controller_id: ControllerId) -> Self { - Self(controller_id, Default::default()) - } -} -impl Drop for VecLogger { - fn drop(&mut self) { - let stdout = std::io::stderr(); - let mut lock = stdout.lock(); - writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); - let _ = std::io::Write::write(&mut lock, self.1.as_slice()); - } -} -impl Logger for VecLogger { - fn line_writer(&mut self) -> &mut dyn std::io::Write { - let _ = write!(&mut self.1, "\nCID({}): ", self.0); - self - } -} -impl FileLogger { - pub fn new(controller_id: ControllerId, file: std::fs::File) -> Self { - Self(controller_id, file) - } -} -impl Logger for FileLogger { - fn line_writer(&mut self) -> &mut dyn std::io::Write { - let _ = write!(&mut self.1, "\nCID({}): ", self.0); - &mut self.1 - } -} -impl std::io::Write for VecLogger { - fn flush(&mut self) -> Result<(), std::io::Error> { - Ok(()) - } - fn write(&mut self, data: &[u8]) -> Result { - self.1.extend_from_slice(data); - Ok(data.len()) - } -} impl Predicate { #[inline] pub fn inserted(mut self, k: FiringVar, v: bool) -> Self { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index ae3f1d44608bc347e7930846945eda154243cccf..e5a6c01f91e6d422233f5ff0e4ba21eb171dce01 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -5,24 +5,25 @@ use std::io::ErrorKind::WouldBlock; impl Connector { pub fn new_simple( proto_description: Arc, - controller_id: ControllerId, + connector_id: ConnectorId, ) -> Self { let logger = Box::new(DummyLogger); // let logger = Box::new(DummyLogger); let surplus_sockets = 2; - Self::new(logger, proto_description, controller_id, surplus_sockets) + Self::new(logger, proto_description, connector_id, surplus_sockets) } pub fn new( - logger: Box, + mut logger: Box, proto_description: Arc, - controller_id: ControllerId, + connector_id: ConnectorId, surplus_sockets: u16, ) -> Self { + log!(&mut *logger, "Created with connector_id {:?}", connector_id); Self { proto_description, proto_components: Default::default(), logger, - id_manager: IdManager::new(controller_id), + id_manager: IdManager::new(connector_id), native_ports: Default::default(), port_info: Default::default(), phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, @@ -77,7 +78,7 @@ impl Connector { ); // leader election and tree construction let neighborhood = init_neighborhood( - self.id_manager.controller_id, + self.id_manager.connector_id, &mut *self.logger, &mut endpoint_manager, deadline, @@ -215,7 +216,7 @@ fn new_endpoint_manager( *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - let maybe_msg = endpoint.try_recv().map_err(|e| { + let maybe_msg = endpoint.try_recv(logger).map_err(|e| { EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) })?; if maybe_msg.is_some() && !endpoint.inbox.is_empty() { @@ -260,9 +261,15 @@ fn new_endpoint_manager( } let endpoint_exts = todos .into_iter() - .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt { + .enumerate() + .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt { endpoint: match todo_endpoint { - TodoEndpoint::Endpoint(endpoint) => endpoint, + TodoEndpoint::Endpoint(mut endpoint) => { + poll.registry() + .reregister(&mut endpoint.stream, Token(index), Interest::READABLE) + .unwrap(); + endpoint + } TodoEndpoint::Listener(..) => unreachable!(), }, getter_for_incoming: local_port, @@ -279,137 +286,192 @@ fn new_endpoint_manager( } fn init_neighborhood( - controller_id: ControllerId, + connector_id: ConnectorId, logger: &mut dyn Logger, em: &mut EndpointManager, deadline: Option, ) -> Result { - use {Msg::SetupMsg as S, SetupMsg::*}; - log!(logger, "beginning neighborhood construction"); - // 1. broadcast my ID as the first echo. await reply from all neighbors - let echo = S(LeaderEcho { maybe_leader: controller_id }); - let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len()); - for index in 0..em.endpoint_exts.len() { - log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo); - em.send_to_setup(index, &echo)?; - awaiting.insert(index); + use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + //////////////////////////////// + #[derive(Debug)] + struct WaveState { + parent: Option, + leader: ConnectorId, } - - // 2. Receive incoming replies. whenever a higher-id echo arrives, - // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; - let mut my_leader = controller_id; - em.undelay_all(); - 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = em.try_recv_any_setup(deadline)?; - log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); - match msg { - S(LeaderAnnounce { leader }) => { - // someone else completed the echo and became leader first! - // the sender is my parent - parent = Some(index); - my_leader = leader; - awaiting.clear(); - break 'echo_loop; + fn do_wave( + em: &mut EndpointManager, + awaiting: &mut HashSet, + ws: &WaveState, + ) -> Result<(), ConnectError> { + awaiting.clear(); + let msg = S(LeaderWave { wave_leader: ws.leader }); + for index in em.index_iter() { + if Some(index) != ws.parent { + em.send_to_setup(index, &msg)?; + awaiting.insert(index); } - S(LeaderEcho { maybe_leader }) => { - use Ordering::*; - match maybe_leader.cmp(&my_leader) { - Less => { /* ignore this wave */ } - Equal => { - awaiting.remove(&index); - if awaiting.is_empty() { - if let Some(p) = parent { - // return the echo to my parent - em.send_to_setup(p, &S(LeaderEcho { maybe_leader }))?; - } else { - // wave completed! - break 'echo_loop; + } + Ok(()) + } + /////////////////////// + /* + Conceptually, we have two distinct disstributed algorithms back-to-back + 1. Leader election using echo algorithm with extinction. + - Each connector initiates a wave tagged with their ID + - Connectors participate in waves of GREATER ID, abandoning previous waves + - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader + 2. Tree construction + - The leader broadcasts their leadership with msg A + - Upon receiving their first announcement, connectors reply B, and send A to all peers + - A controller exits once they have received A or B from each neighbor + + The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1, + so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun. + + NOTE the distinction between PARENT and LEADER + */ + log!(logger, "beginning neighborhood construction"); + if em.num_endpoints() == 0 { + log!(logger, "Edge case of no neighbors! No parent an no children!"); + return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) }); + } + log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_endpoints()); + let mut awaiting = HashSet::with_capacity(em.num_endpoints()); + // 1+ neighbors. Leader can only be learned by receiving messages + // loop ends when I know my sink tree parent (implies leader was elected) + let election_result: WaveState = { + // initially: No parent, I'm the best leader. + let mut best_wave = WaveState { parent: None, leader: connector_id }; + // start a wave for this initial state + do_wave(em, &mut awaiting, &best_wave)?; + // with 1+ neighbors, progress is only made in response to incoming messages + em.undelay_all(); + 'election: loop { + log!(logger, "Election loop. awaiting {:?}...", awaiting.iter()); + let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; + log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + match msg { + S(LeaderAnnounce { tree_leader }) => { + let election_result = + WaveState { leader: tree_leader, parent: Some(recv_index) }; + log!(logger, "Election lost! Result {:?}", &election_result); + assert!(election_result.leader >= best_wave.leader); + assert_ne!(election_result.leader, connector_id); + break 'election election_result; + } + S(LeaderWave { wave_leader }) => { + use Ordering as O; + match wave_leader.cmp(&best_wave.leader) { + O::Less => log!( + logger, + "Ignoring wave with Id {:?}<{:?}", + wave_leader, + best_wave.leader + ), + O::Greater => { + log!( + logger, + "Joining wave with Id {:?}>{:?}", + wave_leader, + best_wave.leader + ); + best_wave = WaveState { leader: wave_leader, parent: Some(recv_index) }; + log!(logger, "New wave state {:?}", &best_wave); + do_wave(em, &mut awaiting, &best_wave)?; + if awaiting.is_empty() { + log!(logger, "Special case! Only neighbor is parent. Replying to {:?} msg {:?}", recv_index, &msg); + em.send_to_setup(recv_index, &msg)?; } } - } - Greater => { - // join new echo - log!(logger, "Setting leader to index {:?}", index); - parent = Some(index); - my_leader = maybe_leader; - let echo = S(LeaderEcho { maybe_leader: my_leader }); - awaiting.clear(); - if em.endpoint_exts.len() == 1 { - // immediately reply to parent - log!(logger, "replying echo to parent {:?} immediately", index); - em.send_to_setup(index, &echo)?; - } else { - for index2 in 0..em.endpoint_exts.len() { - if index2 == index { - // don't propagate echo to my parent - continue; + O::Equal => { + assert!(awaiting.remove(&recv_index)); + log!( + logger, + "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies", + recv_index, + best_wave.leader, + awaiting.len() + ); + if awaiting.is_empty() { + if let Some(parent) = best_wave.parent { + log!( + logger, + "Sub-wave done! replying to parent {:?} msg {:?}", + parent, + &msg + ); + em.send_to_setup(parent, &msg)?; + } else { + let election_result: WaveState = best_wave; + log!(logger, "Election won! Result {:?}", &election_result); + break 'election election_result; } - log!(logger, "repeating echo {:?} to {:?}", &echo, index2); - em.send_to_setup(index2, &echo)?; - awaiting.insert(index2); } } } } - } - inappropriate_msg => { - log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg); - em.delayed_messages.push((index, inappropriate_msg)) + S(YouAreMyParent) | S(MyPortInfo(_)) => unreachable!(), + comm_msg @ Msg::CommMsg { .. } => { + log!(logger, "delaying msg {:?} during election algorithm", comm_msg); + em.delayed_messages.push((recv_index, comm_msg)); + } } } - } - match parent { - None => assert_eq!( - my_leader, controller_id, - "I've got no parent, but I consider {:?} the leader?", - my_leader - ), - Some(parent) => assert_ne!( - my_leader, controller_id, - "I have {:?} as parent, but I consider myself ({:?}) the leader?", - parent, controller_id - ), - } - log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader); + }; - // 3. broadcast leader announcement (except to parent: confirm they are your parent) - // in this loop, every node sends 1 message to each neighbor - // await 1 message from all non-parents. - let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for index in 0..em.endpoint_exts.len() { - let msg = if Some(index) == parent { - &S(YouAreMyParent) + // starting algorithm 2. Send a message to every neighbor + log!(logger, "Starting tree construction. Step 1: send one msg per neighbor"); + awaiting.clear(); + for index in em.index_iter() { + if Some(index) == election_result.parent { + em.send_to_setup(index, &S(YouAreMyParent))?; } else { awaiting.insert(index); - &msg_for_non_parents - }; - log!(logger, "ANNOUNCING to {:?} {:?}", index, msg); - em.send_to_setup(index, msg)?; + em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?; + } } - let mut children = Vec::default(); + let mut children = vec![]; em.undelay_all(); while !awaiting.is_empty() { - log!(logger, "awaiting {:?}", &awaiting); - let (index, msg) = em.try_recv_any_setup(deadline)?; + log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter()); + let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; + log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(YouAreMyParent) => { - assert!(awaiting.remove(&index)); - children.push(index); + S(LeaderWave { .. }) => { /* old message */ } + S(LeaderAnnounce { .. }) => { + // not a child + log!( + logger, + "Got reply from non-child index {:?}. Children: {:?}", + recv_index, + children.iter() + ); + if !awaiting.remove(&recv_index) { + return Err(SetupAlgMisbehavior); + } } - S(LeaderAnnounce { leader }) => { - assert!(awaiting.remove(&index)); - assert!(leader == my_leader); - assert!(Some(index) != parent); - // they wouldn't send me this if they considered me their parent + S(YouAreMyParent) => { + if !awaiting.remove(&recv_index) { + log!( + logger, + "Got reply from child index {:?}. Children before... {:?}", + recv_index, + children.iter() + ); + return Err(SetupAlgMisbehavior); + } + children.push(recv_index); } - inappropriate_msg => { - log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg); - em.delayed_messages.push((index, inappropriate_msg)); + S(MyPortInfo(_)) => unreachable!(), + comm_msg @ Msg::CommMsg { .. } => { + log!(logger, "delaying msg {:?} during election algorithm", comm_msg); + em.delayed_messages.push((recv_index, comm_msg)); } } } - children.sort(); - children.dedup(); - Ok(Neighborhood { parent, children }) + children.shrink_to_fit(); + let neighborhood = + Neighborhood { parent: election_result.parent, children: VecSet::new(children) }; + log!(logger, "Neighborhood constructed {:?}", &neighborhood); + Ok(neighborhood) } diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 18f632450344f84481836738522f9316852bb4c6..acc97855e5ec84124298a318401dc59104188d30 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1,6 +1,7 @@ use crate as reowolf; use crossbeam_utils::thread::scope; use reowolf::{ + error::*, Polarity::{Getter, Putter}, *, }; @@ -270,10 +271,10 @@ fn sync_sync() { c.gotten(g1).unwrap(); } -fn file_logged_connector(controller_id: ControllerId, path: &str) -> Connector { +fn file_logged_connector(connector_id: ConnectorId, path: &str) -> Connector { let file = std::fs::File::create(path).unwrap(); - let file_logger = Box::new(FileLogger::new(controller_id, file)); - Connector::new(file_logger, MINIMAL_PROTO.clone(), controller_id, 8) + let file_logger = Box::new(FileLogger::new(connector_id, file)); + Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8) } #[test] @@ -361,3 +362,105 @@ fn distributed_msg_bounce() { }) .unwrap(); } + +#[test] +fn local_timeout() { + let mut c = file_logged_connector(0, "./logs/local_timeout.txt"); + let [_, g] = c.new_port_pair(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.get(g).unwrap(); + match c.sync(Some(Duration::from_millis(200))) { + Err(SyncError::RoundFailure) => {} + res => panic!("expeted timeout. but got {:?}", res), + } +} + +#[test] +fn parent_timeout() { + let sock_addr = next_test_addr(); + scope(|s| { + s.spawn(|_| { + // parent; times out + let mut c = file_logged_connector(999, "./logs/parent_timeout_a.txt"); + let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout + }); + s.spawn(|_| { + // child + let mut c = file_logged_connector(000, "./logs/parent_timeout_b.txt"); + let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.get(g).unwrap(); // not matched by put + c.sync(None).unwrap_err(); // no timeout + }); + }) + .unwrap(); +} + +#[test] +fn child_timeout() { + let sock_addr = next_test_addr(); + scope(|s| { + s.spawn(|_| { + // child; times out + let mut c = file_logged_connector(000, "./logs/child_timeout_a.txt"); + let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout + }); + s.spawn(|_| { + // parent + let mut c = file_logged_connector(999, "./logs/child_timeout_b.txt"); + let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.get(g).unwrap(); // not matched by put + c.sync(None).unwrap_err(); // no timeout + }); + }) + .unwrap(); +} + +#[test] +fn chain_connect() { + let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, "./logs/chain_connect_a.txt"); + c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: false }) + .unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(2, "./logs/chain_connect_b.txt"); + c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) + .unwrap(); + c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[1], is_active: false }) + .unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, "./logs/chain_connect_c.txt"); + c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) + .unwrap(); + // c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[2], is_active: false }) + // .unwrap(); + c.connect(Some(Duration::from_secs(1))).unwrap(); + }); + // s.spawn(|_| { + // let mut c = file_logged_connector(3, "./logs/chain_connect_d.txt"); + // c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[2], is_active: true }) + // .unwrap(); + // c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[3], is_active: false }) + // .unwrap(); + // c.connect(Some(Duration::from_secs(1))).unwrap(); + // }); + // s.spawn(|_| { + // let mut c = file_logged_connector(4, "./logs/chain_connect_e.txt"); + // c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[3], is_active: true }) + // .unwrap(); + // c.connect(Some(Duration::from_secs(1))).unwrap(); + // }); + }) + .unwrap(); +}