Changeset - 0021301bc812
[Not reviewed]
v0.2.0
0 1 0
Christopher Esterhuyse - 5 years ago 2020-03-06 11:08:07
christopher.esterhuyse@gmail.com
tearing down correctly now
1 file changed with 21 insertions and 1 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -107,62 +107,82 @@ impl Controller {
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        if let Some(e) = self.unrecoverable_error {
 
            return Err(e.clone());
 
        }
 
        self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
 
            SyncErr::Timeout => e, // this isn't unrecoverable
 
            _ => {
 
                // Must set unrecoverable error! and tear down our net channels
 
                self.unrecoverable_error = Some(e);
 
                self.ephemeral.clear();
 
                self.inner.endpoint_exts = Default::default();
 
                e
 
            }
 
        })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    pub fn sync_round(
 
    fn sync_round_inner(
 
        &mut self,
 
        mut deadline: Option<Instant>,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 
        assert!(self.ephemeral.is_clear());
 
        assert!(self.unrecoverable_error.is_none());
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
 
        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                mono_ps: &mut self.ephemeral.mono_ps,
 
                inner: &mut self.inner,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
0 comments (0 inline, 0 general)