Changeset - 1b64d493b3d2
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-02-13 16:24:23
christopheresterhuyse@gmail.com
fiddling
1 file changed with 224 insertions and 44 deletions:
0 comments (0 inline, 0 general)
src/runtime/ecs.rs
Show inline comments
 
@@ -40,12 +40,19 @@ impl BitSet {
 
    }
 
    #[inline(always)]
 
    fn index_decomposed(index: usize) -> [usize; 2] {
 
        // [chunk_index, chunk_bit]
 
        [index / 32, index % 32]
 
    }
 
    fn test(&self, at: usize) -> bool {
 
        let [chunk_index, chunk_bit] = Self::index_decomposed(at);
 
        match self.0.get(chunk_index) {
 
            None => false,
 
            Some(&chunk) => (chunk & (1 << chunk_bit)) != 0,
 
        }
 
    }
 
    fn set(&mut self, at: usize) {
 
        let [chunk_index, chunk_bit] = Self::index_decomposed(at);
 
        if chunk_index >= self.0.len() {
 
            self.0.resize(chunk_index + 1, 0u32);
 
        }
 
        let chunk = unsafe {
 
@@ -202,123 +209,296 @@ fn test_bit_iter() {
 
    let indices = iter.collect::<Vec<_>>();
 
    println!("indices {:?}", indices);
 
}
 

	
 
enum Entity {
 
    Payload(Payload),
 
    State(ProtocolS),
 
    Machine { state: ProtocolS, component_index: usize },
 
}
 

	
 
/// Invariant: every component is either:
 
///        in to_run = (to_run_r U to_run_w)
 
///     or in ONE of the ekeys (which means it is blocked by a get on that ekey)
 
///     or in sync_ended (because they reached the end of their sync block)
 
///     or in inconsistent (because they are inconsistent)
 
#[derive(Default)]
 
struct Ecs {
 
    component_info: Vec<(Arc<Protocol>, HashSet<ChannelId>)>,
 
    entities: Vec<Entity>,
 
    assignments: HashMap<(ChannelId, bool), BitSet>,
 
    ekeys: HashMap<Key, BitSet>,
 
    csb: ComponentStatusBits,
 
    round_solution: Vec<(ChannelId, bool)>, // encodes an ASSIGNMENT
 
    ekey_channel_ids: Vec<ChannelId>,       // all channel Ids for local keys
 
    flags: EntityFlags,
 
}
 

	
 
#[derive(Default)]
 
struct ComponentStatusBits {
 
struct EntityFlags {
 
    assignments: HashMap<(ChannelId, bool), BitSet>,
 
    payloads: BitSet,
 
    ekeys: HashMap<Key, BitSet>,
 
    inconsistent: BitSet,
 
    blocked: BitSet,
 
    sync_ended: BitSet,
 
    to_run_r: BitSet, // read from and drained while...
 
    to_run_w: BitSet, // .. written to and populated.
 
    to_run_w: BitSet, // .. written to and populated. }
 
}
 
impl ComponentStatusBits {
 
    fn clear_all(&mut self) {
 
        self.blocked.clear();
 
        self.inconsistent.clear();
 
        self.to_run_r.clear();
 
        self.to_run_w.clear();
 
        self.sync_ended.clear();
 
    }
 

	
 
struct Protocol {
 
    // TODO
 
}
 

	
 
struct Msg {
 
    assignments: Vec<(ChannelId, bool)>, // invariant: no two elements have same ChannelId value
 
    payload: Payload,
 
}
 
impl Ecs {
 
    fn round(&mut self) {
 
        // 1. at the start of the round we throw away all assignments.
 
        //    we are going to shift entities around, so all bitsets need to be cleared anyway.
 
        self.assignments.clear();
 
        self.csb.clear_all();
 
        self.ekeys.clear();
 
        self.flags.assignments.clear();
 
        self.flags.payloads.clear();
 
        self.flags.ekeys.clear();
 
        self.flags.inconsistent.clear();
 
        self.flags.to_run_r.clear();
 
        self.flags.to_run_w.clear();
 
        self.flags.sync_ended.clear();
 

	
 
        // 2. We discard all payloads; they are all stale now.
 
        //    All components are now contiguous in the vector.
 
        self.entities.retain(|entity| if let Entity::State(_) = entity { true } else { false });
 
        //    All machines are contiguous in the vector
 
        self.entities
 
            .retain(|entity| if let Entity::Machine { .. } = entity { true } else { false });
 

	
 
        // 3. initially, all the components need a chance to run in MONO mode
 
        self.csb.to_run_r.set_ones_until(self.entities.len());
 
        self.flags.to_run_r.set_ones_until(self.entities.len());
 

	
 
        // 4. INVARIANT established:
 
        //    for all State variants in self.entities,
 
        //        exactly one bit throughout the fields of csb is set.
 

	
 
        // 5. Run all machines in (csb.to_run_r U csb.to_run_w).
 
        //    Single, logical set is broken into readable / writable parts to allow concurrent reads / writes safely.
 
        while !self.csb.to_run_r.is_empty() {
 
            for _entity_index in self.csb.to_run_r.iter() {
 
        while !self.flags.to_run_r.is_empty() {
 
            for _eid in self.flags.to_run_r.iter() {
 
                // TODO run and possbibly manipulate self.to_run_w
 
            }
 
            self.csb.to_run_r.clear();
 
            std::mem::swap(&mut self.csb.to_run_r, &mut self.csb.to_run_w);
 
            self.flags.to_run_r.clear();
 
            std::mem::swap(&mut self.flags.to_run_r, &mut self.flags.to_run_w);
 
        }
 
        assert!(self.csb.to_run_w.is_empty());
 
        assert!(self.flags.to_run_w.is_empty());
 

	
 
        #[allow(unreachable_code)] // DEBUG
 
        'recv_loop: loop {
 
            let ekey: Key = todo!();
 
            let msg: Msg = todo!();
 
            // 1. check if this message is redundant, i.e., there is already an equivalent payload with predicate >= this one.
 
            //    ie. starting from all payloads
 

	
 
            // 2. try and find a payload whose predicate subsumes this one
 
            if let Some(_entity_index) = self.ekeys.get(&ekey).map(|ekey_bitset| {
 
            // 2. try and find a payload whose predicate is the same or more general than this one
 
            //    if it exists, drop the message; it is uninteresting.
 
            let ekey_bitset = self.flags.ekeys.get(&ekey);
 
            if let Some(_eid) = ekey_bitset.map(|ekey_bitset| {
 
                let mut slice_builder = vec![];
 
                // collect CONFLICTING assignments into slice_builder
 
                for &(channel_id, boolean) in msg.assignments.iter() {
 
                    if let Some(bitset) = self.assignments.get(&(channel_id, !boolean)) {
 
                    if let Some(bitset) = self.flags.assignments.get(&(channel_id, !boolean)) {
 
                        slice_builder.push(bitset.as_slice());
 
                    }
 
                }
 
                NoStricterPayloadIter {
 
                    next_chunk_index: 0,
 
                    in_here: ekey_bitset.as_slice(),
 
                    but_in_none_of: slice_builder.as_slice(),
 
                }
 
                .next()
 
                let chunk_iter =
 
                    InNoneExceptIter::new(slice_builder.as_slice(), ekey_bitset.as_slice());
 
                BitChunkIter::new(chunk_iter).next()
 
            }) {
 
                // _eid is a payload whose predicate is at least as general
 
                // drop this message!
 
                continue 'recv_loop;
 
            }
 

	
 
            // receive incoming messages
 
            // 3. insert this payload as an entity, overwriting an existing LESS GENERAL payload if it exists.
 
            let payload_eid: usize = if let Some(eid) = ekey_bitset.and_then(|ekey_bitset| {
 
                let mut slice_builder = vec![];
 
                slice_builder.push(ekey_bitset.as_slice());
 
                for assignment in msg.assignments.iter() {
 
                    if let Some(bitset) = self.flags.assignments.get(assignment) {
 
                        slice_builder.push(bitset.as_slice());
 
                    }
 
                }
 
                let chunk_iter = AndChunkIter::new(slice_builder.as_slice());
 
                BitChunkIter::new(chunk_iter).next()
 
            }) {
 
                // overwrite this entity index.
 
                eid
 
            } else {
 
                // nothing to overwrite. add a new payload entity.
 
                let eid = self.entities.len();
 
                self.entities.push(Entity::Payload(msg.payload));
 
                for &assignment in msg.assignments.iter() {
 
                    let mut bitset = self.flags.assignments.entry(assignment).or_default();
 
                    bitset.set(eid);
 
                }
 
                self.flags.payloads.set(eid);
 
                eid
 
            };
 

	
 
            self.feed_msg(payload_eid, ekey);
 
        }
 
    }
 

	
 
    fn run_poly_p(&mut self, machine_eid: usize) {
 
        match self.entities.get_mut(machine_eid) {
 
            Some(Entity::Machine { component_index, .. }) => {
 
                // TODO run the machine
 
                // DEBUG: testing the closing of all silent ports
 

	
 
                // 1. make the assignment of this machine concrete WRT its ports
 
                let component_info = self.component_info.get(*component_index).unwrap();
 
                for &channel_id in component_info.1.iter() {
 
                    let test = self
 
                        .flags
 
                        .assignments
 
                        .get(&(channel_id, true))
 
                        .map(|bitset| bitset.test(machine_eid))
 
                        .unwrap_or(false);
 
                    if !test {
 
                        // TRUE assignment wasn't set
 
                        // so set FALSE assignment (no effect if already set)
 
                        self.flags
 
                            .assignments
 
                            .entry((channel_id, false))
 
                            .or_default()
 
                            .set(machine_eid);
 
                    }
 
                }
 
                // 2. this machine becomes solved
 
                self.flags.sync_ended.set(machine_eid);
 
                self.generate_new_solutions(machine_eid);
 
                // TODO run this machine to a poly blocker
 
                // potentially mark as inconsistent, blocked on some key, or solved
 
                // if solved
 
            }
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn generate_new_solutions(&mut self, newly_solved_machine_eid: usize) {
 
        // this vector will be used to store assignments from self.ekey_channel_ids to elements in {true, false}
 
        let mut solution_prefix = vec![];
 
        self.generate_new_solutions_rec(newly_solved_machine_eid, &mut solution_prefix);
 
        // let all_channel_ids =
 
        // let mut slice_builder = vec![];
 
    }
 
    fn generate_new_solutions_rec(
 
        &mut self,
 
        newly_solved_machine_eid: usize,
 
        solution_prefix: &mut Vec<bool>,
 
    ) {
 
        let eid = newly_solved_machine_eid;
 
        let n = solution_prefix.len();
 
        if let Some(&channel_id) = self.ekey_channel_ids.get(n) {
 
            if let Some(assignment) = self.machine_assignment_for(eid, channel_id) {
 
                // this machine already gives an assignment
 
                solution_prefix.push(assignment);
 
                self.generate_new_solutions_rec(eid, solution_prefix);
 
            } else {
 
                // this machine does not give an assignment. try both branches!
 
                solution_prefix.push(false);
 
                self.generate_new_solutions_rec(eid, solution_prefix);
 
                solution_prefix.pop();
 
                solution_prefix.push(true);
 
                self.generate_new_solutions_rec(eid, solution_prefix);
 
            }
 
            solution_prefix.pop();
 
        } else {
 
            println!("SOLUTION:");
 
            for (channel_id, assignment) in self.ekey_channel_ids.iter().zip(solution_prefix.iter())
 
            {
 
                println!("{:?} => {:?}", channel_id, assignment);
 
            }
 
            // SOLUTION COMPLETE!
 
            return;
 
        }
 
    }
 

	
 
    fn machine_assignment_for(&self, machine_eid: usize, channel_id: ChannelId) -> Option<bool> {
 
        let test = move |bitset: &BitSet| bitset.test(machine_eid);
 
        self.flags
 
            .assignments
 
            .get(&(channel_id, true))
 
            .map(test)
 
            .or_else(|| self.flags.assignments.get(&(channel_id, false)).map(test))
 
    }
 

	
 
    fn feed_msg(&mut self, payload_eid: usize, ekey: Key) {
 
        // 1. identify the component who:
 
        //    * is blocked on this ekey,
 
        //    * and has a predicate at least as strict as that of this payload
 
        let mut slice_builder = vec![];
 
        let ekey_bitset =
 
            self.flags.ekeys.get_mut(&ekey).expect("Payload sets this => cannot be empty");
 
        slice_builder.push(ekey_bitset.as_slice());
 
        for bitset in self.flags.assignments.values() {
 
            // it doesn't matter which assignment! just that this payload sets it too
 
            if bitset.test(payload_eid) {
 
                slice_builder.push(bitset.as_slice());
 
            }
 
        }
 
        let chunk_iter =
 
            InAllExceptIter::new(slice_builder.as_slice(), self.flags.payloads.as_slice());
 
        let mut iter = BitChunkIter::new(chunk_iter);
 
        if let Some(component_key) = iter.next() {
 
            // TODO is it possible for there to be 2+ iterations? I'm thinking No
 
            // RUN THIS MACHINE
 
            ekey_bitset.unset(component_key); // no longer blocked!
 
        }
 
    }
 
}
 

	
 
struct NoStricterPayloadIter<'a> {
 
struct InAllExceptIter<'a> {
 
    next_chunk_index: usize,
 
    in_here: &'a [u32],
 
    but_in_none_of: &'a [&'a [u32]],
 
    in_all: &'a [&'a [u32]],
 
    except: &'a [u32],
 
}
 
impl<'a> Iterator for NoStricterPayloadIter<'a> {
 
impl<'a> InAllExceptIter<'a> {
 
    fn new(in_all: &'a [&'a [u32]], except: &'a [u32]) -> Self {
 
        Self { in_all, except, next_chunk_index: 0 }
 
    }
 
}
 
impl<'a> Iterator for InAllExceptIter<'a> {
 
    type Item = u32;
 
    fn next(&mut self) -> Option<Self::Item> {
 
        let i = self.next_chunk_index;
 
        self.next_chunk_index += 1;
 
        let init = self.in_here.get(i).copied();
 
        self.but_in_none_of.iter().fold(init, |folding, slice| {
 
        let init = self.except.get(i).map(|&x| !x).or(Some(1));
 
        self.in_all.iter().fold(init, |folding, slice| {
 
            let a = folding?;
 
            let b = slice.get(i).copied()?;
 
            let b = slice.get(i).copied().unwrap_or(0);
 
            Some(a & !b)
 
        })
 
    }
 
}
 

	
 
struct InNoneExceptIter<'a> {
 
    next_chunk_index: usize,
 
    in_none: &'a [&'a [u32]],
 
    except: &'a [u32],
 
}
 
impl<'a> InNoneExceptIter<'a> {
 
    fn new(in_none: &'a [&'a [u32]], except: &'a [u32]) -> Self {
 
        Self { in_none, except, next_chunk_index: 0 }
 
    }
 
}
 
impl<'a> Iterator for InNoneExceptIter<'a> {
 
    type Item = u32;
 
    fn next(&mut self) -> Option<Self::Item> {
 
        let i = self.next_chunk_index;
 
        self.next_chunk_index += 1;
 
        let init = self.except.get(i).copied()?;
 
        Some(self.in_none.iter().fold(init, |folding, slice| {
 
            let a = folding;
 
            let b = slice.get(i).copied().unwrap_or(0);
 
            a & !b
 
        }))
 
    }
 
}
 

	
 
/*
 
The idea is we have a set of component machines that fork whenever they reflect on the oracle to make concrete their predicates.
 
their speculative execution procedure BLOCKS whenever they reflect on the contents of a message that has not yet arrived.
 
the promise is, therefore, never to forget about these blocked machines.
 
the only event that unblocks a machine
 

	
0 comments (0 inline, 0 general)