Changeset - 6d801fa106fc
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-06-22 18:28:26
christopher.esterhuyse@gmail.com
cleanup and comments
1 file changed with 22 insertions and 7 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -74,274 +74,289 @@ impl NonsyncProtoContext<'_> {
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        self.predicate.query(port)
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
}
 

	
 
impl Connector {
 
    pub fn sync(&mut self) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, endpoint_manager, .. } => {
 
                // 1. run all proto components to Nonsync blockers
 
                let mut branching_proto_components =
 
                    HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
                let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
                    self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
                while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
                    // TODO coalesce fields
 
                    let mut ctx = NonsyncProtoContext {
 
                        logger: &mut *self.logger,
 
                        port_info: &mut self.port_info,
 
                        id_manager: &mut self.id_manager,
 
                        proto_component_id,
 
                        unrun_components: &mut unrun_components,
 
                        proto_component_ports: &mut self
 
                            .proto_components
 
                            .get_mut(&proto_component_id)
 
                            .unwrap()
 
                            .ports,
 
                    };
 
                    use NonsyncBlocker as B;
 
                    match component.state.nonsync_run(&mut ctx, &self.proto_description) {
 
                        B::ComponentExit => drop(component),
 
                        B::Inconsistent => {
 
                            return Err(InconsistentProtoComponent(proto_component_id))
 
                        }
 
                        B::SyncBlockStart => {
 
                            branching_proto_components.insert(
 
                                proto_component_id,
 
                                BranchingProtoComponent::initial(component),
 
                            );
 
                        }
 
                    }
 
                }
 

	
 
                // (Putter, )
 
                let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // 2. kick off the native
 
                let mut branching_native = BranchingNative { branches: Default::default() };
 
                for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate()
 
                {
 
                    let mut predicate = Predicate::default();
 
                    // assign trues
 
                    for &port in to_get.iter().chain(to_put.keys()) {
 
                        predicate.assigned.insert(port, true);
 
                    }
 
                    // assign falses
 
                    for &port in self.native_ports.iter() {
 
                        predicate.assigned.entry(port).or_insert(false);
 
                    }
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        payload_outbox.push((
 
                            port,
 
                            SendPayloadMsg { payload_predicate: predicate.clone(), payload },
 
                        ));
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    }
 
                }
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 

	
 
                // run all proto components to their sync blocker
 
                for (proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
                    // run this component to sync blocker in-place
 
                    let blocked = &mut proto_component.branches;
 
                    let [unblocked_from, unblocked_to] = [
 
                        &mut HashMap::<Predicate, ProtoComponentBranch>::default(),
 
                        &mut Default::default(),
 
                    ];
 
                    // DRAIN-AND-POPULATE PATTERN: DRAINING unblocked into blocked while POPULATING unblocked
 
                    std::mem::swap(unblocked_from, blocked);
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger: &mut *self.logger,
 
                                predicate: &predicate,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            use SyncBlocker as B;
 
                            match branch.state.sync_run(&mut ctx, &self.proto_description) {
 
                                B::Inconsistent => {
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate);
 
                                    // discard forever
 
                                    // branch is inconsistent. throw it away
 
                                    drop((predicate, branch));
 
                                }
 
                                B::SyncBlockEnd => {
 
                                    // todo falsify
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    log!(self.logger, "Proto component {:?} branch with pred {:?} reached SyncBlockEnd", proto_component_id, &predicate);
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *self.logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
                                        )),
 
                                        predicate.clone(),
 
                                    );
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                    }
 
                                    // move to "blocked"
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    assert!(predicate.query(port).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
                                        branch.clone(),
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(port, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        payload_outbox.push((
 
                                            port,
 
                                            SendPayloadMsg {
 
                                                payload_predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
                                        unblocked_to.insert(predicate, branch);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                }
 
                todo!()
 
                // now all components are blocked!
 
                //
 

	
 
                let decision = 'undecided: loop {
 
                    // check if we already have a solution
 
                    for _solution in solution_storage.iter_new_local_make_old() {
 
                        // todo check if parent, inform children etc. etc.
 
                        break 'undecided Ok(0);
 
                    }
 

	
 
                    // send / recv messages
 
                };
 
                decision
 
            }
 
        }
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = Route>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 

	
 
    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
        self.new_local.iter()
 
    }
 

	
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        subtree_id: Route,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
0 comments (0 inline, 0 general)