diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index ccf72455f0ae34677f4a8aa8cc8121656fe266c4..8ee6ec0e57d2720082e7561ddf088d414b6e9953 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -46,12 +46,6 @@ struct ProtoComponentBranch { // to miss that they are being setup for `cyclic_drain`. struct CyclicDrainer<'a, K: Eq + Hash, V> { input: &'a mut HashMap, - inner: CyclicDrainerInner<'a, K, V>, -} - -// Inner substructure of the Cyclic drainer to be passed through a callback function. -// See `CyclicDrainer::cyclic_drain`. -struct CyclicDrainerInner<'a, K: Eq + Hash, V> { swap: &'a mut HashMap, output: &'a mut HashMap, } @@ -535,7 +529,7 @@ impl Connector { let (blocked, _pcb_temps) = pcb_temps.split_first_mut(); // initially, no protocol components have .ended==true // drain from branches --> blocked - let cd = CyclicDrainer::new(branches, swap.0, blocked.0); + let cd = CyclicDrainer { input: branches, swap: swap.0, output: blocked.0 }; BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, branches); @@ -798,7 +792,6 @@ impl NativeBranch { } } impl BranchingNative { - // Feed the given payload to the native component // May result in discovering new component solutions, // or fork speculative branches if the message's predicate @@ -983,7 +976,6 @@ impl BranchingNative { } } impl BranchingProtoComponent { - // Create a singleton-branch branching protocol component as // speculation begins, with the given protocol state. fn initial(state: ComponentState) -> Self { @@ -992,7 +984,8 @@ impl BranchingProtoComponent { } // run all the given branches (cd.input) to their SyncBlocker, - // populating cd.output (by way of CyclicDrainer::cyclic_drain). + // populating cd.output by cyclically draining "input" -> "cd."input" / cd.output. + // (to prevent concurrent r/w of one structure, we realize "input" as cd.input for reading and cd.swap for writing) // This procedure might lose branches, and it might create new branches. fn drain_branches_to_blocked( cd: CyclicDrainer, @@ -1000,110 +993,124 @@ impl BranchingProtoComponent { rctx: &mut RoundCtx, proto_component_id: ComponentId, ) -> Result<(), UnrecoverableSyncError> { - cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { - let mut ctx = SyncProtoContext { - rctx, - predicate: &predicate, - branch_inner: &mut branch.inner, - }; - // Run this component's state to the next syncblocker for handling - let blocker = branch.state.sync_run(&mut ctx, cu.proto_description()); - log!( - cu.logger(), - "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", - proto_component_id, - &predicate, - &blocker, - ); - use SyncBlocker as B; - match blocker { - B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency - B::CouldntReadMsg(port) => { - // sanity check: `CouldntReadMsg` returned IFF the message is unavailable - assert!(!branch.inner.inbox.contains_key(&port)); - // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle. - drainer.add_output(predicate, branch); - } - B::CouldntCheckFiring(port) => { - // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned - let var = rctx.ips.port_info.spec_var_for(port); - assert!(predicate.query(var).is_none()); - // speculate on the two possible values of `var`. Schedule both branches to be rerun. - drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); - drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch); - } - B::PutMsg(putter, payload) => { - // sanity check: The given port indeed has `Putter` polarity - assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity); - // assign FIRING to this port's associated firing variable - let var = rctx.ips.port_info.spec_var_for(putter); - let was = predicate.assigned.insert(var, SpecVal::FIRING); - if was == Some(SpecVal::SILENT) { - // Discard the branch, as it clearly has contradictory requirements for this value. - log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", + // let CyclicDrainer { input, swap, output } = cd; + while !cd.input.is_empty() { + 'branch_iter: for (mut predicate, mut branch) in cd.input.drain() { + let mut ctx = SyncProtoContext { + rctx, + predicate: &predicate, + branch_inner: &mut branch.inner, + }; + // Run this component's state to the next syncblocker for handling + let blocker = branch.state.sync_run(&mut ctx, cu.proto_description()); + log!( + cu.logger(), + "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", + proto_component_id, + &predicate, + &blocker, + ); + use SyncBlocker as B; + match blocker { + B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency + B::CouldntReadMsg(port) => { + // sanity check: `CouldntReadMsg` returned IFF the message is unavailable + assert!(!branch.inner.inbox.contains_key(&port)); + // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle. + Self::insert_branch_merging(cd.output, predicate, branch); + } + B::CouldntCheckFiring(port) => { + // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned + let var = rctx.ips.port_info.spec_var_for(port); + assert!(predicate.query(var).is_none()); + // speculate on the two possible values of `var`. Schedule both branches to be rerun. + + Self::insert_branch_merging( + cd.swap, + predicate.clone().inserted(var, SpecVal::SILENT), + branch.clone(), + ); + Self::insert_branch_merging( + cd.swap, + predicate.inserted(var, SpecVal::FIRING), + branch, + ); + } + B::PutMsg(putter, payload) => { + // sanity check: The given port indeed has `Putter` polarity + assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity); + // assign FIRING to this port's associated firing variable + let var = rctx.ips.port_info.spec_var_for(putter); + let was = predicate.assigned.insert(var, SpecVal::FIRING); + if was == Some(SpecVal::SILENT) { + // Discard the branch, as it clearly has contradictory requirements for this value. + log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); - drop((predicate, branch)); - } else { - // Note that this port has put this round, - // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error) - assert!(branch.inner.did_put_or_get.insert(putter)); - log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})", + drop((predicate, branch)); + } else { + // Note that this port has put this round, + // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error) + assert!(branch.inner.did_put_or_get.insert(putter)); + log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &predicate, &payload, putter, var); - // Send the given payload (by buffering it). - let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - rctx.putter_push(cu, putter, msg); - // Branch can still make progress. Schedule to be rerun - drainer.add_input(predicate, branch); + // Send the given payload (by buffering it). + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; + rctx.putter_push(cu, putter, msg); + // Branch can still make progress. Schedule to be rerun + + Self::insert_branch_merging(cd.swap, predicate, branch); + } } - } - B::SyncBlockEnd => { - // This branch reached the end of it's synchronous block - // assign all variables of owned ports that DIDN'T fire to SILENT - for port in rctx.ips.port_info.ports_owned_by(proto_component_id) { - let var = rctx.ips.port_info.spec_var_for(*port); - let actually_exchanged = branch.inner.did_put_or_get.contains(port); - let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); - let speculated_to_fire = val == SpecVal::FIRING; - if actually_exchanged != speculated_to_fire { - log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}", + B::SyncBlockEnd => { + // This branch reached the end of it's synchronous block + // assign all variables of owned ports that DIDN'T fire to SILENT + for port in rctx.ips.port_info.ports_owned_by(proto_component_id) { + let var = rctx.ips.port_info.spec_var_for(*port); + let actually_exchanged = branch.inner.did_put_or_get.contains(port); + let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); + let speculated_to_fire = val == SpecVal::FIRING; + if actually_exchanged != speculated_to_fire { + log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}", port, var, val, actually_exchanged, speculated_to_fire); - // IMPLICIT inconsistency - drop((predicate, branch)); - return Ok(()); + // IMPLICIT inconsistency + drop((predicate, branch)); + continue 'branch_iter; + } } + // submit solution for this component + let subtree_id = SubtreeId::LocalComponent(proto_component_id); + rctx.solution_storage.submit_and_digest_subtree_solution( + cu, + subtree_id, + predicate.clone(), + ); + branch.ended = true; + // This branch exits the cyclic drain + Self::insert_branch_merging(cd.output, predicate, branch); } - // submit solution for this component - let subtree_id = SubtreeId::LocalComponent(proto_component_id); - rctx.solution_storage.submit_and_digest_subtree_solution( - cu, - subtree_id, - predicate.clone(), - ); - branch.ended = true; - // This branch exits the cyclic drain - drainer.add_output(predicate, branch); - } - B::NondetChoice { n } => { - // This branch requested the creation of a new n-way nondeterministic - // fork of the branch with a fresh speculative variable. - // ... allocate a new speculative variable - let var = rctx.spec_var_stream.next(); - // ... and for n distinct values, create a new forked branch, - // and schedule them to be rerun through the cyclic drain. - for val in SpecVal::iter_domain().take(n as usize) { - let pred = predicate.clone().inserted(var, val); - let mut branch_n = branch.clone(); - branch_n.inner.untaken_choice = Some(val.0); - drainer.add_input(pred, branch_n); + B::NondetChoice { n } => { + // This branch requested the creation of a new n-way nondeterministic + // fork of the branch with a fresh speculative variable. + // ... allocate a new speculative variable + let var = rctx.spec_var_stream.next(); + // ... and for n distinct values, create a new forked branch, + // and schedule them to be rerun through the cyclic drain. + for val in SpecVal::iter_domain().take(n as usize) { + let predicate_n = predicate.clone().inserted(var, val); + let mut branch_n = branch.clone(); + branch_n.inner.untaken_choice = Some(val.0); + Self::insert_branch_merging(cd.swap, predicate_n, branch_n); + } } } } - Ok(()) - }) + std::mem::swap(cd.input, cd.swap); + } + Ok(()) } // Feed this branching protocol component the given message, and - // then run all branches until they are once again blocked. + // then run all branches until they are once again blocked. fn feed_msg( &mut self, cu: &mut impl CuUndecided, @@ -1171,7 +1178,7 @@ impl BranchingProtoComponent { log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); // drain from unblocked --> blocked let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map - let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0); + let cd = CyclicDrainer { input: unblocked.0, swap: swap.0, output: blocked.0 }; BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, &mut self.branches); @@ -1201,10 +1208,10 @@ impl BranchingProtoComponent { for (k, v) in branch.inner.inbox.drain() { old.inner.inbox.insert(k, v); } - old.ended |= branch.ended; } } } + // Given the predicate for the round's solution, collapse this // branching native to an ended branch whose predicate is consistent with it. fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState { @@ -1275,8 +1282,8 @@ impl SolutionStorage { }) } // insert a solution for the given subtree ID, - // AND update new_local to include any solutions that become - // possible as a result of this new addition + // AND update new_local to include any solutions that become + // possible as a result of this new addition pub(crate) fn submit_and_digest_subtree_solution( &mut self, cu: &mut impl CuUndecided, @@ -1340,10 +1347,7 @@ impl NonsyncProtoContext<'_> { pub(crate) fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { // Sanity check! The moved ports are owned by this component to begin with for port in moved_ports.iter() { - assert_eq!( - self.proto_component_id, - self.ips.port_info.map.get(port).unwrap().owner - ); + assert_eq!(self.proto_component_id, self.ips.port_info.map.get(port).unwrap().owner); } // Create the new component, and schedule it to be run let new_cid = self.ips.id_manager.new_component_id(); @@ -1435,44 +1439,3 @@ impl SyncProtoContext<'_> { self.branch_inner.untaken_choice.take() } } -impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { - fn new( - input: &'a mut HashMap, - swap: &'a mut HashMap, - output: &'a mut HashMap, - ) -> Self { - Self { input, inner: CyclicDrainerInner { swap, output } } - } - - // This hides the ugliness of facilitating a memory-safe cyclic drain. - // A "drain" would refer to a procedure that empties the input and populates the output. - // It's "cyclic" because the processing function can also populate the input. - // Making this memory safe requires an additional temporary storage, such that - // the input can safely be drained and populated concurrently. - fn cyclic_drain( - self, - mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>, - ) -> Result<(), E> { - let Self { input, inner: CyclicDrainerInner { swap, output } } = self; - while !input.is_empty() { - for (k, v) in input.drain() { - // func is the user-provided callback function, which consumes an element - // as its drained from the input - func(k, v, CyclicDrainerInner { swap, output })? - } - std::mem::swap(input, swap); - } - Ok(()) - } -} -impl<'a, K: Eq + Hash, V> CyclicDrainerInner<'a, K, V> { - // Add this key-value pair to be yielded by the drainer later - fn add_input(&mut self, k: K, v: V) { - self.swap.insert(k, v); - } - - // Add this key-value pair as an output of the drainer - fn add_output(&mut self, k: K, v: V) { - self.output.insert(k, v); - } -}