Changeset - 6d801fa106fc
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-06-22 18:28:26
christopher.esterhuyse@gmail.com
cleanup and comments
1 file changed with 22 insertions and 7 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -146,130 +146,145 @@ impl Connector {
 
                            port,
 
                            SendPayloadMsg { payload_predicate: predicate.clone(), payload },
 
                        ));
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    }
 
                }
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 

	
 
                // run all proto components to their sync blocker
 
                for (proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
                    // run this component to sync blocker in-place
 
                    let blocked = &mut proto_component.branches;
 
                    let [unblocked_from, unblocked_to] = [
 
                        &mut HashMap::<Predicate, ProtoComponentBranch>::default(),
 
                        &mut Default::default(),
 
                    ];
 
                    // DRAIN-AND-POPULATE PATTERN: DRAINING unblocked into blocked while POPULATING unblocked
 
                    std::mem::swap(unblocked_from, blocked);
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger: &mut *self.logger,
 
                                predicate: &predicate,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            use SyncBlocker as B;
 
                            match branch.state.sync_run(&mut ctx, &self.proto_description) {
 
                                B::Inconsistent => {
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate);
 
                                    // discard forever
 
                                    // branch is inconsistent. throw it away
 
                                    drop((predicate, branch));
 
                                }
 
                                B::SyncBlockEnd => {
 
                                    // todo falsify
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} reached SyncBlockEnd", proto_component_id, &predicate);
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *self.logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
                                        )),
 
                                        predicate.clone(),
 
                                    );
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                    }
 
                                    // move to "blocked"
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    assert!(predicate.query(port).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
                                        branch.clone(),
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(port, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        payload_outbox.push((
 
                                            port,
 
                                            SendPayloadMsg {
 
                                                payload_predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
                                        unblocked_to.insert(predicate, branch);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                }
 
                todo!()
 
                // now all components are blocked!
 
                //
 

	
 
                let decision = 'undecided: loop {
 
                    // check if we already have a solution
 
                    for _solution in solution_storage.iter_new_local_make_old() {
 
                        // todo check if parent, inform children etc. etc.
 
                        break 'undecided Ok(0);
 
                    }
 

	
 
                    // send / recv messages
 
                };
 
                decision
 
            }
 
        }
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
0 comments (0 inline, 0 general)