diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index a0014d465216bf21d512db6b77c654d77a4fb413..9f077a6ac772e7f4bbd149f5f24f2c9d2dd5b2b7 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -45,6 +45,16 @@ trait PayloadMsgSender { msg: SendPayloadMsg, ) -> Result<(), SyncError>; } +trait ReplaceBoolTrue { + fn replace_with_true(&mut self) -> bool; +} +impl ReplaceBoolTrue for bool { + fn replace_with_true(&mut self) -> bool { + let was = *self; + *self = true; + !was + } +} //////////////// impl Connector { @@ -137,7 +147,7 @@ impl Connector { timeout: Option, ) -> Result, SyncError> { use SyncError as Se; - let mut deadline = timeout.map(|to| Instant::now() + to); + let deadline = timeout.map(|to| Instant::now() + to); log!( cu.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -214,22 +224,29 @@ impl Connector { comm.native_batches.len() ); let mut branching_native = BranchingNative { branches: Default::default() }; - for (index, NativeBatch { to_get, to_put }) in comm.native_batches.drain(..).enumerate() { + 'native_branches: for (index, NativeBatch { to_get, to_put }) in + comm.native_batches.drain(..).enumerate() + { let predicate = { let mut predicate = Predicate::default(); - // assign trues + // assign trues for ports that fire + let firing_ports: HashSet = + to_get.iter().chain(to_put.keys()).copied().collect(); for &port in to_get.iter().chain(to_put.keys()) { let var = cu.port_info.firing_var_for(port); predicate.assigned.insert(var, true); } - // assign falses - for &port in cu.native_ports.iter() { + // assign falses for silent ports + for &port in cu.native_ports.difference(&firing_ports) { let var = cu.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); + if let Some(true) = predicate.assigned.insert(var, false) { + log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); + continue 'native_branches; + } } predicate }; - log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate); + log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate); // put all messages for (putter, payload) in to_put { @@ -256,6 +273,85 @@ impl Connector { return Err(Se::IndistinguishableBatches([index, existing.index])); } } + let decision = Self::sync_reach_decision( + cu, + comm, + &mut branching_native, + &mut branching_proto_components, + solution_storage, + payloads_to_get, + deadline, + )?; + log!(cu.logger, "Committing to decision {:?}!", &decision); + + // propagate the decision to children + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Announce { decision: decision.clone() }, + }); + log!( + cu.logger, + "Announcing decision {:?} through child endpoints {:?}", + &msg, + &comm.neighborhood.children + ); + for &child in comm.neighborhood.children.iter() { + comm.endpoint_manager.send_to_comms(child, &msg)?; + } + let ret = match decision { + Decision::Failure => { + // dropping {branching_proto_components, branching_native} + Err(Se::RoundFailure) + } + Decision::Success(predicate) => { + // commit changes to component states + cu.proto_components.clear(); + cu.proto_components.extend( + // consume branching proto components + branching_proto_components + .into_iter() + .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), + ); + log!( + cu.logger, + "End round with (updated) component states {:?}", + cu.proto_components.keys() + ); + // consume native + Ok(Some(branching_native.collapse_with(&predicate))) + } + }; + log!(cu.logger, "Sync round ending! Cleaning up"); + // dropping {solution_storage, payloads_to_get} + ret + } + + fn sync_reach_decision( + cu: &mut ConnectorUnphased, + comm: &mut ConnectorCommunication, + branching_native: &mut BranchingNative, + branching_proto_components: &mut HashMap, + mut solution_storage: SolutionStorage, + mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>, + mut deadline: Option, + ) -> Result { + let mut already_requested_failure = false; + if branching_native.branches.is_empty() { + log!(cu.logger, "Native starts with no branches! Failure!"); + match comm.neighborhood.parent { + Some(parent) => { + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? + } else { + log!(cu.logger, "Already requested failure"); + } + } + None => { + log!(cu.logger, "No parent. Deciding on failure"); + return Ok(Decision::Failure); + } + } + } log!(cu.logger, "Done translating native batches into branches"); comm.native_batches.push(Default::default()); @@ -281,12 +377,25 @@ impl Connector { )?; // swap the blocked branches back std::mem::swap(&mut blocked, branches); + if branches.is_empty() { + log!(cu.logger, "{:?} has become inconsistent!", proto_component_id); + if let Some(parent) = comm.neighborhood.parent { + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? + } else { + log!(cu.logger, "Already requested failure"); + } + } else { + log!(cu.logger, "As the leader, deciding on timeout"); + return Ok(Decision::Failure); + } + } } log!(cu.logger, "All proto components are blocked"); log!(cu.logger, "Entering decision loop..."); comm.endpoint_manager.undelay_all(); - let decision = 'undecided: loop { + 'undecided: loop { // drain payloads_to_get, sending them through endpoints / feeding them to components while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); @@ -327,6 +436,23 @@ impl Connector { getter, &send_payload_msg, )?; + if branching_component.branches.is_empty() { + log!( + cu.logger, + "{:?} has become inconsistent!", + proto_component_id + ); + if let Some(parent) = comm.neighborhood.parent { + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? + } else { + log!(cu.logger, "Already requested failure"); + } + } else { + log!(cu.logger, "As the leader, deciding on timeout"); + return Ok(Decision::Failure); + } + } } else { log!( cu.logger, @@ -356,7 +482,7 @@ impl Connector { } None => { log!(cu.logger, "No parent. Deciding on solution {:?}", &solution); - break 'undecided Decision::Success(solution); + return Ok(Decision::Success(solution)); } } } @@ -370,21 +496,14 @@ impl Connector { None => { log!(cu.logger, "Reached user-defined deadling without decision..."); if let Some(parent) = comm.neighborhood.parent { - log!( - cu.logger, - "Sending failure request to parent index {}", - parent - ); - let msg = Msg::CommMsg(CommMsg { - round_index: comm.round_index, - contents: CommMsgContents::Suggest { - suggestion: Decision::Failure, - }, - }); - comm.endpoint_manager.send_to_comms(parent, &msg)?; + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? + } else { + log!(cu.logger, "Already requested failure"); + } } else { log!(cu.logger, "As the leader, deciding on timeout"); - break 'undecided Decision::Failure; + return Ok(Decision::Failure); } deadline = None; } @@ -452,21 +571,16 @@ impl Connector { Decision::Failure => { match comm.neighborhood.parent { None => { - log!( - cu.logger, - "As sink, I decide on my child's failure" - ); - // I am the sink. Decide on failed - break 'undecided Decision::Failure; + log!(cu.logger, "I decide on my child's failure"); + break 'undecided Ok(Decision::Failure); } Some(parent) => { log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent); - // I've got a parent. Forward the failure suggestion. - let msg = Msg::CommMsg(CommMsg { - round_index: comm.round_index, - contents: CommMsgContents::Suggest { suggestion }, - }); - comm.endpoint_manager.send_to_comms(parent, &msg)?; + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? + } else { + log!(cu.logger, "Already requested failure"); + } } } } @@ -483,7 +597,7 @@ impl Connector { CommMsgContents::Announce { decision } => { if Some(endpoint_index) == comm.neighborhood.parent { // adopt this decision - break 'undecided decision; + return Ok(decision); } else { log!( cu.logger, @@ -496,36 +610,20 @@ impl Connector { } } log!(cu.logger, "Endpoint msg recv done"); - }; - log!(cu.logger, "Committing to decision {:?}!", &decision); - - // propagate the decision to children + } + } + fn request_failure( + cu: &mut ConnectorUnphased, + comm: &mut ConnectorCommunication, + parent: usize, + ) -> Result<(), SyncError> { + log!(cu.logger, "Forwarding to my parent {:?}", parent); + let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, - contents: CommMsgContents::Announce { decision: decision.clone() }, + contents: CommMsgContents::Suggest { suggestion }, }); - log!( - cu.logger, - "Announcing decision {:?} through child endpoints {:?}", - &msg, - &comm.neighborhood.children - ); - for &child in comm.neighborhood.children.iter() { - comm.endpoint_manager.send_to_comms(child, &msg)?; - } - match decision { - Decision::Failure => Err(Se::RoundFailure), - Decision::Success(predicate) => { - // commit changes to component states - cu.proto_components.clear(); - cu.proto_components.extend( - branching_proto_components - .into_iter() - .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), - ); - Ok(Some(branching_native.collapse_with(&predicate))) - } - } + comm.endpoint_manager.send_to_comms(parent, &msg) } } impl BranchingNative { @@ -624,7 +722,6 @@ impl BranchingNative { panic!("Native had no branches matching pred {:?}", solution_predicate); } } - // |putter, m| { // let getter = *cu.port_info.peers.get(&putter).unwrap(); // payloads_to_get.push((getter, m));