diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 87f4d4f5c6f37d018cfbcf3056c3dc17705d9a79..688dae6221ee07a62d7455a4e9c1ee904d357887 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -101,7 +101,7 @@ impl Connector { } } } - pub fn sync(&mut self, timeout: Duration) -> Result { + pub fn sync(&mut self, timeout: Option) -> Result { use SyncError::*; match &mut self.phased { ConnectorPhased::Setup { .. } => Err(NotConnected), @@ -113,7 +113,7 @@ impl Connector { round_result, .. } => { - let deadline = Instant::now() + timeout; + let mut deadline = timeout.map(|to| Instant::now() + to); let logger: &mut dyn Logger = &mut *self.logger; // 1. run all proto components to Nonsync blockers log!( @@ -347,8 +347,35 @@ impl Connector { // try recv messages arriving through endpoints log!(logger, "No decision yet. Let's recv an endpoint msg..."); { - let (endpoint_index, msg) = - endpoint_manager.try_recv_any(deadline).unwrap(); + let (endpoint_index, msg) = loop { + match endpoint_manager.try_recv_any_comms(deadline)? { + None => { + log!( + logger, + "Reached user-defined deadling without decision..." + ); + if let Some(parent) = neighborhood.parent { + log!( + logger, + "Sending failure request to parent index {}", + parent + ); + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Suggest { + suggestion: Decision::Failure, + }, + }); + endpoint_manager.send_to(parent, &msg).unwrap(); + } else { + log!(logger, "As the leader, deciding on timeout"); + break 'undecided Decision::Failure; + } + deadline = None; + } + Some((endpoint_index, msg)) => break (endpoint_index, msg), + } + }; log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); let comm_msg_contents = match msg { Msg::SetupMsg(..) => {