Changeset - bcfb0eaf7288
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-03-03 17:11:05
christopheresterhuyse@gmail.com
ye
2 files changed with 44 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    //usable BETWEEN rounds
 
    fn consistent(&self) -> bool {
 
        self.inner.mono_n.is_some()
 
    }
 

	
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        // let mut table_row = HashMap::<Key, _>::default();
 

	
 
        let n_pair = {
 
            let poly_n = self.ephemeral.poly_n.take().unwrap();
 
            let mono_n = poly_n.choose_mono(&decision).unwrap();
 
            (mono_n, poly_n)
 
        };
 
        self.inner.mono_n = Some(n_pair.0.clone());
 

	
 
        let p_pairs = self
 
            .ephemeral
 
            .poly_ps
 
            .drain(..)
 
            .map(|poly_p| {
 
                let mono_p = poly_p.choose_mono(&decision).unwrap();
 
                (mono_p, poly_p)
 
            })
 
            .collect::<Vec<_>>();
 
            .collect::<Vec<_>>()
 
            .into_boxed_slice();
 
        self.inner.mono_ps.extend(p_pairs.iter().map(|p_pair| p_pair.0.clone()));
 
        self.round_histories.push(RoundHistory::Consistent(decision.clone(), n_pair, p_pairs));
 
        self.round_histories.push(RoundHistory::Consistent {
 
            decision: decision.clone(),
 
            native_component: n_pair,
 
            protocol_components: p_pairs,
 
        });
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        Ok(())
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(decision) = maybe_decision {
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    }
 

	
 
    fn kick_off_native(
 
        &mut self,
 
        sync_batches: impl Iterator<Item = SyncBatch>,
 
    ) -> Result<PolyN, EndpointErr> {
 
        let MonoN { ekeys, .. } = self.inner.mono_n.take().unwrap();
 
        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
 
        let mut branches = HashMap::<_, _>::default();
 
        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
 
            let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            let all_ekeys = ekeys.iter().copied();
 
            let all_channel_ids = all_ekeys.map(ekey_to_channel_id);
 

	
 
            let mut predicate = Predicate::new_trivial();
 

	
 
            // assign TRUE for puts and gets
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        if !self.consistent() {
 
            // was previously inconsistent
 
            return Err(SyncErr::Inconsistent);
 
        }
 
        match self.sync_round_inner(deadline, sync_batches) {
 
            Ok(()) => Ok(()),
 
            Err(e) => {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...",
 
                );
 
                let h = RoundHistory::Inconsistent(
 
                    std::mem::take(&mut self.ephemeral.solution_storage),
 
                    self.ephemeral.poly_n.take().unwrap(),
 
                    std::mem::take(&mut self.ephemeral.poly_ps),
 
                );
 
                self.round_histories.push(h);
 
                for (round_index, round) in self.round_histories.iter().enumerate() {
 
                    log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round);
 
        if self.consistent() {
 
            match self.sync_round_inner(deadline, sync_batches) {
 
                Ok(()) => return Ok(()),
 
                Err(error) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...",
 
                    );
 
                    let h = RoundHistory::Inconsistent {
 
                        error,
 
                        subtree_solutions: std::mem::take(&mut self.ephemeral.solution_storage),
 
                        native_component: self.ephemeral.poly_n.take().unwrap(),
 
                        protocol_components: std::mem::take(&mut self.ephemeral.poly_ps)
 
                            .into_boxed_slice(),
 
                    };
 
                    self.round_histories.push(h);
 
                    for (round_index, round) in self.round_histories.iter().enumerate() {
 
                        log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round);
 
                    }
 
                }
 

	
 
                Err(e)
 
            }
 
        }
 
        if let Some(RoundHistory::Inconsistent { error, .. }) =
 
            self.round_histories.iter().rev().next()
 
        {
 
            Err(error.clone())
 
        } else {
 
            unreachable!()
 
        }
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    fn sync_round_inner(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        assert!(self.ephemeral.is_clear());
 

	
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len());
 
        self.ephemeral.poly_ps.clear();
 
        // let mut poly_ps: Vec<PolyP> = vec![];
 
        while let Some(mut mono_p) = self.inner.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                inner: &mut self.inner,
 
                // endpoint_exts: &mut self.endpoint_exts,
 
                // mono_ps: &mut self.mono_ps,
 
                // channel_id_stream: &mut self.channel_id_stream,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let ekey_to_holder: HashMap<Key, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.iter().flat_map(|m| m.ekeys.iter().map(move |&e| (e, N)));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            &ekey_to_holder
 
        );
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
            let n = self.inner.mono_n.iter().map(|_| SubtreeId::PolyN);
 
            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
            let c = self
 
                .inner
 
                .family
 
                .children_ekeys
 
                .iter()
 
                .map(|&ekey| SubtreeId::ChildController { ekey });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 

	
 
        // 5. kick off the synchronous round of the native actor if it exists
 

	
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            log!(
src/runtime/mod.rs
Show inline comments
 
@@ -4,194 +4,203 @@ pub mod ffi;
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
// pub mod experimental;
 
mod serde;
 
pub(crate) mod setup;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Key, Payload>,
 
    gets: HashSet<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
enum RoundHistory {
 
    Consistent(Predicate, (MonoN, PolyN), Vec<(MonoP, PolyP)>),
 
    Inconsistent(SolutionStorage, PolyN, Vec<PolyP>),
 
    Consistent {
 
        decision: Predicate,
 
        native_component: (MonoN, PolyN),
 
        protocol_components: Box<[(MonoP, PolyP)]>,
 
    },
 
    Inconsistent {
 
        error: SyncErr,
 
        subtree_solutions: SolutionStorage,
 
        native_component: PolyN,
 
        protocol_components: Box<[PolyP]>,
 
    },
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
    round_histories: Vec<RoundHistory>,
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: Option<MonoN>,
 
    mono_ps: Vec<MonoP>,
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_ekey: Option<Key>,
 
    children_ekeys: Vec<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { ekey: Key },
 
}
 

	
 
pub(crate) struct MonoPContext<'a> {
 
    inner: &'a mut ControllerInner,
 
    ekeys: &'a mut HashSet<Key>,
 
}
 
pub(crate) struct PolyPContext<'a> {
 
    my_subtree_id: SubtreeId,
 
    inner: &'a mut ControllerInner,
 
    solution_storage: &'a mut SolutionStorage,
 
}
 
impl PolyPContext<'_> {
 
    #[inline(always)]
 
    fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
        let Self { solution_storage, my_subtree_id, inner } = self;
 
        PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
    }
 
}
 
struct BranchPContext<'m, 'r> {
 
    m_ctx: PolyPContext<'m>,
 
    ekeys: &'r HashSet<Key>,
 
    predicate: &'r Predicate,
 
    inbox: &'r HashMap<Key, Payload>,
 
}
 

	
 
#[derive(Default)]
 
pub(crate) struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
trait Messengerlike {
 
    fn get_state_mut(&mut self) -> &mut MessengerState;
 
    fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint;
 

	
 
    fn delay(&mut self, received: ReceivedMsg) {
 
        self.get_state_mut().delayed.push(received);
 
    }
0 comments (0 inline, 0 general)