Changeset - a5ae8bee6918
mh <contact@maxhenger.nl> - 2021-12-23 10:32:34
Slight extension of component store
1 file changed with 40 insertions and 14 deletions:
src/runtime2/runtime.rs
@@ -88,25 +88,35 @@ impl Runtime {
 /// `&T` will only be used to access the "public" fields immutably.
 struct CompStore<T: Sized> {
     freelist: *mut u32,
     data: *mut *mut T,
     count: usize,
     mask: usize,
     byte_size: usize, // used for dealloc
     write_head: AtomicUsize,
     limit_head: AtomicUsize,
     read_head: AtomicUsize,
 }
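Read together, the three atomic heads drive a lock-free ring buffer over `freelist`: writers reserve a slot, publish it, and readers consume published slots. A sketch of the invariant these fields appear to maintain, assuming the usual reserve/publish/consume split:

    // Monotonically, modulo wrap-around in [0, 2*count):
    //   read_head <= limit_head <= write_head
    // write_head: next freelist slot a writer returning an index will claim
    // limit_head: slots before this point are published and safe to read
    // read_head:  next freelist slot a reader acquiring an index will claim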

+const fn compute_realloc_flag() -> usize {
+    match size_of::<usize>() {
+        4 => return 1 << 31, // 32-bit system
+        8 => return 1 << 63, // 64-bit system
+        _ => panic!("unexpected byte size for 'usize'")
+    }
+}
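The flag is simply the most significant bit of `usize`, so the same constant can also be computed without matching on the byte size; a minimal equivalent sketch (the `_ALT` name is only for illustration):

    // Equivalent portable formulation: set only the top bit of usize.
    const REALLOC_FLAG_ALT: usize = 1 << (usize::BITS - 1);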

 impl<T: Sized> CompStore<T> {
+    const REALLOC_FLAG: usize = compute_realloc_flag();
+
     fn new(initial_count: usize) -> Self {
         // Allocate data
         debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
         let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);

         unsafe {
             // Init the freelist to all of the indices in the array of data
             let mut target = freelist;
             for idx in 0..initial_count as u32 {
                 *target = idx;
                 target = target.add(1); // advance to the next freelist entry
             }
 
@@ -119,101 +129,117 @@ impl<T: Sized> CompStore<T> {
             freelist, data,
             count: initial_count,
             mask: initial_count - 1,
             byte_size,
             write_head: AtomicUsize::new(initial_count),
             limit_head: AtomicUsize::new(initial_count),
             read_head: AtomicUsize::new(0),
         };
     }
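As a concrete example of the state `new` produces, assume a hypothetical `initial_count` of 4 (a power of two, as `mask: initial_count - 1` requires): every index sits in the freelist and is immediately available to readers.

    // Hypothetical state right after CompStore::<T>::new(4):
    //   freelist   = [0, 1, 2, 3]  // all data slots are free
    //   count      = 4
    //   mask       = 3             // maps a head value to a freelist slot
    //   write_head = 4             // next slot a returned index is stored in
    //   limit_head = 4             // slots 0..4 are published: 4 reads possible
    //   read_head  = 0             // next slot handed out to a reader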

     fn get_index_from_freelist(&self) -> u32 {
         let compare_mask = (self.count * 2) - 1;
+        let mut read_index = self.read_head.load(Ordering::Acquire); // read index first

         'try_loop: loop {
-            let mut read_index = self.read_head.load(Ordering::Acquire); // read index first
             let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second

             // By definition we always have `read_index <= limit_index` (if we
             // had an infinite buffer; in reality we will wrap).
             if (read_index & compare_mask) == (limit_index & compare_mask) {
                 // We need to create a bigger buffer. Note that no reader can
                 // *ever* set the read index to beyond the limit index, and it
                 // is currently equal. So we're certain that there is no other
                 // reader currently updating the read_head.
                 //
                 // To test if we are supposed to resize the backing buffer we
-                // try to increment the limit index by 2*count. Note that the
+                // try to set the REALLOC_FLAG on the limit index. Note that the
                 // stored indices are always in the range [0, 2*count). So if
-                // we add 2*count to the limit index, then the masked condition
-                // above still holds! Other potential readers will end up here
-                // and are allowed to wait until we resized the backing
+                // we add REALLOC_FLAG to the limit index, then the masked
+                // condition above still holds! Other potential readers will end
+                // up here and are allowed to wait until we resized the backing
                 // container.
                 //
                 // Furthermore, setting the limit index to this high value also
                 // notifies the writer that any of its writes should be tried
                 // again, as they're writing to a buffer that is going to get
                 // trashed.
-                match self.limit_head.compare_exchange(limit_index, limit_index + 2*self.count, Ordering::SeqCst, Ordering::Acquire) {
+                todo!("finish reallocation code");
+                match self.limit_head.compare_exchange(limit_index, limit_index | Self::REALLOC_FLAG, Ordering::SeqCst, Ordering::Acquire) {
                     Ok(_) => {
                         // Limit index has changed, so we're now the ones that
-                        // are supposed to resize the
+                        // are supposed to resize the backing buffer.
+                    }
+                    Err(_) => {
+                        // Another reader flagged the resize first; try again.
+                        continue 'try_loop;
                     }
                 }
             } else {
                 // It seems we have space to read
                 let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
-                if self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_err() {
+                if let Err(new_read_index) = self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
                     // Failed to do the CAS, try again. We need to start at the
                     // start again because we might have had other readers that
                     // were successful, so at the very least, the preemptive
                     // read we did is no longer correct.
+                    read_index = new_read_index;
                     continue 'try_loop;
                 }

                 // We now "own" the value at the read index
                 return preemptive_read;
             }
         }
     }
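The empty check above works because head values live in `[0, 2*count)` while the freelist itself only has `count` slots. A worked sketch of the arithmetic, assuming `count = 4` (so `compare_mask = 7` and `mask = 3`):

    // Heads advance 0, 1, ..., 7, 0, 1, ... under compare_mask = 7.
    //
    // Empty (nothing to hand out): read_head caught up with limit_head.
    //   read_head = 5, limit_head = 5  ->  5 & 7 == 5 & 7
    //
    // Full (a whole lap of count = 4 slots is published):
    //   read_head = 1, limit_head = 5  ->  1 & 7 != 5 & 7,
    //   yet both address the same storage slot: 1 & 3 == 5 & 3 == 1.
    //
    // The extra bit is what tells "empty" apart from "full"; plain indices
    // in [0, count) could not distinguish the two cases.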

     fn put_back_index_into_freelist(&self, index: u32) {
-        let compare_mask = (self.count * 2) - 1;
-        'try_loop: loop {
-            let write_index = self.write_head.load(Ordering::Acquire);
-            while !self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_ok() {
-                // Failed to do the CAS, try again
-                continue 'try_loop
-            }
+        let mut compare_mask = (self.count * 2) - 1;
+        let mut write_index = self.write_head.load(Ordering::Acquire);
+        while let Err(new_write_index) = self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
+            // Failed to do the CAS, try again
+            write_index = new_write_index;
+        }

+        'try_write_loop: loop {
             // We are now the only ones that can write at `write_index`. Try
             // to do so.
             unsafe { *self.freelist.add(write_index & self.mask) = index; }

             // But we still need to move the limit head. Only successful
             // writers may move it, so we expect it to move from `write_index`
             // to `write_index + 1`, but we might have to spin to achieve it.
             // Furthermore, the `limit_head` is used by the index-retrieval
             // function to indicate that a read is in progress.
-            loop {
+            todo!("finish reallocation code");
+            'commit_to_write_loop: loop {
                 match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
                     Ok(_) => break,
                     Err(new_value) => {
                         // Two options: either the limit is not yet what we
                         // expect it to be, in which case we just try again
                         // with the old values, or it is very large
                         // (relatively), which is the signal from the reader
                         // that the entire storage is being resized.
+                        if new_value & Self::REALLOC_FLAG != 0 {
+                            // Someone is resizing, wait until that is no
+                            // longer true.
+                            while self.limit_head.load(Ordering::Acquire) & Self::REALLOC_FLAG != 0 {
+                                // still resizing
+                            }
+
+                            // Not resizing anymore, so try everything again:
+                            // our old write has now become invalid, but our
+                            // index hasn't! So we need to redo our write and
+                            // our increment of the limit head.
+                            continue 'try_write_loop;
+                        } else {
+                            // Just try again
+                            continue 'commit_to_write_loop;
+                        }
                     }
                 }
             }

             // We updated the limit head, so we're done :)
             return;
         }
     }
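Taken together, the intended call pattern appears to be: claim an index, use the `data` slot it names, and hand the index back when done. A hedged usage sketch (`Component` and the surrounding setup are hypothetical, and the reallocation path is still a `todo!`):

    // Hypothetical round trip through the store:
    let store = CompStore::<Component>::new(4); // capacity: a power of two
    let slot = store.get_index_from_freelist(); // claim a free slot index
    // ... initialize and use the component behind `data` at `slot` ...
    store.put_back_index_into_freelist(slot);   // release the index for reuse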

     /// Retrieves a `&T` from the store. The index must have been handed out
     /// by `create` and not yet given back by calling `destroy`.
     fn get(&self, index: u32) -> &T {