diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 6df77b1bb4258c18b5b0bde62ac79e5d09c250ff..442a458f12552ea7ff28ca8447569f367c4a66b4 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -4,18 +4,22 @@ use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - if let Decision::Success(predicate) = &decision { - // overwrite MonoN/P - self.inner.mono_n = - self.ephemeral.poly_n.take().unwrap().choose_mono(predicate).unwrap(); - self.inner.mono_ps.clear(); - self.inner.mono_ps.extend( - self.ephemeral - .poly_ps - .drain(..) - .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), - ) - } + let ret = match &decision { + Decision::Success(predicate) => { + // overwrite MonoN/P + self.inner.mono_n = + self.ephemeral.poly_n.take().unwrap().choose_mono(predicate).unwrap(); + self.inner.mono_ps.clear(); + self.inner.mono_ps.extend( + self.ephemeral + .poly_ps + .drain(..) + .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), + ); + Ok(()) + } + Decision::Failure => Err(SyncErr::Timeout), + }; let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { log!( @@ -33,7 +37,7 @@ impl Controller { } self.inner.round_index += 1; self.ephemeral.clear(); - Ok(()) + ret } // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found @@ -129,7 +133,7 @@ impl Controller { // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. pub fn sync_round( &mut self, - deadline: Instant, + mut deadline: Option, sync_batches: Option>, ) -> Result<(), SyncErr> { log!( @@ -264,15 +268,43 @@ impl Controller { log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); self.undelay_all(); 'recv_loop: loop { - log!(&mut self.inner.logger, "`POLLING`..."); - let received = self.recv(deadline)?.ok_or_else(|| { - log!(&mut self.inner.logger, ":( timing out"); - SyncErr::Timeout - })?; + log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); + let received = match deadline { + Some(d) => match self.recv(d)? { + Some(received) => received, + None => { + deadline = None; + match self.inner.family.parent_ekey { + Some(parent_ekey) => { + let announcement = Msg::CommMsg(CommMsg { + round_index: self.inner.round_index, + contents: CommMsgContents::Failure, + }); + log!( + &mut self.inner.logger, + "Forwarding {:?} to parent with ekey {:?}", + &announcement, + parent_ekey + ); + self.inner + .endpoint_exts + .get_mut(parent_ekey) + .expect("ss") + .endpoint + .send(announcement.clone())?; + } + None => return self.end_round_with_decision(Decision::Failure), + } + continue; + } + }, + None => self.recv(Instant::now() + Duration::from_secs(2))?.expect("DRIED UP"), + }; log!(&mut self.inner.logger, "::: message {:?}...", &received); let current_content = match received.msg { - Msg::SetupMsg(_) => { + Msg::SetupMsg(s) => { // This occurs in the event the connector was malformed during connect() + println!("WASNT EXPECTING {:?}", s); return Err(SyncErr::UnexpectedSetupMsg); } Msg::CommMsg(CommMsg { round_index, .. }) @@ -451,25 +483,6 @@ impl Controller { } } } - // 'timeout_loop: loop { - // log!(&mut self.inner.logger, "`POLLING (already timed out)`..."); - // let received = self.recv_blocking()?; - // log!(&mut self.inner.logger, "::: message {:?}...", &received); - // let current_content = match received.msg { - // Msg::SetupMsg(_) => { - // // This occurs in the event the connector was malformed during connect() - // return Err(SyncErr::UnexpectedSetupMsg); - // } - // Msg::CommMsg(CommMsg { round_index, contents }) => { - // if round_index > self.inner.round_index { - // self.delay(received); - // continue 'timeout_loop; - // } else { - // contents - // } - // } - // }; - // } } } impl ControllerEphemeral {