diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 14c75a604c8017ba623579e7f3f7a2157859ac16..140d03c3b079b308cfb9eb49ba0c38f77ffe7da9 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -241,7 +241,7 @@ impl Connector { cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, timeout: Option, - ) -> Result, SyncError> { + ) -> Result, SyncError> { ////////////////////////////////// use SyncError as Se; ////////////////////////////////// @@ -378,11 +378,7 @@ impl Connector { predicate.assigned.insert(var, SpecVal::FIRING); } // all silent ports have SpecVal::SILENT - for (port, port_info) in cu.current_state.port_info.iter() { - if port_info.owner != cu.native_component_id { - // not my port - continue; - } + for port in cu.current_state.ports_owned_by(cu.native_component_id) { if firing_ports.contains(port) { // this one is FIRING continue; @@ -433,7 +429,8 @@ impl Connector { } // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); - // Call to another big method; keep running this round until a distributed decision is reached + // Call to another big method; keep running this round + // until a distributed decision is reached! log!(cu.logger(), "Searching for decision..."); log!(@BENCH, cu.logger(), ""); let decision = Self::sync_reach_decision( @@ -467,18 +464,19 @@ impl Connector { } let ret = match decision { Decision::Failure => { - // dropping {branching_proto_components, branching_native} - log!(cu.logger, "Failure with {:#?}", &rctx.solution_storage); + // untouched port/component fields of `cu` are NOT overwritten. + // the result is a rollback. Err(Se::RoundFailure) } Decision::Success(predicate) => { // commit changes to component states cu.proto_components.clear(); cu.proto_components.extend( - // consume branching proto components + // "flatten" branching components, committing the speculation + // consistent with the predicate decided upon. 
branching_proto_components .into_iter() - .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), + .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))), ); // commit changes to ports and id_manager cu.current_state = rctx.current_state; @@ -488,7 +486,8 @@ impl Connector { cu.proto_components.keys() ); // consume native - Ok(Some(branching_native.collapse_with(&mut *cu.logger(), &predicate))) + let round_ok = branching_native.collapse_with(&mut *cu.logger(), &predicate); + Ok(Some(round_ok)) } }; log!(cu.logger(), "Sync round ending! Cleaning up"); @@ -503,6 +502,7 @@ impl Connector { branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, ) -> Result { + // The round is in progress, and now its just a matter of arriving at a decision. log!(@MARK, cu.logger(), "decide start"); let mut already_requested_failure = false; if branching_native.branches.is_empty() { @@ -529,7 +529,9 @@ impl Connector { let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); let mut bn_temp_owner = >::default(); - // run all proto components to their sync blocker + // first, we run every protocol component to their sync blocker. 
+ // Afterwards we establish a loop invariant: no new decision can be reached + // without handling messages in the buffer or arriving from the network log!( cu.logger(), "Running all {} proto components to their sync blocker...", @@ -540,7 +542,7 @@ impl Connector { // must reborrow to constrain the lifetime of pcb_temps to inside the loop let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut(); let (blocked, _pcb_temps) = pcb_temps.split_first_mut(); - // initially, no components have .ended==true + // initially, no protocol components have .ended==true // drain from branches --> blocked let cd = CyclicDrainer::new(branches, swap.0, blocked.0); BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; @@ -561,17 +563,18 @@ impl Connector { } } log!(cu.logger(), "All proto components are blocked"); + // ...invariant established! log!(cu.logger(), "Entering decision loop..."); comm.endpoint_manager.undelay_all(); 'undecided: loop { - // drain payloads_to_get, sending them through endpoints / feeding them to components + // handle all buffered messages, sending them through endpoints / feeding them to components log!(cu.logger(), "Decision loop! 
have {} messages to recv", rctx.payload_inbox.len()); while let Some((getter, send_payload_msg)) = rctx.getter_pop() { log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); let getter_info = rctx.current_state.port_info.get(&getter).unwrap(); - let cid = getter_info.owner; - assert_eq!(Getter, getter_info.polarity); + let cid = getter_info.owner; // the id of the component owning `getter` port + assert_eq!(Getter, getter_info.polarity); // sanity check log!( cu.logger(), "Routing msg {:?} to {:?} via {:?}", @@ -581,18 +584,23 @@ impl Connector { ); match getter_info.route { Route::UdpEndpoint { index } => { + // this is a message sent over the network through a UDP endpoint let udp_endpoint_ext = &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index]; let SendPayloadMsg { predicate, payload } = send_payload_msg; log!(cu.logger(), "Delivering to udp endpoint index={}", index); + // UDP mediator messages are buffered until the end of the round, + // because they are still speculative udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Route::NetEndpoint { index } => { + // this is a message sent over the network as a control message log!(@MARK, cu.logger(), "sending payload"); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); + // actually send the message now comm.endpoint_manager.send_to_comms(index, &msg)?; } Route::LocalComponent if cid == cu.native_component_id() => branching_native @@ -604,8 +612,11 @@ impl Connector { MapTempGuard::new(&mut bn_temp_owner), ), Route::LocalComponent => { + // some other component_id routed locally. must be a protocol component! if let Some(branching_component) = branching_proto_components.get_mut(&cid) { + // The recipient component is still running! 
+ // Feed it this message AND run it again until all branches are blocked branching_component.feed_msg( cu, rctx, @@ -615,6 +626,8 @@ impl Connector { pcb_temps.reborrow(), )?; if branching_component.branches.is_empty() { + // A solution is impossible! this component has zero branches + // Initiate a rollback log!(cu.logger(), "{:?} has become inconsistent!", cid); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { @@ -628,6 +641,10 @@ impl Connector { } } } else { + // This case occurs when the component owning `getter` has exited, + // but the putter is still running (and sent this message). + // we drop the message on the floor, because it cannot be involved + // in a solution (requires sending a message over a dead channel!). log!( cu.logger(), "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", @@ -639,7 +656,7 @@ impl Connector { } } } - + // payload buffer is empty. // check if we have a solution yet log!(cu.logger(), "Check if we have any local decisions..."); for solution in rctx.solution_storage.iter_new_local_make_old() { @@ -647,6 +664,8 @@ impl Connector { log!(@MARK, cu.logger(), "local solution"); match comm.neighborhood.parent { Some(parent) => { + // Always forward connector-local solutions to my parent + // AS they are moved from new->old in solution storage. log!(cu.logger(), "Forwarding to my parent {:?}", parent); let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { @@ -665,13 +684,20 @@ impl Connector { } // stuck! make progress by receiving a msg - // try recv messages arriving through endpoints + // try recv ONE message arriving through an endpoint log!(cu.logger(), "No decision yet. Let's recv an endpoint msg..."); { + // This is the first call that may block the thread! + // Until a message arrives over the network, no new solutions are possible. 
let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm.endpoint_manager.try_recv_any_comms(cu, rctx, comm.round_index)? { CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), - CommRecvOk::NewPayloadMsgs => continue 'undecided, + CommRecvOk::NewPayloadMsgs => { + // 1+ speculative payloads have been buffered + // but no other control messages that require further handling + // restart the loop to process the messages before blocking + continue 'undecided; + } CommRecvOk::TimeoutWithoutNew => { log!(cu.logger(), "Reached user-defined deadling without decision..."); if let Some(parent) = comm.neighborhood.parent { @@ -684,19 +710,23 @@ impl Connector { log!(cu.logger(), "As the leader, deciding on timeout"); return Ok(Decision::Failure); } + // disable future timeout events! our request for failure has been sent + // all we can do at this point is wait. rctx.deadline = None; continue 'undecided; } }; + // We received a control message that requires further action log!( cu.logger(), - "Received from endpoint {} ctrl msg {:?}", + "Received from endpoint {} ctrl msg {:?}", net_index, &comm_ctrl_msg ); match comm_ctrl_msg { CommCtrlMsg::Suggest { suggestion } => { - // only accept this control msg through a child endpoint + // We receive the solution of another connector (part of the decision process) + // (only accept this through a child endpoint) if comm.neighborhood.children.contains(&net_index) { match suggestion { Decision::Success(predicate) => { @@ -708,6 +738,7 @@ impl Connector { ); } Decision::Failure => { + // Someone timed out! propagate this to parent or decide match comm.neighborhood.parent { None => { log!(cu.logger(), "I decide on my child's failure"); @@ -725,6 +756,9 @@ impl Connector { } } } else { + // Unreachable if all connectors are playing by the rules. 
+ // Silently ignored instead of causing panic to make the + // runtime more robust against network fuzz log!( cu.logger(), "Discarding suggestion {:?} from non-child endpoint idx {:?}", @@ -734,10 +768,13 @@ impl Connector { } } CommCtrlMsg::Announce { decision } => { + // Apparently this round is over! A decision has been reached if Some(net_index) == comm.neighborhood.parent { - // adopt this decision + // We accept the decision because it comes from our parent. + // end this loop, and end the synchronous round return Ok(decision); } else { + // Again, unreachable if all connectors are playing by the rules log!( cu.logger(), "Discarding announcement {:?} from non-parent endpoint idx {:?}", @@ -751,6 +788,8 @@ impl Connector { log!(cu.logger(), "Endpoint msg recv done"); } } + + // Send a failure request to my parent in the consensus tree fn request_failure( cu: &mut impl CuUndecided, comm: &mut ConnectorCommunication, @@ -771,6 +810,10 @@ impl NativeBranch { } } impl BranchingNative { + // Feed the given payload to the native component + // May result in discovering new component solutions, + // or fork speculative branches if the message's predicate + // is MORE SPECIFIC than the branches of the native fn feed_msg( &mut self, cu: &mut impl CuUndecided, @@ -784,15 +827,36 @@ impl BranchingNative { let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); + // Visit all native's branches, and feed those whose current predicates are + // consistent with that of the received message. for (predicate, mut branch) in draining.drain() { log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate); - // check if this branch expects to receive it let var = rctx.current_state.spec_var_for(getter); + if predicate.query(var) != Some(SpecVal::FIRING) { + // optimization. Don't bother trying this branch, + // because the resulting branch would have an inconsistent predicate.
+ // the existing branch asserts the getter port is SILENT + log!( + cu.logger(), + "skipping branch with {:?} that doesn't want the message (fastpath)", + &predicate + ); + Self::insert_branch_merging(finished, predicate, branch); + continue; + } + // Define a little helper closure over `rctx` + // for feeding the given branch this new payload, + // and submitting any resulting solutions let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { + // This branch notes the getter port as "gotten" branch.to_get.remove(&getter); - let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); - assert!(was.is_none()); + if let Some(was) = branch.gotten.insert(getter, send_payload_msg.payload.clone()) { + // Sanity check. Payload mapping (Predicate,Port) should be unique each round + assert_eq!(&was, &send_payload_msg.payload); + } if branch.is_ended() { + // That was the last message the branch was awaiting! + // Submitting new component solution. log!( cu.logger(), "new native solution with {:?} is_ended() with gotten {:?}", @@ -806,6 +870,7 @@ impl BranchingNative { predicate.clone(), ); } else { + // This branch still has ports awaiting their messages log!( cu.logger(), "Fed native {:?} still has to_get {:?}", @@ -814,20 +879,11 @@ impl BranchingNative { ); } }; - if predicate.query(var) != Some(SpecVal::FIRING) { - // optimization. 
Don't bother trying this branch - log!( - cu.logger(), - "skipping branch with {:?} that doesn't want the message (fastpath)", - &predicate - ); - Self::insert_branch_merging(finished, predicate, branch); - continue; - } use AssignmentUnionResult as Aur; match predicate.assignment_union(&send_payload_msg.predicate) { Aur::Nonexistant => { - // this branch does not receive the message + // The predicates of this branch and the payload are incompatible + // retain this branch as-is log!( cu.logger(), "skipping branch with {:?} that doesn't want the message (slowpath)", @@ -836,13 +892,17 @@ impl BranchingNative { Self::insert_branch_merging(finished, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { - // retain the existing predicate, but add this payload + // The branch's existing predicate "covers" (is at least as specific) + // as that of the payload. Can feed this branch the message without altering + // the branch predicate. feed_branch(&mut branch, &predicate); log!(cu.logger(), "branch pred covers it! Accept the msg"); Self::insert_branch_merging(finished, predicate, branch); } Aur::LatterNotFormer => { - // fork branch, give fork the message and payload predicate. original branch untouched + // The predicates of branch and payload are compatible, + // but that of the payload is strictly more specific than that of the branch. + // FORK the branch, feed the fork the message, and give it the payload's predicate. let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); feed_branch(&mut branch2, &predicate2); @@ -856,7 +916,9 @@ impl BranchingNative { Self::insert_branch_merging(finished, predicate2, branch2); } Aur::New(predicate2) => { - // fork branch, give fork the message and the new predicate. original branch untouched + // The predicates of branch and payload are compatible, + // but their union is some new predicate (both preds assign something new).
+ // FORK the branch, feed the fork the message, and give it the new predicate. let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); log!( @@ -870,6 +932,8 @@ impl BranchingNative { } } } + // Insert a new speculate branch into the given storage, + // MERGING it with an existing branch if their predicate keys clash. fn insert_branch_merging( branches: &mut HashMap, predicate: Predicate, @@ -896,7 +960,14 @@ impl BranchingNative { } } } - fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { + // Given the predicate for the round's solution, collapse this + // branching native to an ended branch whose predicate is consistent with it. + // return as `RoundEndedNative` the result of a native completing successful round + fn collapse_with( + self, + logger: &mut dyn Logger, + solution_predicate: &Predicate, + ) -> RoundEndedNative { log!( logger, "Collapsing native with {} branch preds {:?}", @@ -914,13 +985,22 @@ impl BranchingNative { if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); - return RoundOk { batch_index: index, gotten }; + return RoundEndedNative { batch_index: index, gotten }; } } panic!("Native had no branches matching pred {:?}", solution_predicate); } } impl BranchingProtoComponent { + // Create a singleton-branch branching protocol component as + // speculation begins, with the given protocol state. + fn initial(state: ComponentState) -> Self { + let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; + Self { branches: hashmap! { Predicate::default() => branch } } + } + // run all the given branches (cd.input) to their SyncBlocker, + // populating cd.output (by way of CyclicDrainer::cyclic_drain). + // This procedure might lose branches, and it might create new branches. 
fn drain_branches_to_blocked( cd: CyclicDrainer, cu: &mut impl CuUndecided, @@ -933,6 +1013,7 @@ impl BranchingProtoComponent { predicate: &predicate, branch_inner: &mut branch.inner, }; + // Run this component's state to the next syncblocker for handling let blocker = branch.state.sync_run(&mut ctx, cu.proto_description()); log!( cu.logger(), @@ -944,62 +1025,55 @@ impl BranchingProtoComponent { use SyncBlocker as B; match blocker { B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency - B::NondetChoice { n } => { - let var = rctx.spec_var_stream.next(); - for val in SpecVal::iter_domain().take(n as usize) { - let pred = predicate.clone().inserted(var, val); - let mut branch_n = branch.clone(); - branch_n.inner.untaken_choice = Some(val.0); - drainer.add_input(pred, branch_n); - } - } B::CouldntReadMsg(port) => { - // move to "blocked" - assert!(!branch.inner.inbox.contains_key(&port)); + // sanity check: `CouldntReadMsg` returned IFF the message is unavailable + assert!(!branch.inner.inbox.contains_key(&port)); + // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle. drainer.add_output(predicate, branch); } B::CouldntCheckFiring(port) => { - // sanity check + // sanity check: `CouldntCheckFiring` returned IFF the variable is not yet speculatively assigned let var = rctx.current_state.spec_var_for(port); assert!(predicate.query(var).is_none()); - // keep forks in "unblocked" + // speculate on the two possible values of `var`. Schedule both branches to be rerun.
drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch); } B::PutMsg(putter, payload) => { - // sanity check + // sanity check: The given port indeed has `Putter` polarity assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity); - // overwrite assignment + // assign FIRING to this port's associated firing variable let var = rctx.current_state.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { + // Discard the branch, as it clearly has contradictory requirements for this value. log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); - // discard forever drop((predicate, branch)); } else { - // keep in "unblocked" - branch.inner.did_put_or_get.insert(putter); + // Note that this port has put this round, + // and assert that this isn't its 2nd time putting this round (otherwise PDL programming error) + assert!(branch.inner.did_put_or_get.insert(putter)); log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &predicate, &payload, putter, var); + // Send the given payload (by buffering it). let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.putter_push(cu, putter, msg); + // Branch can still make progress.
Schedule to be rerun drainer.add_input(predicate, branch); } } B::SyncBlockEnd => { - // make concrete all variables - for (port, port_info) in rctx.current_state.port_info.iter() { - if port_info.owner != proto_component_id { - continue; - } + // This branch reached the end of it's synchronous block + // assign all variables of owned ports that DIDN'T fire to SILENT + for port in rctx.current_state.ports_owned_by(proto_component_id) { let var = rctx.current_state.spec_var_for(*port); - let should_have_fired = branch.inner.did_put_or_get.contains(port); + let actually_exchanged = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); - let did_fire = val == SpecVal::FIRING; - if did_fire != should_have_fired { - log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", - port, var, val, did_fire, should_have_fired); + let speculated_to_fire = val == SpecVal::FIRING; + if actually_exchanged != speculated_to_fire { + log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}", + port, var, val, actually_exchanged, speculated_to_fire); // IMPLICIT inconsistency drop((predicate, branch)); return Ok(()); @@ -1013,13 +1087,29 @@ impl BranchingProtoComponent { predicate.clone(), ); branch.ended = true; - // move to "blocked" + // This branch exits the cyclic drain drainer.add_output(predicate, branch); } + B::NondetChoice { n } => { + // This branch requested the creation of a new n-way nondeterministic + // fork of the branch with a fresh speculative variable. + // ... allocate a new speculative variable + let var = rctx.spec_var_stream.next(); + // ... and for n distinct values, create a new forked branch, + // and schedule them to be rerun through the cyclic drain. 
+ for val in SpecVal::iter_domain().take(n as usize) { + let pred = predicate.clone().inserted(var, val); + let mut branch_n = branch.clone(); + branch_n.inner.untaken_choice = Some(val.0); + drainer.add_input(pred, branch_n); + } + } } Ok(()) }) } + // Feed this branching protocol component the given message, and + // then run all branches until they are once again blocked. fn feed_msg( &mut self, cu: &mut impl CuUndecided, @@ -1036,12 +1126,11 @@ impl BranchingProtoComponent { getter, &send_payload_msg ); - let BranchingProtoComponent { branches } = self; let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut(); let (mut blocked, pcb_temps) = pcb_temps.split_first_mut(); - // partition drain from branches -> {unblocked, blocked} - log!(cu.logger(), "visiting {} blocked branches...", branches.len()); - for (predicate, mut branch) in branches.drain() { + // partition drain from self.branches -> {unblocked, blocked} (not cyclic) + log!(cu.logger(), "visiting {} blocked branches...", self.branches.len()); + for (predicate, mut branch) in self.branches.drain() { if branch.ended { log!(cu.logger(), "Skipping ended branch with {:?}", &predicate); Self::insert_branch_merging(&mut blocked, predicate, branch); @@ -1049,9 +1138,11 @@ impl BranchingProtoComponent { } use AssignmentUnionResult as Aur; log!(cu.logger(), "visiting branch with pred {:?}", &predicate); + // We give each branch a chance to receive this message, + // those that do are maybe UNBLOCKED, and all others remain BLOCKED. match predicate.assignment_union(&send_payload_msg.predicate) { Aur::Nonexistant => { - // this branch does not receive the message + // this branch does not receive the message. categorize into blocked. 
log!(cu.logger(), "skipping branch"); Self::insert_branch_merging(&mut blocked, predicate, branch); } @@ -1059,6 +1150,7 @@ impl BranchingProtoComponent { // retain the existing predicate, but add this payload log!(cu.logger(), "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); + // this branch does receive the message. categorize into unblocked. Self::insert_branch_merging(&mut unblocked, predicate, branch); } Aur::LatterNotFormer => { @@ -1067,6 +1159,7 @@ impl BranchingProtoComponent { let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); + // the branch that receives the message is unblocked, the original one is blocked Self::insert_branch_merging(&mut blocked, predicate, branch); Self::insert_branch_merging(&mut unblocked, predicate2, branch2); } @@ -1075,6 +1168,7 @@ impl BranchingProtoComponent { log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); + // the branch that receives the message is unblocked, the original one is blocked Self::insert_branch_merging(&mut blocked, predicate, branch); Self::insert_branch_merging(&mut unblocked, predicate2, branch2); } @@ -1082,14 +1176,16 @@ impl BranchingProtoComponent { } log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); // drain from unblocked --> blocked - let (swap, _pcb_temps) = pcb_temps.split_first_mut(); + let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0); BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back - std::mem::swap(blocked.0, branches); + std::mem::swap(blocked.0, &mut self.branches); log!(cu.logger(), "component settles down with 
branches: {:?}", branches.keys()); Ok(()) } + // Insert a new speculative branch into the given storage, + // MERGING it with an existing branch if their predicate keys clash. fn insert_branch_merging( branches: &mut HashMap, predicate: Predicate, @@ -1114,6 +1210,8 @@ impl BranchingProtoComponent { } } } + // Given the predicate for the round's solution, collapse this + // branching protocol component to an ended branch whose predicate is consistent with it. fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState { let BranchingProtoComponent { branches } = self; for (branch_predicate, branch) in branches { @@ -1124,19 +1222,25 @@ impl BranchingProtoComponent { } panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } - fn initial(state: ComponentState) -> Self { - let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; - Self { branches: hashmap! { Predicate::default() => branch } } - } } impl SolutionStorage { + // Create a new solution storage, to manage the local solutions for + // this connector and all of its children (subtrees) in the solution tree. fn new(subtree_ids: impl Iterator) -> Self { + // For easy iteration, we store this SubtreeId => {Predicate} + // structure instead as a pair of structures: a vector of predicate sets, + // and a subtree_id-to-index lookup map let mut subtree_id_to_index: HashMap = Default::default(); let mut subtree_solutions = vec![]; for id in subtree_ids { subtree_id_to_index.insert(id, subtree_solutions.len()); subtree_solutions.push(Default::default()) } + // new_local U old_local represents the solutions of this connector itself: + // namely, those that can be created from the union of one element from each child's solution set. + // The difference between new and old is that new stores those NOT YET sent over the network + // to this connector's parent in the solution tree.
+ // invariant: old_local and new_local have an empty intersection Self { subtree_solutions, subtree_id_to_index, @@ -1144,13 +1248,18 @@ impl SolutionStorage { new_local: Default::default(), } } + // drain new_local into old_local, visiting each solution as it is added to old_local pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { - old_local.insert(local.clone()); + // rely on invariant: empty intersection between old and new local sets + assert!(old_local.insert(local.clone())); local }) } + // insert a solution for the given subtree ID, + // AND update new_local to include any solutions that become + // possible as a result of this new addition pub(crate) fn submit_and_digest_subtree_solution( &mut self, cu: &mut impl CuUndecided, @@ -1158,15 +1267,19 @@ impl SolutionStorage { predicate: Predicate, ) { log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate); - let index = self.subtree_id_to_index[&subtree_id]; - let left = 0..index; - let right = (index + 1)..self.subtree_solutions.len(); - - let Self { subtree_solutions, new_local, old_local, .. } = self; + let Self { subtree_solutions, new_local, old_local, subtree_id_to_index } = self; + let index = subtree_id_to_index[&subtree_id]; let was_new = subtree_solutions[index].insert(predicate.clone()); if was_new { + // This is a newly-added solution! update new_local + // consider ALL consistent combinations of one element from each solution set + // to our right or left in the solution-set vector + // but with THIS PARTICULAR predicate from our own index.
+ let left = 0..index; + let right = (index + 1)..subtree_solutions.len(); // iterator over SETS of solutions, one for every component except `subtree_id` (me) let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); + // Recursively enumerate all solutions matching the description above, Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local); } } @@ -1177,8 +1290,9 @@ impl SolutionStorage { old_local: &'b HashSet, new_local: &'a mut HashSet, ) { + // if let Some(set) = set_visitor.next() { - // incomplete solution. keep traversing + // incomplete solution. keep recursively creating combined solutions for pred in set.iter() { if let Some(elaborated) = pred.union_with(&partial) { Self::elaborate_into_new_local_rec( @@ -1191,7 +1305,7 @@ impl SolutionStorage { } } } else { - // recursive stop condition. `partial` is a local subtree solution + // recursive stop condition. This is a solution for this connector... if !old_local.contains(&partial) { // ... and it hasn't been found before log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial); @@ -1200,12 +1314,14 @@ impl SolutionStorage { } } } + impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { let var = self.rctx.current_state.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + // Note that this component has received this port's message 1+ times this round self.branch_inner.did_put_or_get.insert(port); self.branch_inner.inbox.get(&port) } @@ -1309,10 +1425,16 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { self, mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>, ) -> Result<(), E> { + // This hides the ugliness of facilitating a memory-safe cyclic drain. + // A "drain" would refer to a procedure that empties the input and populates the output. 
+ // It's "cyclic" because the processing function can also populate the input. + // Making this memory safe requires an additional temporary storage, such that + // the input can safely be drained and populated concurrently. let Self { input, inner: CyclicDrainerInner { swap, output } } = self; - // assert!(swap.is_empty()); while !input.is_empty() { for (k, v) in input.drain() { + // func is the user-provided callback function, which consumes an element + // as its drained from the input func(k, v, CyclicDrainerInner { swap, output })? } std::mem::swap(input, swap); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 37b1dff24e324a9cd07455480885b1a47880bc06..34f8bcd79070695e834f42aa20bc2268c07a6b7f 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -93,7 +93,7 @@ struct SpecVal(u16); // native component can freely reflect on how it went, reading the messages received at their // inputs, and reflecting on which of their connector's synchronous batches succeeded. #[derive(Debug)] -struct RoundOk { +struct RoundEndedNative { batch_index: usize, gotten: HashMap, } @@ -265,7 +265,7 @@ struct ConnectorCommunication { endpoint_manager: EndpointManager, neighborhood: Neighborhood, native_batches: Vec, - round_result: Result, SyncError>, + round_result: Result, SyncError>, } #[derive(Debug)] struct ConnectorUnphased { @@ -317,8 +317,8 @@ struct SolutionStorage { // invariant: old_local U new_local solutions are those that can be created from // the UNION of one element from each set in `subtree_solution`. // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. 
- old_local: HashSet, - new_local: HashSet, + old_local: HashSet, // already sent to this connector's parent OR decided + new_local: HashSet, // not yet sent to this connector's parent OR decided // this pair acts as SubtreeId -> HashSet which is friendlier to iteration subtree_solutions: Vec>, subtree_id_to_index: HashMap, @@ -406,6 +406,12 @@ impl VecSet { } } impl CurrentState { + fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { + self.port_info + .iter() + .filter(move |(_, port_info)| port_info.owner == owner) + .map(|(port, _)| port) + } fn spec_var_for(&self, port: PortId) -> SpecVar { let info = self.port_info.get(&port).unwrap(); SpecVar(match info.polarity {