Changeset - c170b58f95b1
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-09-23 12:14:26
christopher.esterhuyse@gmail.com
more safety assertions, some minor cleanup, and major commenting
2 files changed with 221 insertions and 93 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -241,7 +241,7 @@ impl Connector {
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
    ) -> Result<Option<RoundEndedNative>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
@@ -378,11 +378,7 @@ impl Connector {
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // all silent ports have SpecVal::SILENT
 
                for (port, port_info) in cu.current_state.port_info.iter() {
 
                    if port_info.owner != cu.native_component_id {
 
                        // not my port
 
                        continue;
 
                    }
 
                for port in cu.current_state.ports_owned_by(cu.native_component_id) {
 
                    if firing_ports.contains(port) {
 
                        // this one is FIRING
 
                        continue;
 
@@ -433,7 +429,8 @@ impl Connector {
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        // Call to another big method; keep running this round
 
        // until a distributed decision is reached!
 
        log!(cu.logger(), "Searching for decision...");
 
        log!(@BENCH, cu.logger(), "");
 
        let decision = Self::sync_reach_decision(
 
@@ -467,18 +464,19 @@ impl Connector {
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                log!(cu.logger, "Failure with {:#?}", &rctx.solution_storage);
 
                // untouched port/component fields of `cu` are NOT overwritten.
 
                // the result is a rollback.
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    // "flatten" branching components, committing the speculation
 
                    // consistent with the predicate decided upon.
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))),
 
                );
 
                // commit changes to ports and id_manager
 
                cu.current_state = rctx.current_state;
 
@@ -488,7 +486,8 @@ impl Connector {
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.logger(), &predicate)))
 
                let round_ok = branching_native.collapse_with(&mut *cu.logger(), &predicate);
 
                Ok(Some(round_ok))
 
            }
 
        };
 
        log!(cu.logger(), "Sync round ending! Cleaning up");
 
@@ -503,6 +502,7 @@ impl Connector {
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        // The round is in progress, and now its just a matter of arriving at a decision.
 
        log!(@MARK, cu.logger(), "decide start");
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
@@ -529,7 +529,9 @@ impl Connector {
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // run all proto components to their sync blocker
 
        // first, we run every protocol component to their sync blocker.
 
        // Afterwards we establish a loop invariant: no new decision can be reached
 
        // without handling messages in the buffer or arriving from the network
 
        log!(
 
            cu.logger(),
 
            "Running all {} proto components to their sync blocker...",
 
@@ -540,7 +542,7 @@ impl Connector {
 
            // must reborrow to constrain the lifetime of pcb_temps to inside the loop
 
            let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut();
 
            let (blocked, _pcb_temps) = pcb_temps.split_first_mut();
 
            // initially, no components have .ended==true
 
            // initially, no protocol components have .ended==true
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, swap.0, blocked.0);
 
            BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
@@ -561,17 +563,18 @@ impl Connector {
 
            }
 
        }
 
        log!(cu.logger(), "All proto components are blocked");
 
        // ...invariant established!
 

	
 
        log!(cu.logger(), "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            // handle all buffered messages, sending them through endpoints / feeding them to components
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = rctx.current_state.port_info.get(&getter).unwrap();
 
                let cid = getter_info.owner;
 
                assert_eq!(Getter, getter_info.polarity);
 
                let cid = getter_info.owner; // the id of the component owning `getter` port
 
                assert_eq!(Getter, getter_info.polarity); // sanity check
 
                log!(
 
                    cu.logger(),
 
                    "Routing msg {:?} to {:?} via {:?}",
 
@@ -581,18 +584,23 @@ impl Connector {
 
                );
 
                match getter_info.route {
 
                    Route::UdpEndpoint { index } => {
 
                        // this is a message sent over the network through a UDP endpoint
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.logger(), "Delivering to udp endpoint index={}", index);
 
                        // UDP mediator messages are buffered until the end of the round,
 
                        // because they are still speculative
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Route::NetEndpoint { index } => {
 
                        // this is a message sent over the network as a control message
 
                        log!(@MARK, cu.logger(), "sending payload");
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        // actually send the message now
 
                        comm.endpoint_manager.send_to_comms(index, &msg)?;
 
                    }
 
                    Route::LocalComponent if cid == cu.native_component_id() => branching_native
 
@@ -604,8 +612,11 @@ impl Connector {
 
                            MapTempGuard::new(&mut bn_temp_owner),
 
                        ),
 
                    Route::LocalComponent => {
 
                        // some other component_id routed locally. must be a protocol component!
 
                        if let Some(branching_component) = branching_proto_components.get_mut(&cid)
 
                        {
 
                            // The recipient component is still running!
 
                            // Feed it this message AND run it again until all branches are blocked
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
@@ -615,6 +626,8 @@ impl Connector {
 
                                pcb_temps.reborrow(),
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                // A solution is impossible! this component has zero branches
 
                                // Initiate a rollback
 
                                log!(cu.logger(), "{:?} has become inconsistent!", cid);
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
@@ -628,6 +641,10 @@ impl Connector {
 
                                }
 
                            }
 
                        } else {
 
                            // This case occurs when the component owning `getter` has exited,
 
                            // but the putter is still running (and sent this message).
 
                            // we drop the message on the floor, because it cannot be involved
 
                            // in a solution (requires sending a message over a dead channel!).
 
                            log!(
 
                                cu.logger(),
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
@@ -639,7 +656,7 @@ impl Connector {
 
                    }
 
                }
 
            }
 

	
 
            // payload buffer is empty.
 
            // check if we have a solution yet
 
            log!(cu.logger(), "Check if we have any local decisions...");
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
@@ -647,6 +664,8 @@ impl Connector {
 
                log!(@MARK, cu.logger(), "local solution");
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        // Always forward connector-local solutions to my parent
 
                        // AS they are moved from new->old in solution storage.
 
                        log!(cu.logger(), "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
@@ -665,13 +684,20 @@ impl Connector {
 
            }
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            // try recv ONE message arriving through an endpoint
 
            log!(cu.logger(), "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                // This is the first call that may block the thread!
 
                // Until a message arrives over the network, no new solutions are possible.
 
                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) =
 
                    match comm.endpoint_manager.try_recv_any_comms(cu, rctx, comm.round_index)? {
 
                        CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
 
                        CommRecvOk::NewPayloadMsgs => continue 'undecided,
 
                        CommRecvOk::NewPayloadMsgs => {
 
                            // 1+ speculative payloads have been buffered
 
                            // but no other control messages that require further handling
 
                            // restart the loop to process the messages before blocking
 
                            continue 'undecided;
 
                        }
 
                        CommRecvOk::TimeoutWithoutNew => {
 
                            log!(cu.logger(), "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
 
@@ -684,10 +710,13 @@ impl Connector {
 
                                log!(cu.logger(), "As the leader, deciding on timeout");
 
                                return Ok(Decision::Failure);
 
                            }
 
                            // disable future timeout events! our request for failure has been sent
 
                            // all we can do at this point is wait.
 
                            rctx.deadline = None;
 
                            continue 'undecided;
 
                        }
 
                    };
 
                // We received a control message that requires further action
 
                log!(
 
                    cu.logger(),
 
                    "Received from endpoint {} ctrl msg {:?}",
 
@@ -696,7 +725,8 @@ impl Connector {
 
                );
 
                match comm_ctrl_msg {
 
                    CommCtrlMsg::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        // We receive the solution of another connector (part of the decision process)
 
                        // (only accept this through a child endpoint)
 
                        if comm.neighborhood.children.contains(&net_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
@@ -708,6 +738,7 @@ impl Connector {
 
                                    );
 
                                }
 
                                Decision::Failure => {
 
                                    // Someone timed out! propagate this to parent or decide
 
                                    match comm.neighborhood.parent {
 
                                        None => {
 
                                            log!(cu.logger(), "I decide on my child's failure");
 
@@ -725,6 +756,9 @@ impl Connector {
 
                                }
 
                            }
 
                        } else {
 
                            // Unreachable if all connectors are playing by the rules.
 
                            // Silently ignored instead of causing panic to make the
 
                            // runtime more robust against network fuzz
 
                            log!(
 
                                cu.logger(),
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
@@ -734,10 +768,13 @@ impl Connector {
 
                        }
 
                    }
 
                    CommCtrlMsg::Announce { decision } => {
 
                        // Apparently this round is over! A decision has been reached
 
                        if Some(net_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            // We accept the decision because it comes from our parent.
 
                            // end this loop, and and the synchronous round
 
                            return Ok(decision);
 
                        } else {
 
                            // Again, unreachable if all connectors are playing by the rules
 
                            log!(
 
                                cu.logger(),
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
@@ -751,6 +788,8 @@ impl Connector {
 
            log!(cu.logger(), "Endpoint msg recv done");
 
        }
 
    }
 

	
 
    // Send a failure request to my parent in the consensus tree
 
    fn request_failure(
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
@@ -771,6 +810,10 @@ impl NativeBranch {
 
    }
 
}
 
impl BranchingNative {
 
    // Feed the given payload to the native component
 
    // May result in discovering new component solutions,
 
    // or fork speculative branches if the message's predicate
 
    // is MORE SPECIFIC than the branches of the native
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
@@ -784,15 +827,36 @@ impl BranchingNative {
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
        // Visit all native's branches, and feed those whose current predicates are
 
        // consistent with that of the received message.
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = rctx.current_state.spec_var_for(getter);
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch,
 
                // because the resulting branch would have an inconsistent predicate.
 
                // the existing branch asserts the getter port is SILENT
 
                log!(
 
                    cu.logger(),
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            // Define a little helper closure over `rctx`
 
            // for feeding the given branch this new payload,
 
            // and submitting any resulting solutions
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                // This branch notes the getter port as "gotten"
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                if let Some(was) = branch.gotten.insert(getter, send_payload_msg.payload.clone()) {
 
                    // Sanity check. Payload mapping (Predicate,Port) should be unique each round
 
                    assert_eq!(&was, &send_payload_msg.payload);
 
                }
 
                if branch.is_ended() {
 
                    // That was the last message the branch was awaiting!
 
                    // Submitting new component solution.
 
                    log!(
 
                        cu.logger(),
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
@@ -806,6 +870,7 @@ impl BranchingNative {
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    // This branch still has ports awaiting their messages
 
                    log!(
 
                        cu.logger(),
 
                        "Fed native {:?} still has to_get {:?}",
 
@@ -814,20 +879,11 @@ impl BranchingNative {
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger(),
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    // The predicates of this branch and the payload are incompatible
 
                    // retain this branch as-is
 
                    log!(
 
                        cu.logger(),
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
@@ -836,13 +892,17 @@ impl BranchingNative {
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    // The branch's existing predicate "covers" (is at least as specific)
 
                    // as that of the payload. Can feed this branch the message without altering
 
                    // the branch predicate.
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger(), "branch pred covers it! Accept the msg");
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    // The predicates of branch and payload are compatible,
 
                    // but that of the payload is strictly more specific than that of the latter.
 
                    // FORK the branch, feed the fork the message, and give it the payload's predicate.
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
@@ -856,7 +916,9 @@ impl BranchingNative {
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    // The predicates of branch and payload are compatible,
 
                    // but their union is some new predicate (both preds assign something new).
 
                    // FORK the branch, feed the fork the message, and give it the new predicate.
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
@@ -870,6 +932,8 @@ impl BranchingNative {
 
            }
 
        }
 
    }
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, NativeBranch>,
 
        predicate: Predicate,
 
@@ -896,7 +960,14 @@ impl BranchingNative {
 
            }
 
        }
 
    }
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    // return as `RoundEndedNative` the result of a native completing successful round
 
    fn collapse_with(
 
        self,
 
        logger: &mut dyn Logger,
 
        solution_predicate: &Predicate,
 
    ) -> RoundEndedNative {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
@@ -914,13 +985,22 @@ impl BranchingNative {
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
                return RoundEndedNative { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    // Create a singleton-branch branching protocol component as
 
    // speculation begins, with the given protocol state.
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 
    // run all the given branches (cd.input) to their SyncBlocker,
 
    // populating cd.output (by way of CyclicDrainer::cyclic_drain).
 
    // This procedure might lose branches, and it might create new branches.
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut impl CuUndecided,
 
@@ -933,6 +1013,7 @@ impl BranchingProtoComponent {
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            // Run this component's state to the next syncblocker for handling
 
            let blocker = branch.state.sync_run(&mut ctx, cu.proto_description());
 
            log!(
 
                cu.logger(),
 
@@ -944,62 +1025,55 @@ impl BranchingProtoComponent {
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                B::NondetChoice { n } => {
 
                    let var = rctx.spec_var_stream.next();
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.inner.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    // sanity check: `CouldntReadMsg` returned IFF the message is unavailable
 
                    assert!(!branch.inner.inbox.contains_key(&port)); 
 
                    // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle.
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
                    let var = rctx.current_state.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    // sanity check: The given port indeed has `Putter` polarity
 
                    assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity);
 
                    // overwrite assignment
 
                    // assign FIRING to this port's associated firing variable
 
                    let var = rctx.current_state.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        // Discard the branch, as it clearly has contradictory requirements for this value.
 
                        log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.inner.did_put_or_get.insert(putter);
 
                        // Note that this port has put this round,
 
                        // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error)
 
                        assert!(branch.inner.did_put_or_get.insert(putter));
 
                        log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &predicate, &payload, putter, var);
 
                        // Send the given payload (by buffering it).
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.putter_push(cu, putter, msg);
 
                        // Branch can still make progress. Schedule to be rerun
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for (port, port_info) in rctx.current_state.port_info.iter() {
 
                        if port_info.owner != proto_component_id {
 
                            continue;
 
                        }
 
                    // This branch reached the end of it's synchronous block
 
                    // assign all variables of owned ports that DIDN'T fire to SILENT
 
                    for port in rctx.current_state.ports_owned_by(proto_component_id) {
 
                        let var = rctx.current_state.spec_var_for(*port);
 
                        let should_have_fired = branch.inner.did_put_or_get.contains(port);
 
                        let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}",
 
                                port, var, val, did_fire, should_have_fired);
 
                        let speculated_to_fire = val == SpecVal::FIRING;
 
                        if actually_exchanged != speculated_to_fire {
 
                            log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}",
 
                                port, var, val, actually_exchanged, speculated_to_fire);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
@@ -1013,13 +1087,29 @@ impl BranchingProtoComponent {
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    // This branch exits the cyclic drain
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::NondetChoice { n } => {
 
                    // This branch requested the creation of a new n-way nondeterministic
 
                    // fork of the branch with a fresh speculative variable.
 
                    // ... allocate a new speculative variable 
 
                    let var = rctx.spec_var_stream.next();
 
                    // ... and for n distinct values, create a new forked branch,
 
                    // and schedule them to be rerun through the cyclic drain.
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.inner.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
            }
 
            Ok(())
 
        })
 
    }
 
    // Feed this branching protocol component the given message, and
 
    // then run all branches until they are once again blocked. 
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
@@ -1036,12 +1126,11 @@ impl BranchingProtoComponent {
 
            getter,
 
            &send_payload_msg
 
        );
 
        let BranchingProtoComponent { branches } = self;
 
        let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut();
 
        let (mut blocked, pcb_temps) = pcb_temps.split_first_mut();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(cu.logger(), "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
        // partition drain from self.branches -> {unblocked, blocked} (not cyclic)
 
        log!(cu.logger(), "visiting {} blocked branches...", self.branches.len());
 
        for (predicate, mut branch) in self.branches.drain() {
 
            if branch.ended {
 
                log!(cu.logger(), "Skipping ended branch with {:?}", &predicate);
 
                Self::insert_branch_merging(&mut blocked, predicate, branch);
 
@@ -1049,9 +1138,11 @@ impl BranchingProtoComponent {
 
            }
 
            use AssignmentUnionResult as Aur;
 
            log!(cu.logger(), "visiting branch with pred {:?}", &predicate);
 
            // We give each branch a chance to receive this message,
 
            // those that do are maybe UNBLOCKED, and all others remain BLOCKED.
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    // this branch does not receive the message. categorize into blocked.
 
                    log!(cu.logger(), "skipping branch");
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                }
 
@@ -1059,6 +1150,7 @@ impl BranchingProtoComponent {
 
                    // retain the existing predicate, but add this payload
 
                    log!(cu.logger(), "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    // this branch does receive the message. categorize into unblocked.
 
                    Self::insert_branch_merging(&mut unblocked, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
@@ -1067,6 +1159,7 @@ impl BranchingProtoComponent {
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    // the branch that receives the message is unblocked, the original one is blocked
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
@@ -1075,6 +1168,7 @@ impl BranchingProtoComponent {
 
                    log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    // the branch that receives the message is unblocked, the original one is blocked
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
@@ -1082,14 +1176,16 @@ impl BranchingProtoComponent {
 
        }
 
        log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut();
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, branches);
 
        std::mem::swap(blocked.0, &mut self.branches);
 
        log!(cu.logger(), "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
@@ -1114,6 +1210,8 @@ impl BranchingProtoComponent {
 
            }
 
        }
 
    }
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState {
 
        let BranchingProtoComponent { branches } = self;
 
        for (branch_predicate, branch) in branches {
 
@@ -1124,19 +1222,25 @@ impl BranchingProtoComponent {
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 
}
 
impl SolutionStorage {
 
    // Create a new solution storage, to manage the local solutions for
 
    // this connector and all of it's children (subtrees) in the solution tree.
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        // For easy iteration, we store this SubtreeId => {Predicate}
 
        // structure instead as a pair of structures: a vector of predicate sets,
 
        // and a subtree_id-to-index lookup map
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        // new_local U old_local represents the solutions of this connector itself:
 
        // namely, those that can be created from the union of one element from each child's solution set.
 
        // The difference between new and old is that new stores those NOT YET sent over the network
 
        // to this connector's parent in the solution tree.
 
        // invariant: old_local and new_local have an empty intersection
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
@@ -1144,13 +1248,18 @@ impl SolutionStorage {
 
            new_local: Default::default(),
 
        }
 
    }
 
    // drain old_local to new_local, visiting all new additions to old_local
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            // rely on invariant: empty intersection between old and new local sets
 
            assert!(old_local.insert(local.clone()));
 
            local
 
        })
 
    }
 
    // insert a solution for the given subtree ID,
 
    // AND update new_local to include any solutions that become 
 
    // possible as a result of this new addition  
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
@@ -1158,15 +1267,19 @@ impl SolutionStorage {
 
        predicate: Predicate,
 
    ) {
 
        log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let Self { subtree_solutions, new_local, old_local, subtree_id_to_index } = self;
 
        let index = subtree_id_to_index[&subtree_id];
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            // This is a newly-added solution! update new_local
 
            // consider ALL consistent combinations of one element from each solution set
 
            // to our right or left in the solution-set vector
 
            // but with THIS PARTICULAR predicate from our own index.
 
            let left = 0..index;
 
            let right = (index + 1)..subtree_solutions.len();
 
            // iterator over SETS of solutions, one for every component except `subtree_id` (me)
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            // Recursively enumerate all solutions matching the description above,
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
        }
 
    }
 
@@ -1177,8 +1290,9 @@ impl SolutionStorage {
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        // 
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            // incomplete solution. keep recursively creating combined solutions
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
@@ -1191,7 +1305,7 @@ impl SolutionStorage {
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            // recursive stop condition. This is a solution for this connector...
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial);
 
@@ -1200,12 +1314,14 @@ impl SolutionStorage {
 
        }
 
    }
 
}
 

	
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.current_state.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        // Note that this component has received this port's message 1+ times this round
 
        self.branch_inner.did_put_or_get.insert(port);
 
        self.branch_inner.inbox.get(&port)
 
    }
 
@@ -1309,10 +1425,16 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        // This hides the ugliness of facilitating a memory-safe cyclic drain.
 
        // A "drain" would refer to a procedure that empties the input and populates the output.
 
        // It's "cyclic" because the processing function can also populate the input.
 
        // Making this memory safe requires an additional temporary storage, such that
 
        // the input can safely be drained and populated concurrently.
 
        let Self { input, inner: CyclicDrainerInner { swap, output } } = self;
 
        // assert!(swap.is_empty());
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                // func is the user-provided callback function, which consumes an element
 
                // as its drained from the input
 
                func(k, v, CyclicDrainerInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
src/runtime/mod.rs
Show inline comments
 
@@ -93,7 +93,7 @@ struct SpecVal(u16);
 
// native component can freely reflect on how it went, reading the messages received at their
 
// inputs, and reflecting on which of their connector's synchronous batches succeeded.
 
#[derive(Debug)]
 
struct RoundOk {
 
struct RoundEndedNative {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
@@ -265,7 +265,7 @@ struct ConnectorCommunication {
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
    round_result: Result<Option<RoundEndedNative>, SyncError>,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
@@ -317,8 +317,8 @@ struct SolutionStorage {
 
    // invariant: old_local U new_local solutions are those that can be created from
 
    // the UNION of one element from each set in `subtree_solution`.
 
    // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated.
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    old_local: HashSet<Predicate>, // already sent to this connector's parent OR decided
 
    new_local: HashSet<Predicate>, // not yet sent to this connector's parent OR decided
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
@@ -406,6 +406,12 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
    }
 
}
 
impl CurrentState {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.port_info
 
            .iter()
 
            .filter(move |(_, port_info)| port_info.owner == owner)
 
            .map(|(port, _)| port)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        let info = self.port_info.get(&port).unwrap();
 
        SpecVar(match info.polarity {
0 comments (0 inline, 0 general)