Changeset - 7e100501dbde
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-09-18 17:24:22
christopher.esterhuyse@gmail.com
continued refactor (safe state) connector internals are (mostly) read-only until the moment the round succeed, whereafter the changes to ports/connectors are committed by being folded in
3 files changed with 215 insertions and 198 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -8,16 +8,6 @@ struct MapTempsGuard<'a, K, V>(&'a mut [HashMap<K, V>]);
 
// Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true
 
struct MapTempGuard<'a, K, V>(&'a mut HashMap<K, V>);
 

	
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
@@ -28,14 +18,6 @@ struct NativeBranch {
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
@@ -63,6 +45,17 @@ impl ReplaceBoolTrue for bool {
 
        !was
 
    }
 
}
 
impl CuUndecided for ConnectorUnphased {
 
    fn logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.inner.logger
 
    }
 
    fn proto_description(&self) -> &ProtocolDescription {
 
        &self.proto_description
 
    }
 
    fn native_component_id(&self) -> ComponentId {
 
        self.inner.native_component_id
 
    }
 
}
 

	
 
////////////////
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
@@ -96,14 +89,6 @@ impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
        self.0
 
    }
 
}
 
impl RoundCtxTrait for RoundCtx {
 
    fn get_deadline(&self) -> &Option<Instant> {
 
        &self.deadline
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
@@ -213,6 +198,7 @@ impl Connector {
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // iterate
 
        let current_state = cu.inner.current_state.clone();
 
        let mut branching_proto_components =
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
 
@@ -260,6 +246,7 @@ impl Connector {
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            current_state,
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id));
 
                let c = cu.proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid));
 
@@ -277,7 +264,7 @@ impl Connector {
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            payload_inbox: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 
@@ -348,7 +335,7 @@ impl Connector {
 
                );
 
                // sanity check
 
                assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
                rctx.putter_push(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
@@ -359,7 +346,7 @@ impl Connector {
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    cu,
 
                    SubtreeId::LocalComponent(cu.inner.native_component_id),
 
                    predicate.clone(),
 
                );
 
@@ -417,6 +404,8 @@ impl Connector {
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                // commit changes to ports and id_manager
 
                cu.inner.current_state = rctx.current_state;
 
                log!(
 
                    cu.inner.logger,
 
                    "End round with (updated) component states {:?}",
 
@@ -432,26 +421,26 @@ impl Connector {
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        log!(@MARK, cu.inner.logger, "decide start");
 
        log!(@MARK, cu.logger(), "decide start");
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.inner.logger, "Native starts with no branches! Failure!");
 
            log!(cu.logger(), "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.inner.logger, "Already requested failure");
 
                        log!(cu.logger(), "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.inner.logger, "No parent. Deciding on failure");
 
                    log!(cu.logger(), "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
@@ -462,7 +451,7 @@ impl Connector {
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.inner.logger,
 
            cu.logger(),
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
@@ -478,37 +467,33 @@ impl Connector {
 
            // swap the blocked branches back
 
            std::mem::swap(blocked.0, branches);
 
            if branches.is_empty() {
 
                log!(cu.inner.logger, "{:?} has become inconsistent!", proto_component_id);
 
                log!(cu.logger(), "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.inner.logger, "Already requested failure");
 
                        log!(cu.logger(), "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.inner.logger, "As the leader, deciding on timeout");
 
                    log!(cu.logger(), "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.inner.logger, "All proto components are blocked");
 
        log!(cu.logger(), "All proto components are blocked");
 

	
 
        log!(cu.inner.logger, "Entering decision loop...");
 
        log!(cu.logger(), "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(
 
                cu.inner.logger,
 
                "Decision loop! have {} messages to recv",
 
                rctx.getter_buffer.len()
 
            );
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                log!(@MARK, cu.inner.logger, "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = cu.inner.current_state.port_info.get(&getter).unwrap();
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = rctx.current_state.port_info.get(&getter).unwrap();
 
                let cid = getter_info.owner;
 
                assert_eq!(Getter, getter_info.polarity);
 
                log!(
 
                    cu.inner.logger,
 
                    cu.logger(),
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
@@ -519,26 +504,25 @@ impl Connector {
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.inner.logger, "Delivering to udp endpoint index={}", index);
 
                        log!(cu.logger(), "Delivering to udp endpoint index={}", index);
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Route::NetEndpoint { index } => {
 
                        log!(@MARK, cu.inner.logger, "sending payload");
 
                        log!(@MARK, cu.logger(), "sending payload");
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(index, &msg)?;
 
                    }
 
                    Route::LocalComponent if cid == cu.inner.native_component_id => {
 
                        branching_native.feed_msg(
 
                    Route::LocalComponent if cid == cu.native_component_id() => branching_native
 
                        .feed_msg(
 
                            cu,
 
                            rctx,
 
                            getter,
 
                            &send_payload_msg,
 
                            MapTempGuard::new(&mut bn_temp_owner),
 
                        )
 
                    }
 
                        ),
 
                    Route::LocalComponent => {
 
                        if let Some(branching_component) = branching_proto_components.get_mut(&cid)
 
                        {
 
@@ -551,21 +535,21 @@ impl Connector {
 
                                pcb_temps.reborrow(),
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(cu.inner.logger, "{:?} has become inconsistent!", cid);
 
                                log!(cu.logger(), "{:?} has become inconsistent!", cid);
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
                                        Self::request_failure(cu, comm, parent)?
 
                                    } else {
 
                                        log!(cu.inner.logger, "Already requested failure");
 
                                        log!(cu.logger(), "Already requested failure");
 
                                    }
 
                                } else {
 
                                    log!(cu.inner.logger, "As the leader, deciding on timeout");
 
                                    log!(cu.logger(), "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.inner.logger,
 
                                cu.logger(),
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
@@ -577,13 +561,13 @@ impl Connector {
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.inner.logger, "Check if we have any local decisions...");
 
            log!(cu.logger(), "Check if we have any local decisions...");
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
                log!(cu.inner.logger, "New local decision with solution {:?}...", &solution);
 
                log!(@MARK, cu.inner.logger, "local solution");
 
                log!(cu.logger(), "New local decision with solution {:?}...", &solution);
 
                log!(@MARK, cu.logger(), "local solution");
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.inner.logger, "Forwarding to my parent {:?}", parent);
 
                        log!(cu.logger(), "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
@@ -594,7 +578,7 @@ impl Connector {
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.inner.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        log!(cu.logger(), "No parent. Deciding on solution {:?}", &solution);
 
                        return Ok(Decision::Success(solution));
 
                    }
 
                }
 
@@ -602,28 +586,22 @@ impl Connector {
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.inner.logger, "No decision yet. Let's recv an endpoint msg...");
 
            log!(cu.logger(), "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
 
                    .endpoint_manager
 
                    .try_recv_any_comms(
 
                    &mut *cu.inner.logger,
 
                    &cu.inner.current_state,
 
                    rctx,
 
                    comm.round_index,
 
                )? {
 
                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) =
 
                    match comm.endpoint_manager.try_recv_any_comms(cu, rctx, comm.round_index)? {
 
                        CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
 
                        CommRecvOk::NewPayloadMsgs => continue 'undecided,
 
                        CommRecvOk::TimeoutWithoutNew => {
 
                        log!(cu.inner.logger, "Reached user-defined deadling without decision...");
 
                            log!(cu.logger(), "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
 
                                if already_requested_failure.replace_with_true() {
 
                                    Self::request_failure(cu, comm, parent)?
 
                                } else {
 
                                log!(cu.inner.logger, "Already requested failure");
 
                                    log!(cu.logger(), "Already requested failure");
 
                                }
 
                            } else {
 
                            log!(cu.inner.logger, "As the leader, deciding on timeout");
 
                                log!(cu.logger(), "As the leader, deciding on timeout");
 
                                return Ok(Decision::Failure);
 
                            }
 
                            rctx.deadline = None;
 
@@ -631,7 +609,7 @@ impl Connector {
 
                        }
 
                    };
 
                log!(
 
                    cu.inner.logger,
 
                    cu.logger(),
 
                    "Received from endpoint {} ctrl msg  {:?}",
 
                    net_index,
 
                    &comm_ctrl_msg
 
@@ -643,30 +621,24 @@ impl Connector {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(
 
                                        cu.inner.logger,
 
                                        "Child provided solution {:?}",
 
                                        &predicate
 
                                    );
 
                                    log!(cu.logger(), "Child provided solution {:?}", &predicate);
 
                                    let subtree_id = SubtreeId::NetEndpoint { index: net_index };
 
                                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.inner.logger,
 
                                        subtree_id,
 
                                        predicate,
 
                                        cu, subtree_id, predicate,
 
                                    );
 
                                }
 
                                Decision::Failure => {
 
                                    match comm.neighborhood.parent {
 
                                        None => {
 
                                            log!(cu.inner.logger, "I decide on my child's failure");
 
                                            log!(cu.logger(), "I decide on my child's failure");
 
                                            break 'undecided Ok(Decision::Failure);
 
                                        }
 
                                        Some(parent) => {
 
                                            log!(cu.inner.logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            log!(cu.logger(), "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            if already_requested_failure.replace_with_true() {
 
                                                Self::request_failure(cu, comm, parent)?
 
                                            } else {
 
                                                log!(cu.inner.logger, "Already requested failure");
 
                                                log!(cu.logger(), "Already requested failure");
 
                                            }
 
                                        }
 
                                    }
 
@@ -674,7 +646,7 @@ impl Connector {
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.inner.logger,
 
                                cu.logger(),
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                net_index
 
@@ -687,7 +659,7 @@ impl Connector {
 
                            return Ok(decision);
 
                        } else {
 
                            log!(
 
                                cu.inner.logger,
 
                                cu.logger(),
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                net_index
 
@@ -696,15 +668,15 @@ impl Connector {
 
                    }
 
                }
 
            }
 
            log!(cu.inner.logger, "Endpoint msg recv done");
 
            log!(cu.logger(), "Endpoint msg recv done");
 
        }
 
    }
 
    fn request_failure(
 
        cu: &mut ConnectorUnphased,
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
        parent: usize,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(cu.inner.logger, "Forwarding to my parent {:?}", parent);
 
        log!(cu.logger(), "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
@@ -721,41 +693,41 @@ impl NativeBranch {
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        round_ctx: &mut RoundCtx,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert_eq!(Getter, cu.inner.current_state.port_info.get(&getter).unwrap().polarity);
 
        log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert_eq!(Getter, rctx.current_state.port_info.get(&getter).unwrap().polarity);
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.inner.current_state.spec_var_for(getter);
 
            let var = rctx.current_state.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                if branch.is_ended() {
 
                    log!(
 
                        cu.inner.logger,
 
                        cu.logger(),
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let subtree_id = SubtreeId::LocalComponent(cu.inner.native_component_id);
 
                    round_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.inner.logger,
 
                    let subtree_id = SubtreeId::LocalComponent(cu.native_component_id());
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        cu,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    log!(
 
                        cu.inner.logger,
 
                        cu.logger(),
 
                        "Fed native {:?} still has to_get {:?}",
 
                        &predicate,
 
                        &branch.to_get
 
@@ -765,7 +737,7 @@ impl BranchingNative {
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.inner.logger,
 
                    cu.logger(),
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
@@ -777,7 +749,7 @@ impl BranchingNative {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.inner.logger,
 
                        cu.logger(),
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
@@ -786,7 +758,7 @@ impl BranchingNative {
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.inner.logger, "branch pred covers it! Accept the msg");
 
                    log!(cu.logger(), "branch pred covers it! Accept the msg");
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
@@ -795,7 +767,7 @@ impl BranchingNative {
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.inner.logger,
 
                        cu.logger(),
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
@@ -808,7 +780,7 @@ impl BranchingNative {
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.inner.logger,
 
                        cu.logger(),
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
@@ -871,19 +843,19 @@ impl BranchingNative {
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                rctx,
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            let blocker = branch.state.sync_run(&mut ctx, cu.proto_description());
 
            log!(
 
                cu.inner.logger,
 
                cu.logger(),
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
@@ -908,7 +880,7 @@ impl BranchingProtoComponent {
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.inner.current_state.spec_var_for(port);
 
                    let var = rctx.current_state.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
@@ -916,37 +888,37 @@ impl BranchingProtoComponent {
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity);
 
                    assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity);
 
                    // overwrite assignment
 
                    let var = cu.inner.current_state.spec_var_for(putter);
 
                    let var = rctx.current_state.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                        log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.inner.did_put_or_get.insert(putter);
 
                        log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                        log!(cu.logger(), "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        rctx.putter_push(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for (port, port_info) in cu.inner.current_state.port_info.iter() {
 
                    for (port, port_info) in rctx.current_state.port_info.iter() {
 
                        if port_info.owner != proto_component_id {
 
                            continue;
 
                        }
 
                        let var = cu.inner.current_state.spec_var_for(*port);
 
                        let var = rctx.current_state.spec_var_for(*port);
 
                        let should_have_fired = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}",
 
                            log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}",
 
                                port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
@@ -956,7 +928,7 @@ impl BranchingProtoComponent {
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(proto_component_id);
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.inner.logger,
 
                        cu,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
@@ -970,16 +942,15 @@ impl BranchingProtoComponent {
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        let logger = &mut *cu.inner.logger;
 
        log!(
 
            logger,
 
            cu.logger(),
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
@@ -989,30 +960,30 @@ impl BranchingProtoComponent {
 
        let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut();
 
        let (mut blocked, pcb_temps) = pcb_temps.split_first_mut();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        log!(cu.logger(), "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                log!(cu.logger(), "Skipping ended branch with {:?}", &predicate);
 
                Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            log!(cu.logger(), "visiting branch with pred {:?}", &predicate);
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    log!(cu.logger(), "skipping branch");
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    log!(cu.logger(), "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    Self::insert_branch_merging(&mut unblocked, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    log!(cu.logger(), "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
@@ -1021,7 +992,7 @@ impl BranchingProtoComponent {
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
@@ -1029,14 +1000,14 @@ impl BranchingProtoComponent {
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut();
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, branches);
 
        log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys());
 
        log!(cu.logger(), "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn insert_branch_merging(
 
@@ -1123,11 +1094,11 @@ impl SolutionStorage {
 
    }
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        cu: &mut impl CuUndecided,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 
@@ -1136,17 +1107,11 @@ impl SolutionStorage {
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
        }
 
    }
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        cu: &mut impl CuUndecided,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
@@ -1157,7 +1122,7 @@ impl SolutionStorage {
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        cu,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
@@ -1169,35 +1134,15 @@ impl SolutionStorage {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl GetterBuffer {
 
    fn len(&self) -> usize {
 
        self.getters_and_sends.len()
 
    }
 
    fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.getters_and_sends.pop()
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getters_and_sends.push((getter, msg));
 
    }
 
    fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = cu.inner.current_state.port_info.get(&putter).unwrap().peer {
 
            log!(cu.inner.logger, "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_add(getter, msg);
 
        } else {
 
            log!(cu.inner.logger, "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.cu_inner.current_state.spec_var_for(port);
 
        let var = self.rctx.current_state.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
@@ -1305,3 +1250,46 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
        Ok(())
 
    }
 
}
 

	
 
// struct ConnectorComm {
 
//     logger: Box<dyn Logger>,
 
//     pd: Arc<ProtocolDescription>,
 
//     current_state: CurrentState,
 
//     components: HashMap<ComponentId, ComponentState>,
 
//     ports: HashMap<PortId, PortInfo>,
 
//     endpoint_manager: EndpointManager,
 
//     neighborhood: Neighborhood,
 
//     native_batches: Vec<NativeBatch>,
 
//     round_result: Result<Option<RoundOk>, SyncError>,
 
// }
 

	
 
// struct RoundTemp<'a> {
 
//     deadline: Option<Instant>,
 
//     msg_buf: Vec<(PortId, SendPayloadMsg)>,
 
//     solution_storage: SolutionStorage,
 
//     override_ports: HashMap<PortId, PortInfo>,
 
//     spec_var_stream: SpecVarStream,
 
//     branching_proto: HashMap<ComponentId, BranchingProtoComponent>,
 
//     branching_native: BranchingNative,
 
//     comm: &'a mut ConnectorComm,
 
// }
 
// impl ConnectorComm {
 
//     fn sync(&mut self, deadline: Option<Duration>) -> Result<usize, ()> {
 
//         RoundTemp {
 
//             msg_buf: Default::default(),
 
//             deadline: todo!(),
 
//             solution_storage: todo!(),
 
//             override_ports: Default::default(),
 
//             spec_var_stream: todo!(),
 
//             branching_proto: Default::default(),
 
//             branching_native: todo!(),
 
//             comm: self,
 
//         }
 
//         .sync()
 
//     }
 
// }
 
// impl RoundTemp<'_> {
 
//     fn sync(&mut self) -> Result<usize, ()> {
 
//         todo!()
 
//     }
 
// }
src/runtime/endpoints.rs
Show inline comments
 
@@ -146,22 +146,21 @@ impl EndpointManager {
 
    // drops all Setup messages,
 
    // buffers all future round messages,
 
    // drops all previous round messages,
 
    // enqueues all current round SendPayload messages using round_ctx.getter_add
 
    // enqueues all current round SendPayload messages using rctx.getter_push
 
    // returns the first comm_ctrl_msg encountered
 
    // only polls until SOME message is enqueued
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        current_state: &CurrentState,
 
        round_ctx: &mut impl RoundCtxTrait,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        round_index: usize,
 
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
 
        ///////////////////////////////////////////
 
        impl EndpointManager {
 
            fn handle_msg(
 
                &mut self,
 
                logger: &mut dyn Logger,
 
                round_ctx: &mut impl RoundCtxTrait,
 
                cu: &mut impl CuUndecided,
 
                rctx: &mut RoundCtx,
 
                net_index: usize,
 
                msg: Msg,
 
                round_index: usize,
 
@@ -173,7 +172,7 @@ impl EndpointManager {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Less => {
 
                            log!(
 
                                logger,
 
                                cu.logger(),
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                round_index,
 
@@ -182,7 +181,7 @@ impl EndpointManager {
 
                        }
 
                        Ordering::Greater => {
 
                            log!(
 
                                logger,
 
                                cu.logger(),
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                round_index,
 
@@ -197,7 +196,7 @@ impl EndpointManager {
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        round_ctx.getter_add(getter, send_payload_msg);
 
                        rctx.getter_push(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
                    }
 
@@ -209,23 +208,18 @@ impl EndpointManager {
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) = self.handle_msg(
 
                logger,
 
                round_ctx,
 
                net_index,
 
                msg,
 
                round_index,
 
                &mut some_message_enqueued,
 
            ) {
 
            if let Some((net_index, msg)) =
 
                self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued)
 
            {
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? {
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    logger,
 
                    round_ctx,
 
                    cu,
 
                    rctx,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
@@ -243,9 +237,9 @@ impl EndpointManager {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = current_state.spec_var_for(ee.getter_for_incoming);
 
                        let port_spec_var = rctx.current_state.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        round_ctx.getter_add(
 
                        rctx.getter_push(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
@@ -260,7 +254,7 @@ impl EndpointManager {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(logger, round_ctx.get_deadline()) {
 
            match self.poll_and_populate(cu.logger(), &rctx.deadline) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
src/runtime/mod.rs
Show inline comments
 
@@ -29,7 +29,7 @@ pub struct VecLogger(ConnectorId, Vec<u8>);
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
struct CurrentState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    id_manager: IdManager,
 
@@ -40,7 +40,8 @@ pub(crate) struct NonsyncProtoContext<'a> {
 
    proto_component_id: ComponentId,          // KEY in id->component map
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    rctx: &'a RoundCtx,
 
    // cu: &'a mut dyn CuUndecided,
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                        // KEY in pred->branch map
 
}
 
@@ -175,7 +176,7 @@ struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
@@ -249,6 +250,26 @@ enum ConnectorPhased {
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    current_state: CurrentState,
 
}
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
@@ -261,10 +282,6 @@ enum TokenTarget {
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 
trait RoundCtxTrait {
 
    fn get_deadline(&self) -> &Option<Instant>;
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg);
 
}
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
@@ -653,3 +670,21 @@ impl Debug for UdpInBuffer {
 
        write!(f, "UdpInBuffer")
 
    }
 
}
 

	
 
impl RoundCtx {
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.current_state.port_info.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
0 comments (0 inline, 0 general)