Changeset - ba68b3aa475d
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-02-13 16:38:56
christopheresterhuyse@gmail.com
fiddling
1 file changed with 33 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime/ecs.rs
Show inline comments
 
@@ -218,24 +218,25 @@ enum Entity {
 
/// Invariant: every component is either:
 
///        in to_run = (to_run_r U to_run_w)
 
///     or in ONE of the ekeys (which means it is blocked by a get on that ekey)
 
///     or in sync_ended (because they reached the end of their sync block)
 
///     or in inconsistent (because they are inconsistent)
 
#[derive(Default)]
 
struct Ecs {
 
    component_info: Vec<(Arc<Protocol>, HashSet<ChannelId>)>,
 
    entities: Vec<Entity>,
 
    round_solution: Vec<(ChannelId, bool)>, // encodes an ASSIGNMENT
 
    ekey_channel_ids: Vec<ChannelId>,       // all channel Ids for local keys
 
    flags: EntityFlags,
 
    ekey_to_channel_id: HashMap<Key, ChannelId>,
 
}
 
#[derive(Default)]
 
struct EntityFlags {
 
    assignments: HashMap<(ChannelId, bool), BitSet>,
 
    payloads: BitSet,
 
    ekeys: HashMap<Key, BitSet>,
 
    inconsistent: BitSet,
 
    sync_ended: BitSet,
 
    to_run_r: BitSet, // read from and drained while...
 
    to_run_w: BitSet, // .. written to and populated. }
 
}
 

	
 
@@ -253,25 +254,25 @@ impl Ecs {
 
        //    we are going to shift entities around, so all bitsets need to be cleared anyway.
 
        self.flags.assignments.clear();
 
        self.flags.payloads.clear();
 
        self.flags.ekeys.clear();
 
        self.flags.inconsistent.clear();
 
        self.flags.to_run_r.clear();
 
        self.flags.to_run_w.clear();
 
        self.flags.sync_ended.clear();
 

	
 
        // 2. We discard all payloads; they are all stale now.
 
        //    All machines are contiguous in the vector
 
        self.entities
 
            .retain(|entity| if let Entity::Machine { .. } = entity { true } else { false });
 
            .retain(move |entity| if let Entity::Machine { .. } = entity { true } else { false });
 

	
 
        // 3. initially, all the components need a chance to run in MONO mode
 
        self.flags.to_run_r.set_ones_until(self.entities.len());
 

	
 
        // 4. INVARIANT established:
 
        //    for all State variants in self.entities,
 
        //        exactly one bit throughout the fields of csb is set.
 

	
 
        // 5. Run all machines in (csb.to_run_r U csb.to_run_w).
 
        //    Single, logical set is broken into readable / writable parts to allow concurrent reads / writes safely.
 
        while !self.flags.to_run_r.is_empty() {
 
            for _eid in self.flags.to_run_r.iter() {
 
@@ -283,43 +284,43 @@ impl Ecs {
 
        assert!(self.flags.to_run_w.is_empty());
 

	
 
        #[allow(unreachable_code)] // DEBUG
 
        'recv_loop: loop {
 
            let ekey: Key = todo!();
 
            let msg: Msg = todo!();
 
            // 1. check if this message is redundant, i.e., there is already an equivalent payload with predicate >= this one.
 
            //    ie. starting from all payloads
 

	
 
            // 2. try and find a payload whose predicate is the same or more general than this one
 
            //    if it exists, drop the message; it is uninteresting.
 
            let ekey_bitset = self.flags.ekeys.get(&ekey);
 
            if let Some(_eid) = ekey_bitset.map(|ekey_bitset| {
 
            if let Some(_eid) = ekey_bitset.map(move |ekey_bitset| {
 
                let mut slice_builder = vec![];
 
                // collect CONFLICTING assignments into slice_builder
 
                for &(channel_id, boolean) in msg.assignments.iter() {
 
                    if let Some(bitset) = self.flags.assignments.get(&(channel_id, !boolean)) {
 
                        slice_builder.push(bitset.as_slice());
 
                    }
 
                }
 
                let chunk_iter =
 
                    InNoneExceptIter::new(slice_builder.as_slice(), ekey_bitset.as_slice());
 
                BitChunkIter::new(chunk_iter).next()
 
            }) {
 
                // _eid is a payload whose predicate is at least as general
 
                // drop this message!
 
                continue 'recv_loop;
 
            }
 

	
 
            // 3. insert this payload as an entity, overwriting an existing LESS GENERAL payload if it exists.
 
            let payload_eid: usize = if let Some(eid) = ekey_bitset.and_then(|ekey_bitset| {
 
            let payload_eid: usize = if let Some(eid) = ekey_bitset.and_then(move |ekey_bitset| {
 
                let mut slice_builder = vec![];
 
                slice_builder.push(ekey_bitset.as_slice());
 
                for assignment in msg.assignments.iter() {
 
                    if let Some(bitset) = self.flags.assignments.get(assignment) {
 
                        slice_builder.push(bitset.as_slice());
 
                    }
 
                }
 
                let chunk_iter = AndChunkIter::new(slice_builder.as_slice());
 
                BitChunkIter::new(chunk_iter).next()
 
            }) {
 
                // overwrite this entity index.
 
                eid
 
@@ -332,36 +333,58 @@ impl Ecs {
 
                    bitset.set(eid);
 
                }
 
                self.flags.payloads.set(eid);
 
                eid
 
            };
 

	
 
            self.feed_msg(payload_eid, ekey);
 
        }
 
    }
 

	
 
    fn run_poly_p(&mut self, machine_eid: usize) {
 
        match self.entities.get_mut(machine_eid) {
 
            Some(Entity::Machine { component_index, .. }) => {
 
            Some(Entity::Machine { component_index, state }) => {
 
                // TODO run the machine
 
                // DEBUG: testing the closing of all silent ports
 
                use PolyBlocker as Pb;
 
                let blocker: Pb = todo!();
 
                match blocker {
 
                    Pb::Inconsistent => self.flags.inconsistent.set(machine_eid),
 
                    Pb::CouldntCheckFiring(key) => {
 
                        let &channel_id = self.ekey_to_channel_id.get(&key).unwrap();
 
                        let state_true = state.clone();
 
                        let assignments: Vec<(ChannelId, bool)> = self
 
                            .flags
 
                            .assignments
 
                            .iter()
 
                            .filter_map(move |(&assignment, bitset)| {
 
                                match bitset.test(machine_eid) {
 
                                    true => Some(assignment),
 
                                    false => None,
 
                                }
 
                            })
 
                            .collect();
 

	
 
                        // FORK! this machine becomes FALSE
 
                    }
 
                    _ => todo!(),
 
                }
 

	
 
                // 1. make the assignment of this machine concrete WRT its ports
 
                let component_info = self.component_info.get(*component_index).unwrap();
 
                for &channel_id in component_info.1.iter() {
 
                    let test = self
 
                        .flags
 
                        .assignments
 
                        .get(&(channel_id, true))
 
                        .map(|bitset| bitset.test(machine_eid))
 
                        .map(move |bitset| bitset.test(machine_eid))
 
                        .unwrap_or(false);
 
                    if !test {
 
                        // TRUE assignment wasn't set
 
                        // so set FALSE assignment (no effect if already set)
 
                        self.flags
 
                            .assignments
 
                            .entry((channel_id, false))
 
                            .or_default()
 
                            .set(machine_eid);
 
                    }
 
                }
 
                // 2. this machine becomes solved
 
@@ -411,25 +434,25 @@ impl Ecs {
 
            }
 
            // SOLUTION COMPLETE!
 
            return;
 
        }
 
    }
 

	
 
    fn machine_assignment_for(&self, machine_eid: usize, channel_id: ChannelId) -> Option<bool> {
 
        let test = move |bitset: &BitSet| bitset.test(machine_eid);
 
        self.flags
 
            .assignments
 
            .get(&(channel_id, true))
 
            .map(test)
 
            .or_else(|| self.flags.assignments.get(&(channel_id, false)).map(test))
 
            .or_else(move || self.flags.assignments.get(&(channel_id, false)).map(test))
 
    }
 

	
 
    fn feed_msg(&mut self, payload_eid: usize, ekey: Key) {
 
        // 1. identify the component who:
 
        //    * is blocked on this ekey,
 
        //    * and has a predicate at least as strict as that of this payload
 
        let mut slice_builder = vec![];
 
        let ekey_bitset =
 
            self.flags.ekeys.get_mut(&ekey).expect("Payload sets this => cannot be empty");
 
        slice_builder.push(ekey_bitset.as_slice());
 
        for bitset in self.flags.assignments.values() {
 
            // it doesn't matter which assignment! just that this payload sets it too
 
@@ -454,50 +477,50 @@ struct InAllExceptIter<'a> {
 
    except: &'a [u32],
 
}
 
impl<'a> InAllExceptIter<'a> {
 
    fn new(in_all: &'a [&'a [u32]], except: &'a [u32]) -> Self {
 
        Self { in_all, except, next_chunk_index: 0 }
 
    }
 
}
 
impl<'a> Iterator for InAllExceptIter<'a> {
 
    type Item = u32;
 
    fn next(&mut self) -> Option<Self::Item> {
 
        let i = self.next_chunk_index;
 
        self.next_chunk_index += 1;
 
        let init = self.except.get(i).map(|&x| !x).or(Some(1));
 
        self.in_all.iter().fold(init, |folding, slice| {
 
        let init = self.except.get(i).map(move |&x| !x).or(Some(1));
 
        self.in_all.iter().fold(init, move |folding, slice| {
 
            let a = folding?;
 
            let b = slice.get(i).copied().unwrap_or(0);
 
            Some(a & !b)
 
        })
 
    }
 
}
 

	
 
struct InNoneExceptIter<'a> {
 
    next_chunk_index: usize,
 
    in_none: &'a [&'a [u32]],
 
    except: &'a [u32],
 
}
 
impl<'a> InNoneExceptIter<'a> {
 
    fn new(in_none: &'a [&'a [u32]], except: &'a [u32]) -> Self {
 
        Self { in_none, except, next_chunk_index: 0 }
 
    }
 
}
 
impl<'a> Iterator for InNoneExceptIter<'a> {
 
    type Item = u32;
 
    fn next(&mut self) -> Option<Self::Item> {
 
        let i = self.next_chunk_index;
 
        self.next_chunk_index += 1;
 
        let init = self.except.get(i).copied()?;
 
        Some(self.in_none.iter().fold(init, |folding, slice| {
 
        Some(self.in_none.iter().fold(init, move |folding, slice| {
 
            let a = folding;
 
            let b = slice.get(i).copied().unwrap_or(0);
 
            a & !b
 
        }))
 
    }
 
}
 

	
 
/*
 
The idea is we have a set of component machines that fork whenever they reflect on the oracle to make concrete their predicates.
 
their speculative execution procedure BLOCKS whenever they reflect on the contents of a message that has not yet arrived.
 
the promise is, therefore, never to forget about these blocked machines.
 
the only event that unblocks a machine
0 comments (0 inline, 0 general)