From 1b64d493b3d2e63be38c15b11b62c2363d485bd7 2020-02-13 16:24:23 From: Christopher Esterhuyse Date: 2020-02-13 16:24:23 Subject: [PATCH] fiddling --- diff --git a/src/runtime/ecs.rs b/src/runtime/ecs.rs index efd53523ac9b50086e5b63d5c9c7d671452f0be4..afccbb8781588ae447451a33aa8413a097ee294b 100644 --- a/src/runtime/ecs.rs +++ b/src/runtime/ecs.rs @@ -43,6 +43,13 @@ impl BitSet { // [chunk_index, chunk_bit] [index / 32, index % 32] } + fn test(&self, at: usize) -> bool { + let [chunk_index, chunk_bit] = Self::index_decomposed(at); + match self.0.get(chunk_index) { + None => false, + Some(&chunk) => (chunk & (1 << chunk_bit)) != 0, + } + } fn set(&mut self, at: usize) { let [chunk_index, chunk_bit] = Self::index_decomposed(at); if chunk_index >= self.0.len() { @@ -205,34 +212,37 @@ fn test_bit_iter() { enum Entity { Payload(Payload), - State(ProtocolS), + Machine { state: ProtocolS, component_index: usize }, } +/// Invariant: every component is either: +/// in to_run = (to_run_r U to_run_w) +/// or in ONE of the ekeys (which means it is blocked by a get on that ekey) +/// or in sync_ended (because they reached the end of their sync block) +/// or in inconsistent (because they are inconsistent) #[derive(Default)] struct Ecs { + component_info: Vec<(Arc, HashSet)>, entities: Vec, - assignments: HashMap<(ChannelId, bool), BitSet>, - ekeys: HashMap, - csb: ComponentStatusBits, + round_solution: Vec<(ChannelId, bool)>, // encodes an ASSIGNMENT + ekey_channel_ids: Vec, // all channel Ids for local keys + flags: EntityFlags, } - #[derive(Default)] -struct ComponentStatusBits { +struct EntityFlags { + assignments: HashMap<(ChannelId, bool), BitSet>, + payloads: BitSet, + ekeys: HashMap, inconsistent: BitSet, - blocked: BitSet, sync_ended: BitSet, to_run_r: BitSet, // read from and drained while... - to_run_w: BitSet, // .. written to and populated. + to_run_w: BitSet, // .. written to and populated. } } -impl ComponentStatusBits { - fn clear_all(&mut self) { - self.blocked.clear(); - self.inconsistent.clear(); - self.to_run_r.clear(); - self.to_run_w.clear(); - self.sync_ended.clear(); - } + +struct Protocol { + // TODO } + struct Msg { assignments: Vec<(ChannelId, bool)>, // invariant: no two elements have same ChannelId value payload: Payload, @@ -241,16 +251,21 @@ impl Ecs { fn round(&mut self) { // 1. at the start of the round we throw away all assignments. // we are going to shift entities around, so all bitsets need to be cleared anyway. - self.assignments.clear(); - self.csb.clear_all(); - self.ekeys.clear(); + self.flags.assignments.clear(); + self.flags.payloads.clear(); + self.flags.ekeys.clear(); + self.flags.inconsistent.clear(); + self.flags.to_run_r.clear(); + self.flags.to_run_w.clear(); + self.flags.sync_ended.clear(); // 2. We discard all payloads; they are all stale now. - // All components are now contiguous in the vector. - self.entities.retain(|entity| if let Entity::State(_) = entity { true } else { false }); + // All machines are contiguous in the vector + self.entities + .retain(|entity| if let Entity::Machine { .. } = entity { true } else { false }); // 3. initially, all the components need a chance to run in MONO mode - self.csb.to_run_r.set_ones_until(self.entities.len()); + self.flags.to_run_r.set_ones_until(self.entities.len()); // 4. INVARIANT established: // for all State variants in self.entities, @@ -258,14 +273,14 @@ impl Ecs { // 5. Run all machines in (csb.to_run_r U csb.to_run_w). // Single, logical set is broken into readable / writable parts to allow concurrent reads / writes safely. - while !self.csb.to_run_r.is_empty() { - for _entity_index in self.csb.to_run_r.iter() { + while !self.flags.to_run_r.is_empty() { + for _eid in self.flags.to_run_r.iter() { // TODO run and possbibly manipulate self.to_run_w } - self.csb.to_run_r.clear(); - std::mem::swap(&mut self.csb.to_run_r, &mut self.csb.to_run_w); + self.flags.to_run_r.clear(); + std::mem::swap(&mut self.flags.to_run_r, &mut self.flags.to_run_w); } - assert!(self.csb.to_run_w.is_empty()); + assert!(self.flags.to_run_w.is_empty()); #[allow(unreachable_code)] // DEBUG 'recv_loop: loop { @@ -274,48 +289,213 @@ impl Ecs { // 1. check if this message is redundant, i.e., there is already an equivalent payload with predicate >= this one. // ie. starting from all payloads - // 2. try and find a payload whose predicate subsumes this one - if let Some(_entity_index) = self.ekeys.get(&ekey).map(|ekey_bitset| { + // 2. try and find a payload whose predicate is the same or more general than this one + // if it exists, drop the message; it is uninteresting. + let ekey_bitset = self.flags.ekeys.get(&ekey); + if let Some(_eid) = ekey_bitset.map(|ekey_bitset| { let mut slice_builder = vec![]; + // collect CONFLICTING assignments into slice_builder for &(channel_id, boolean) in msg.assignments.iter() { - if let Some(bitset) = self.assignments.get(&(channel_id, !boolean)) { + if let Some(bitset) = self.flags.assignments.get(&(channel_id, !boolean)) { slice_builder.push(bitset.as_slice()); } } - NoStricterPayloadIter { - next_chunk_index: 0, - in_here: ekey_bitset.as_slice(), - but_in_none_of: slice_builder.as_slice(), - } - .next() + let chunk_iter = + InNoneExceptIter::new(slice_builder.as_slice(), ekey_bitset.as_slice()); + BitChunkIter::new(chunk_iter).next() }) { + // _eid is a payload whose predicate is at least as general + // drop this message! continue 'recv_loop; } - // receive incoming messages + // 3. insert this payload as an entity, overwriting an existing LESS GENERAL payload if it exists. + let payload_eid: usize = if let Some(eid) = ekey_bitset.and_then(|ekey_bitset| { + let mut slice_builder = vec![]; + slice_builder.push(ekey_bitset.as_slice()); + for assignment in msg.assignments.iter() { + if let Some(bitset) = self.flags.assignments.get(assignment) { + slice_builder.push(bitset.as_slice()); + } + } + let chunk_iter = AndChunkIter::new(slice_builder.as_slice()); + BitChunkIter::new(chunk_iter).next() + }) { + // overwrite this entity index. + eid + } else { + // nothing to overwrite. add a new payload entity. + let eid = self.entities.len(); + self.entities.push(Entity::Payload(msg.payload)); + for &assignment in msg.assignments.iter() { + let mut bitset = self.flags.assignments.entry(assignment).or_default(); + bitset.set(eid); + } + self.flags.payloads.set(eid); + eid + }; + + self.feed_msg(payload_eid, ekey); + } + } + + fn run_poly_p(&mut self, machine_eid: usize) { + match self.entities.get_mut(machine_eid) { + Some(Entity::Machine { component_index, .. }) => { + // TODO run the machine + // DEBUG: testing the closing of all silent ports + + // 1. make the assignment of this machine concrete WRT its ports + let component_info = self.component_info.get(*component_index).unwrap(); + for &channel_id in component_info.1.iter() { + let test = self + .flags + .assignments + .get(&(channel_id, true)) + .map(|bitset| bitset.test(machine_eid)) + .unwrap_or(false); + if !test { + // TRUE assignment wasn't set + // so set FALSE assignment (no effect if already set) + self.flags + .assignments + .entry((channel_id, false)) + .or_default() + .set(machine_eid); + } + } + // 2. this machine becomes solved + self.flags.sync_ended.set(machine_eid); + self.generate_new_solutions(machine_eid); + // TODO run this machine to a poly blocker + // potentially mark as inconsistent, blocked on some key, or solved + // if solved + } + _ => unreachable!(), + } + } + + fn generate_new_solutions(&mut self, newly_solved_machine_eid: usize) { + // this vector will be used to store assignments from self.ekey_channel_ids to elements in {true, false} + let mut solution_prefix = vec![]; + self.generate_new_solutions_rec(newly_solved_machine_eid, &mut solution_prefix); + // let all_channel_ids = + // let mut slice_builder = vec![]; + } + fn generate_new_solutions_rec( + &mut self, + newly_solved_machine_eid: usize, + solution_prefix: &mut Vec, + ) { + let eid = newly_solved_machine_eid; + let n = solution_prefix.len(); + if let Some(&channel_id) = self.ekey_channel_ids.get(n) { + if let Some(assignment) = self.machine_assignment_for(eid, channel_id) { + // this machine already gives an assignment + solution_prefix.push(assignment); + self.generate_new_solutions_rec(eid, solution_prefix); + } else { + // this machine does not give an assignment. try both branches! + solution_prefix.push(false); + self.generate_new_solutions_rec(eid, solution_prefix); + solution_prefix.pop(); + solution_prefix.push(true); + self.generate_new_solutions_rec(eid, solution_prefix); + } + solution_prefix.pop(); + } else { + println!("SOLUTION:"); + for (channel_id, assignment) in self.ekey_channel_ids.iter().zip(solution_prefix.iter()) + { + println!("{:?} => {:?}", channel_id, assignment); + } + // SOLUTION COMPLETE! + return; + } + } + + fn machine_assignment_for(&self, machine_eid: usize, channel_id: ChannelId) -> Option { + let test = move |bitset: &BitSet| bitset.test(machine_eid); + self.flags + .assignments + .get(&(channel_id, true)) + .map(test) + .or_else(|| self.flags.assignments.get(&(channel_id, false)).map(test)) + } + + fn feed_msg(&mut self, payload_eid: usize, ekey: Key) { + // 1. identify the component who: + // * is blocked on this ekey, + // * and has a predicate at least as strict as that of this payload + let mut slice_builder = vec![]; + let ekey_bitset = + self.flags.ekeys.get_mut(&ekey).expect("Payload sets this => cannot be empty"); + slice_builder.push(ekey_bitset.as_slice()); + for bitset in self.flags.assignments.values() { + // it doesn't matter which assignment! just that this payload sets it too + if bitset.test(payload_eid) { + slice_builder.push(bitset.as_slice()); + } + } + let chunk_iter = + InAllExceptIter::new(slice_builder.as_slice(), self.flags.payloads.as_slice()); + let mut iter = BitChunkIter::new(chunk_iter); + if let Some(component_key) = iter.next() { + // TODO is it possible for there to be 2+ iterations? I'm thinking No + // RUN THIS MACHINE + ekey_bitset.unset(component_key); // no longer blocked! } } } -struct NoStricterPayloadIter<'a> { +struct InAllExceptIter<'a> { next_chunk_index: usize, - in_here: &'a [u32], - but_in_none_of: &'a [&'a [u32]], + in_all: &'a [&'a [u32]], + except: &'a [u32], } -impl<'a> Iterator for NoStricterPayloadIter<'a> { +impl<'a> InAllExceptIter<'a> { + fn new(in_all: &'a [&'a [u32]], except: &'a [u32]) -> Self { + Self { in_all, except, next_chunk_index: 0 } + } +} +impl<'a> Iterator for InAllExceptIter<'a> { type Item = u32; fn next(&mut self) -> Option { let i = self.next_chunk_index; self.next_chunk_index += 1; - let init = self.in_here.get(i).copied(); - self.but_in_none_of.iter().fold(init, |folding, slice| { + let init = self.except.get(i).map(|&x| !x).or(Some(1)); + self.in_all.iter().fold(init, |folding, slice| { let a = folding?; - let b = slice.get(i).copied()?; + let b = slice.get(i).copied().unwrap_or(0); Some(a & !b) }) } } +struct InNoneExceptIter<'a> { + next_chunk_index: usize, + in_none: &'a [&'a [u32]], + except: &'a [u32], +} +impl<'a> InNoneExceptIter<'a> { + fn new(in_none: &'a [&'a [u32]], except: &'a [u32]) -> Self { + Self { in_none, except, next_chunk_index: 0 } + } +} +impl<'a> Iterator for InNoneExceptIter<'a> { + type Item = u32; + fn next(&mut self) -> Option { + let i = self.next_chunk_index; + self.next_chunk_index += 1; + let init = self.except.get(i).copied()?; + Some(self.in_none.iter().fold(init, |folding, slice| { + let a = folding; + let b = slice.get(i).copied().unwrap_or(0); + a & !b + })) + } +} + /* The idea is we have a set of component machines that fork whenever they reflect on the oracle to make concrete their predicates. their speculative execution procedure BLOCKS whenever they reflect on the contents of a message that has not yet arrived.