diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 6991a81de7c6923037d2b8f02bba06c84ee58cca..6df77b1bb4258c18b5b0bde62ac79e5d09c250ff 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,40 +2,21 @@ use crate::common::*; use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { - //usable BETWEEN rounds - fn consistent(&self) -> bool { - self.inner.mono_n.is_some() - } - - fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { + fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - // let mut table_row = HashMap::::default(); - - let n_pair = { - let poly_n = self.ephemeral.poly_n.take().unwrap(); - let mono_n = poly_n.choose_mono(&decision).unwrap(); - (mono_n, poly_n) - }; - self.inner.mono_n = Some(n_pair.0.clone()); - - let p_pairs = self - .ephemeral - .poly_ps - .drain(..) - .map(|poly_p| { - let mono_p = poly_p.choose_mono(&decision).unwrap(); - (mono_p, poly_p) - }) - .collect::>() - .into_boxed_slice(); - self.inner.mono_ps.extend(p_pairs.iter().map(|p_pair| p_pair.0.clone())); - self.round_histories.push(RoundHistory::Consistent { - decision: decision.clone(), - native_component: n_pair, - protocol_components: p_pairs, - }); - let announcement = - CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); + if let Decision::Success(predicate) = &decision { + // overwrite MonoN/P + self.inner.mono_n = + self.ephemeral.poly_n.take().unwrap().choose_mono(predicate).unwrap(); + self.inner.mono_ps.clear(); + self.inner.mono_ps.extend( + self.ephemeral + .poly_ps + .drain(..) + .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), + ) + } + let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { log!( &mut self.inner.logger, @@ -71,8 +52,9 @@ impl Controller { } else { // I have no parent -> I'm the leader assert!(self.inner.family.parent_ekey.is_none()); - let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next(); - Ok(if let Some(decision) = maybe_decision { + let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); + Ok(if let Some(predicate) = maybe_predicate { + let decision = Decision::Success(predicate); log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); self.end_round_with_decision(decision)?; true @@ -86,7 +68,7 @@ impl Controller { &mut self, sync_batches: impl Iterator, ) -> Result { - let MonoN { ekeys, .. } = self.inner.mono_n.take().unwrap(); + let MonoN { ekeys, .. } = self.inner.mono_n.clone(); let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; let mut branches = HashMap::<_, _>::default(); for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { @@ -143,70 +125,30 @@ impl Controller { Ok(PolyN { ekeys, branches }) } - pub fn sync_round( - &mut self, - deadline: Instant, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - if self.consistent() { - match self.sync_round_inner(deadline, sync_batches) { - Ok(()) => return Ok(()), - Err(error) => { - log!( - &mut self.inner.logger, - "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...", - ); - let h = RoundHistory::Inconsistent { - error, - subtree_solutions: std::mem::take(&mut self.ephemeral.solution_storage), - native_component: self.ephemeral.poly_n.take().unwrap(), - protocol_components: std::mem::take(&mut self.ephemeral.poly_ps) - .into_boxed_slice(), - }; - self.round_histories.push(h); - for (round_index, round) in self.round_histories.iter().enumerate() { - log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round); - } - } - } - } - if let Some(RoundHistory::Inconsistent { error, .. }) = - self.round_histories.iter().rev().next() - { - Err(error.clone()) - } else { - unreachable!() - } - } - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. - fn sync_round_inner( + pub fn sync_round( &mut self, deadline: Instant, sync_batches: Option>, ) -> Result<(), SyncErr> { - assert!(self.ephemeral.is_clear()); - log!( &mut self.inner.logger, "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", self.inner.round_index ); + assert!(self.ephemeral.is_clear()); // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). // Some actors are dropped. some new actors are created. // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len()); - self.ephemeral.poly_ps.clear(); - // let mut poly_ps: Vec = vec![]; - while let Some(mut mono_p) = self.inner.mono_ps.pop() { + self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); + log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); + while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { let mut m_ctx = MonoPContext { ekeys: &mut mono_p.ekeys, + mono_ps: &mut self.ephemeral.mono_ps, inner: &mut self.inner, - // endpoint_exts: &mut self.endpoint_exts, - // mono_ps: &mut self.mono_ps, - // channel_id_stream: &mut self.channel_id_stream, }; // cross boundary into crate::protocol let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); @@ -229,7 +171,7 @@ impl Controller { // TODO: store and update this mapping rather than rebuilding it each round. let ekey_to_holder: HashMap = { use PolyId::*; - let n = self.inner.mono_n.iter().flat_map(|m| m.ekeys.iter().map(move |&e| (e, N))); + let n = self.inner.mono_n.ekeys.iter().map(move |&e| (e, N)); let p = self .ephemeral .poly_ps @@ -247,7 +189,7 @@ impl Controller { // 4. Create the solution storage. it tracks the solutions of "subtrees" // of the controller in the overlay tree. self.ephemeral.solution_storage.reset({ - let n = self.inner.mono_n.iter().map(|_| SubtreeId::PolyN); + let n = std::iter::once(SubtreeId::PolyN); let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); let c = self .inner @@ -267,7 +209,6 @@ impl Controller { // 5. kick off the synchronous round of the native actor if it exists log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); - assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { // using if let because of nested ? operator // TODO check that there are 1+ branches or NO SOLUTION @@ -361,6 +302,27 @@ impl Controller { } }; match current_content { + CommMsgContents::Failure => match self.inner.family.parent_ekey { + Some(parent_ekey) => { + let announcement = Msg::CommMsg(CommMsg { + round_index: self.inner.round_index, + contents: CommMsgContents::Failure, + }); + log!( + &mut self.inner.logger, + "Forwarding {:?} to parent with ekey {:?}", + &announcement, + parent_ekey + ); + self.inner + .endpoint_exts + .get_mut(parent_ekey) + .expect("ss") + .endpoint + .send(announcement.clone())?; + } + None => return self.end_round_with_decision(Decision::Failure), + }, CommMsgContents::Elaborate { partial_oracle } => { // Child controller submitted a subtree solution. if !self.inner.family.children_ekeys.contains(&received.recipient) { @@ -382,7 +344,7 @@ impl Controller { return Ok(()); } } - CommMsgContents::Announce { oracle } => { + CommMsgContents::Announce { decision } => { if self.inner.family.parent_ekey != Some(received.recipient) { return Err(SyncErr::AnnounceFromNonParent); } @@ -390,9 +352,9 @@ impl Controller { &mut self.inner.logger, "Received ANNOUNCEMENT from from parent {:?}: {:?}", received.recipient, - &oracle + &decision ); - return self.end_round_with_decision(oracle); + return self.end_round_with_decision(decision); } CommMsgContents::SendPayload { payload_predicate, payload } => { assert_eq!( @@ -489,6 +451,25 @@ impl Controller { } } } + // 'timeout_loop: loop { + // log!(&mut self.inner.logger, "`POLLING (already timed out)`..."); + // let received = self.recv_blocking()?; + // log!(&mut self.inner.logger, "::: message {:?}...", &received); + // let current_content = match received.msg { + // Msg::SetupMsg(_) => { + // // This occurs in the event the connector was malformed during connect() + // return Err(SyncErr::UnexpectedSetupMsg); + // } + // Msg::CommMsg(CommMsg { round_index, contents }) => { + // if round_index > self.inner.round_index { + // self.delay(received); + // continue 'timeout_loop; + // } else { + // contents + // } + // } + // }; + // } } } impl ControllerEphemeral { @@ -496,6 +477,7 @@ impl ControllerEphemeral { self.solution_storage.is_clear() && self.poly_n.is_none() && self.poly_ps.is_empty() + && self.mono_ps.is_empty() && self.ekey_to_holder.is_empty() } fn clear(&mut self) { @@ -540,7 +522,7 @@ impl MonoContext for MonoPContext<'_> { ); if moved_ekeys.is_subset(self.ekeys) { self.ekeys.retain(|x| !moved_ekeys.contains(x)); - self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys }); + self.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys }); } else { panic!("MachineP attempting to move alien ekey!"); } @@ -703,3 +685,9 @@ impl PolyContext for BranchPContext<'_, '_> { val } } + +/* +invariant: Controller.inner has stable MonoN/P states for which it will start the + + +*/