diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index a9971f53b15c21af0e7fd62fcc3d2fa83f32a875..85ba76f4aa297db6778236b802b8b75234439a98 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,34 +2,33 @@ use crate::common::*; use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { + //usable BETWEEN rounds + fn consistent(&self) -> bool { + self.inner.mono_n.is_some() + } + fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - let mut table_row = HashMap::::default(); - // 1. become_mono for Poly actors - self.inner.mono_n = - self.ephemeral.poly_n.take().map(|poly_n| { - poly_n.become_mono(&mut self.inner.logger, &decision, &mut table_row) - }); - self.inner.mono_ps.extend( - self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)), - ); + // let mut table_row = HashMap::::default(); - // convert (Key=>Payload) map to (ChannelId=>Payload) map. - let table_row: HashMap<_, _> = table_row - .into_iter() - .map(|(ekey, msg)| { - let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - (channel_id, msg) + let n_pair = { + let poly_n = self.ephemeral.poly_n.take().unwrap(); + let mono_n = poly_n.choose_mono(&decision).unwrap(); + (mono_n, poly_n) + }; + self.inner.mono_n = Some(n_pair.0.clone()); + + let p_pairs = self + .ephemeral + .poly_ps + .drain(..) + .map(|poly_p| { + let mono_p = poly_p.choose_mono(&decision).unwrap(); + (mono_p, poly_p) }) - .collect(); - // log all firing ports - for (channel_id, payload) in table_row { - log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload); - } - // log all silent ports - for channel_id in decision.iter_matching(false) { - log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id); - } + .collect::>(); + self.inner.mono_ps.extend(p_pairs.iter().map(|p_pair| p_pair.0.clone())); + self.round_histories.push(RoundHistory::Consistent(decision.clone(), n_pair, p_pairs)); let announcement = CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { @@ -139,15 +138,44 @@ impl Controller { Ok(PolyN { ekeys, branches }) } - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. - // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. pub fn sync_round( &mut self, deadline: Instant, sync_batches: Option>, ) -> Result<(), SyncErr> { - // TODO! fuse handle_locals_return_decision and end_round_return_decision + if !self.consistent() { + // was previously inconsistent + return Err(SyncErr::Inconsistent); + } + match self.sync_round_inner(deadline, sync_batches) { + Ok(()) => Ok(()), + Err(e) => { + log!( + &mut self.inner.logger, + "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...", + ); + let h = RoundHistory::Inconsistent( + std::mem::take(&mut self.ephemeral.solution_storage), + self.ephemeral.poly_n.take().unwrap(), + std::mem::take(&mut self.ephemeral.poly_ps), + ); + self.round_histories.push(h); + for (round_index, round) in self.round_histories.iter().enumerate() { + log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round); + } + Err(e) + } + } + } + + // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. + // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. + fn sync_round_inner( + &mut self, + deadline: Instant, + sync_batches: Option>, + ) -> Result<(), SyncErr> { assert!(self.ephemeral.is_clear()); log!( @@ -287,11 +315,7 @@ impl Controller { 'recv_loop: loop { log!(&mut self.inner.logger, "`POLLING`..."); let received = self.recv(deadline)?.ok_or_else(|| { - log!( - &mut self.inner.logger, - ":( timing out. Solutions storage in state... {:#?}", - &self.ephemeral.solution_storage - ); + log!(&mut self.inner.logger, ":( timing out"); SyncErr::Timeout })?; log!(&mut self.inner.logger, "::: message {:?}...", &received); @@ -481,6 +505,7 @@ impl Into for MonoP { state: self.state, inbox: Default::default(), outbox: Default::default(), + blocking_on: None, } }, ekeys: self.ekeys,