diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 442a458f12552ea7ff28ca8447569f367c4a66b4..fca0d448433ad0e7069b21e6885e6b124f32708e 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -264,18 +264,30 @@ impl Controller { return Ok(()); } - // 4. Receive incoming messages until the DECISION is made + // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); self.undelay_all(); 'recv_loop: loop { log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); let received = match deadline { + None => { + // we have personally timed out. perform a "long" poll. + self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") + } Some(d) => match self.recv(d)? { + // we have not yet timed out. performed a time-limited poll Some(received) => received, None => { + // timed out! send a FAILURE message to the sink, + // and henceforth don't time out on polling. deadline = None; match self.inner.family.parent_ekey { + None => { + // I am the sink! announce failure and return. + return self.end_round_with_decision(Decision::Failure); + } Some(parent_ekey) => { + // I am not the sink! send a failure message. let announcement = Msg::CommMsg(CommMsg { round_index: self.inner.round_index, contents: CommMsgContents::Failure, @@ -292,13 +304,11 @@ impl Controller { .expect("ss") .endpoint .send(announcement.clone())?; + continue; // poll some more } - None => return self.end_round_with_decision(Decision::Failure), } - continue; } }, - None => self.recv(Instant::now() + Duration::from_secs(2))?.expect("DRIED UP"), }; log!(&mut self.inner.logger, "::: message {:?}...", &received); let current_content = match received.msg { @@ -698,9 +708,3 @@ impl PolyContext for BranchPContext<'_, '_> { val } } - -/* -invariant: Controller.inner has stable MonoN/P states for which it will start the - - -*/ diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 4bf276b09662c2af5fcc5e71155123b3a76ff550..b48ed160dab9ad4fd9969b06da91b93878f030e5 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -95,27 +95,12 @@ struct ChannelIdStream { next_channel_index: ChannelIndex, } -#[derive(Debug)] -enum RoundHistory { - Consistent { - decision: Predicate, - native_component: (MonoN, PolyN), - protocol_components: Box<[(MonoP, PolyP)]>, - }, - Inconsistent { - error: SyncErr, - subtree_solutions: SolutionStorage, - native_component: PolyN, - protocol_components: Box<[PolyP]>, - }, -} - #[derive(Debug)] struct Controller { protocol_description: Arc, inner: ControllerInner, ephemeral: ControllerEphemeral, - round_histories: Vec, + unrecoverable_error: Option, // prevents future calls to Sync } #[derive(Debug)] struct ControllerInner { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 1d0516e743654f6d1e34fad887f10284e989a62f..b7be6c2a3c8f465642694be7141ba3e910a8b6b0 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -131,7 +131,8 @@ impl Controller { protocol_description, inner, ephemeral: Default::default(), - round_histories: vec![], + // round_histories: vec![], + unrecoverable_error: None, }; Ok((controller, native_interface)) }