Changeset - 64ef56fc060f
[Not reviewed]
MH - 2022-01-06 15:21:00
contact@maxhenger.nl
WIP: Work on MPSC queue, pending bugfixes

Works in single-threaded fashion; some issues in the multithreaded
stress test still require fixing.
1 file changed with 259 insertions and 17 deletions:
0 comments (0 inline, 0 general)
src/runtime2/store/queue_mpsc.rs
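The diff below stores the queue in a power-of-two ring buffer and keeps its `read_head`, `write_head`, and `limit_head` indices in the range [0, 2*capacity), with `read_mask = capacity - 1` selecting a slot and `compare_mask = 2*capacity - 1` wrapping the heads. A minimal standalone sketch of that index arithmetic, using plain integers instead of the queue's atomics (the concrete values are arbitrary, and the empty/full convention is the one suggested by the pop/push checks and the resize comment in the diff):

```rust
// Illustrative only: the index arithmetic used throughout the diff, with
// plain integers instead of the queue's atomic heads.
fn main() {
    let capacity: u32 = 8;                // always a power of two
    let read_mask = capacity - 1;         // maps a head to a buffer slot
    let compare_mask = 2 * capacity - 1;  // heads live in [0, 2 * capacity)

    // Convention suggested by the diff: the write head sits `capacity` ahead
    // of the read head when the queue is empty, and equals it when full.
    let read_head: u32 = 3;
    let write_head_empty = (read_head + capacity) & compare_mask; // 11: empty
    let write_head_full = read_head;                              //  3: full

    // Emptiness check as in `pop()`: (read + capacity) & compare_mask == limit.
    assert_eq!((read_head + capacity) & compare_mask, write_head_empty);
    // Fullness check as in `push()`: write head equals read head.
    assert_eq!(write_head_full, read_head);
    // Regardless, the slot a head refers to is always `head & read_mask`.
    assert_eq!(write_head_empty & read_mask, read_head & read_mask);
    println!("slot = {}", read_head & read_mask);
}
```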
 
@@ -48,7 +48,7 @@ impl<T> QueueDynMpsc<T> {
 
    /// increased to the next power of 2 (if it isn't already).
 
    pub fn new(initial_capacity: usize) -> Self {
 
        let initial_capacity = initial_capacity.next_power_of_two();
 
        Self::assert_correct_size(initial_capacity);
 
        assert_correct_capacity(initial_capacity);
 

	
 
        let mut data = RawArray::new();
 
        data.resize(initial_capacity);
 
@@ -82,7 +82,7 @@ impl<T> QueueDynMpsc<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap();
 
        let buf_size = data_lock.data.cap() as u32;
 

	
 
        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
@@ -98,11 +98,6 @@ impl<T> QueueDynMpsc<T> {
 
            return None;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn assert_correct_size(capacity: usize) {
 
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynMpsc<T> {
 
@@ -118,7 +113,7 @@ impl<T> Drop for QueueDynMpsc<T> {
 
        // have its destructor called. We immediately apply the
 
        // increment-by-size trick and wait until we've hit the write head.
 
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
 
        read_index += data_lock.data.cap();
 
        read_index += data_lock.data.cap() as u32;
 
        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
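To illustrate the "increment-by-size trick" mentioned in the Drop comment above, here is a plain-integer sketch of the drain loop, assuming the same masking scheme. The slot values are made up, and the real code obtains a pointer to each slot and runs the element's destructor:

```rust
// Plain-integer sketch of the drain loop in `Drop`: offset the read head by
// the capacity, then walk until the write head is reached, visiting the slot
// `head & read_mask` at every step.
fn main() {
    let capacity: u32 = 8;
    let read_mask = capacity - 1;
    let compare_mask = 2 * capacity - 1;

    let write_index: u32 = 1;    // three unread elements remain...
    let mut read_index: u32 = 6; // ...at slots 6, 7 and 0
    read_index += capacity;      // the "increment-by-size trick"

    let mut visited_slots = Vec::new();
    while read_index & compare_mask != write_index {
        // The real code obtains a pointer to this slot and runs the
        // element's destructor here.
        visited_slots.push(read_index & read_mask);
        read_index += 1;
    }
    assert_eq!(visited_slots, vec![6, 7, 0]);
}
```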
 
@@ -147,11 +142,40 @@ impl<T> QueueDynProducer<T> {
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let data_lock = queue.data.lock_shared();
 
        let read_index = queue.read_head.load(Ordering::Acquire);
 
        let write_index = queue.write_head.load(Ordering::Acquire);
 
        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
            let expected_capacity = data_lock.data.cap();
 
        let mut data_lock = queue.data.lock_shared();
 
        let mut write_index = queue.write_head.load(Ordering::Acquire); // note that we need to update this index in the loop
 

	
 
        'attempt_write: loop {
 
            let read_index = queue.read_head.load(Ordering::Acquire);
 
            if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
                // Need to resize, try loading read/write index afterwards
 
                let expected_capacity = data_lock.data.cap();
 
                data_lock = self.resize(data_lock, expected_capacity);
 
                write_index = queue.write_head.load(Ordering::Acquire);
 
                continue 'attempt_write;
 
            }
 

	
 
            // If we get here, try to advance the write index
 
            let new_write_index = (write_index + 1) & data_lock.compare_mask;
 
            if let Err(actual_write_index) = queue.write_head.compare_exchange(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Acquire
 
            ) {
 
                write_index = actual_write_index;
 
                continue 'attempt_write;
 
            }
 

	
 
            // We're now allowed to write at `write_index`
 
            unsafe {
 
                std::ptr::write(data_lock.data.get((write_index & data_lock.read_mask) as usize), value);
 
            }
 

	
 
            // Update limit head to let reader obtain the written value in a
 
            // CAS-loop
 
            while let Err(_) = queue.limit_head.compare_exchange_weak(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed
 
            ) {}
 

	
 
            return;
 
        }
 
    }
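The new `push()` follows a reserve-then-publish pattern: a producer claims a slot with `compare_exchange` on `write_head`, writes the value, and then advances `limit_head` in a CAS loop, so the consumer only ever sees fully written slots and publications happen in reservation order. A simplified, fixed-capacity sketch of just that pattern (no resizing and no data array; the `Heads` struct and its methods are illustrative, not part of the diff):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Simplified sketch of the reserve-then-publish pattern used by `push()`.
struct Heads {
    write_head: AtomicU32, // next slot to claim
    limit_head: AtomicU32, // last slot made visible to the consumer
    compare_mask: u32,
}

impl Heads {
    // Claim a slot; returns the index the caller may write to.
    fn reserve(&self) -> u32 {
        let mut current = self.write_head.load(Ordering::Acquire);
        loop {
            let next = (current + 1) & self.compare_mask;
            match self.write_head.compare_exchange(
                current, next, Ordering::AcqRel, Ordering::Acquire,
            ) {
                Ok(claimed) => return claimed,
                Err(actual) => current = actual, // another producer won; retry
            }
        }
    }

    // Publish the slot: wait until all earlier reservations are published,
    // then advance the limit head past ours.
    fn publish(&self, claimed: u32) {
        let next = (claimed + 1) & self.compare_mask;
        while self
            .limit_head
            .compare_exchange_weak(claimed, next, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {}
    }
}

fn main() {
    let heads = Heads {
        write_head: AtomicU32::new(8), // empty queue of capacity 8: write = read + 8
        limit_head: AtomicU32::new(8),
        compare_mask: 15,
    };
    let slot = heads.reserve();
    // ... the value would be written into `data[slot & read_mask]` here ...
    heads.publish(slot);
    assert_eq!(heads.limit_head.load(Ordering::Acquire), (slot + 1) & 15);
}
```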
 

	
 
@@ -160,12 +184,13 @@ impl<T> QueueDynProducer<T> {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        {
 
            let exclusive_lock = self.queue.data.lock_exclusive();
 
            let mut exclusive_lock = queue.data.lock_exclusive();
 

	
 
            // We hold the exclusive lock, but someone else might have done the resizing, and so:
 
            if exclusive_lock.data.cap() == expected_capacity {
 
                let old_capacity = expected_capacity;
 
                let new_capacity = 2 * old_capacity;
 
                assert_correct_capacity(new_capacity);
 

	
 
                // Resize by a factor of two, and make the two halves identical.
 
                exclusive_lock.data.resize(new_capacity);
 
@@ -178,8 +203,39 @@ impl<T> QueueDynProducer<T> {
 
                }
 

	
 
                // Modify all atomics to reflect that we just resized the
 
                // underlying buffer.
 
                
 
                // underlying buffer. Everything between the read index and the
 
                // write index is readable, and the following preserves that
 
                // property while increasing the size from `old_capacity` to
 
                // `new_capacity`. Note that `new_capacity` is added to
 
                // `write_head` so the ringbuffer can distinguish the case
 
                // where it is full from the case where it is empty.
 
                let mut read_index = queue.read_head.load(Ordering::Acquire);
 
                let mut write_index = queue.write_head.load(Ordering::Acquire);
 
                debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access
 

	
 
                read_index &= exclusive_lock.read_mask;
 
                write_index &= exclusive_lock.read_mask;
 

	
 
                let new_capacity = new_capacity as u32;
 
                if read_index < write_index {
 
                    // The readable elements do not wrap around the ringbuffer
 
                    write_index += new_capacity;
 
                } else {
 
                    // The readable elements do wrap around the ringbuffer
 
                    write_index += old_capacity as u32;
 
                    write_index += new_capacity;
 
                }
 

	
 
                queue.read_head.store(read_index, Ordering::Release);
 
                queue.limit_head.store(write_index, Ordering::Release);
 
                queue.write_head.store(write_index, Ordering::Release);
 

	
 
                // Update the masks
 
                exclusive_lock.read_mask = new_capacity - 1;
 
                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
 

	
 
                println!("DEBUG: Resized from {:10} to {:10}", old_capacity, new_capacity)
 
            }
 
        }
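A worked example of the head remapping above, traced for a full queue growing from capacity 4 to 8. Plain integers only; the real code runs under the exclusive lock and also copies the old contents into both halves of the new buffer, which is why the readable elements keep their original order:

```rust
// Worked example of the head remapping in `resize()`. Values are arbitrary.
fn main() {
    let old_capacity: u32 = 4;
    let new_capacity: u32 = 8;
    let old_read_mask = old_capacity - 1;        // 3

    // A full queue: write head equals read head, here both at 6, meaning the
    // four readable elements sit at old slots 2, 3, 0, 1 (in read order).
    let mut read_index: u32 = 6;
    let mut write_index: u32 = 6;

    // The resize code first reduces both heads with the *old* read mask...
    read_index &= old_read_mask;                 // 2
    write_index &= old_read_mask;                // 2

    // ...then, since the readable range wraps around the old buffer, offsets
    // the write head by both the old and the new capacity.
    write_index += old_capacity;                 // 6
    write_index += new_capacity;                 // 14

    // With the new masks the queue is neither full nor empty, and exactly
    // four slots (2, 3, 4, 5) are readable, in the original order.
    let new_read_mask = new_capacity - 1;        // 7
    let new_compare_mask = 2 * new_capacity - 1; // 15
    assert_ne!(write_index, read_index);                                     // not full
    assert_ne!((read_index + new_capacity) & new_compare_mask, write_index); // not empty
    let readable: Vec<u32> = (0u32..)
        .map(|i| (read_index + new_capacity + i) & new_compare_mask)
        .take_while(|&head| head != write_index)
        .map(|head| head & new_read_mask)
        .collect();
    assert_eq!(readable, vec![2, 3, 4, 5]);
}
```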
 

	
 
@@ -197,4 +253,190 @@ impl<T> Drop for QueueDynProducer<T> {
 
// producer end is `Send`, because in debug mode we make sure that there are no
 
// more producers when the queue is destroyed. But it is not `Sync`, because that
 
// would circumvent our atomic counter shenanigans.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 
\ No newline at end of file
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 

	
 
#[inline]
 
fn assert_correct_capacity(capacity: usize) {
 
    assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 

	
 
    fn queue_size<T>(queue: &QueueDynMpsc<T>) -> usize {
 
        let lock = queue.inner.data.lock_exclusive();
 
        return lock.data.cap();
 
    }
 

	
 
    #[test]
 
    fn single_threaded_fixed_size_push_pop() {
 
        const INIT_SIZE: usize = 16;
 
        let mut cons = QueueDynMpsc::new(INIT_SIZE);
 
        let prod = cons.producer();
 

	
 
        for _round in 0..3 {
 
            // Fill up with indices
 
            for idx in 0..INIT_SIZE {
 
                prod.push(idx);
 
            }
 

	
 
            // Take out indices and check
 
            for idx in 0..INIT_SIZE {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
            }
 

	
 
            assert!(cons.pop().is_none()); // nothing left in queue
 
            assert_eq!(queue_size(&cons), INIT_SIZE); // queue still of same size
 
        }
 
    }
 

	
 
    #[test]
 
    fn single_threaded_resizing_push_pop() {
 
        const INIT_SIZE: usize = 8;
 
        const NUM_RESIZE: usize = 3; // note: each resize increases capacity by factor of two
 

	
 
        let mut cons = QueueDynMpsc::new(INIT_SIZE);
 
        let prod = cons.producer();
 

	
 
        for resize_idx in 0..NUM_RESIZE {
 
            // Fill up with twice the current capacity worth of indices to force a resize
 
            let cur_size = INIT_SIZE << resize_idx;
 
            let new_size = cur_size << 1;
 
            for idx in 0..new_size {
 
                prod.push(idx);
 
            }
 

	
 
            for idx in 0..new_size {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
            }
 

	
 
            assert!(cons.pop().is_none());
 
            assert_eq!(queue_size(&cons), new_size);
 
        }
 

	
 
        assert_eq!(queue_size(&cons), INIT_SIZE << NUM_RESIZE);
 
    }
 

	
 
    #[test]
 
    fn single_threaded_alternating_push_pop() {
 
        const INIT_SIZE: usize = 32;
 
        const NUM_PROD: usize = 4;
 
        assert!(INIT_SIZE % NUM_PROD == 0);
 

	
 
        let mut cons = QueueDynMpsc::new(INIT_SIZE);
 
        let mut prods = Vec::with_capacity(NUM_PROD);
 
        for _ in 0..NUM_PROD {
 
            prods.push(cons.producer());
 
        }
 

	
 
        for _round_idx in 0..4 {
 
            // Fill up, alternating per producer
 
            let mut prod_idx = 0;
 
            for idx in 0..INIT_SIZE {
 
                let prod = &prods[prod_idx];
 
                prod_idx += 1;
 
                prod_idx %= NUM_PROD;
 
                prod.push(idx);
 
            }
 

	
 
            // Retrieve and check again
 
            for idx in 0..INIT_SIZE {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
            }
 

	
 
            assert!(cons.pop().is_none());
 
            assert_eq!(queue_size(&cons), INIT_SIZE);
 
        }
 
    }
 

	
 
    #[test]
 
    fn multithreaded_production_and_consumption() {
 
        use std::sync::{Arc, Mutex};
 

	
 
        // Rather randomized test. Kind of a stress test. We let the producers
 
        // produce `u64` values with the high bits containing their identifier.
 
        // The consumer will try to receive as fast as possible until each thread
 
        // has produced the expected number of values.
 
        const NUM_STRESS_TESTS: usize = 1;
 
        const NUM_PER_THREAD: usize = 4096*32;
 
        const NUM_PROD_THREADS: usize = 1;
 

	
 
        fn take_num_thread_idx(number: u64) -> u64 { return (number >> 32) & 0xFFFFFFFF; }
 
        fn take_num(number: u64) -> u64 { return number & 0xFFFFFFFF; }
 

	
 
        // Spawn the queue and producers
 
        for _stress_idx in 0..NUM_STRESS_TESTS {
 
            let mut queue = QueueDynMpsc::new(4);
 
            let mut producers = Vec::with_capacity(NUM_PROD_THREADS);
 
            for _idx in 0..NUM_PROD_THREADS {
 
                producers.push(queue.producer());
 
            }
 

	
 
            // Start up the consumer thread and let it spin immediately. Note that it
 
            // must die last.
 
            let can_exit_lock = Arc::new(Mutex::new(false));
 
            let mut held_exit_lock = can_exit_lock.lock().unwrap();
 

	
 
            let consume_handle = {
 
                let can_exit_lock = can_exit_lock.clone();
 
                std::thread::spawn(move || {
 
                    let mut counters = [0u64; NUM_PROD_THREADS];
 
                    let mut num_done = 0;
 
                    while num_done != NUM_PROD_THREADS {
 
                        // Spin until we get something
 
                        let new_value = loop {
 
                            if let Some(value) = queue.pop() {
 
                                break value;
 
                            }
 
                        };
 

	
 
                        let thread_idx = take_num_thread_idx(new_value);
 
                        let counter = &mut counters[thread_idx as usize];
 
                        assert_eq!(*counter, take_num(new_value)); // values per thread arrive in order
 

	
 
                        *counter += 1;
 
                        if *counter == NUM_PER_THREAD as u64 {
 
                            // Finished this one
 
                            num_done += 1;
 
                        }
 
                    }
 

	
 
                    let _exit_guard = can_exit_lock.lock().unwrap();
 

	
 
                    println!("DEBUG: Num ops per thread = {}", NUM_PER_THREAD);
 
                    println!("DEBUG: Num threads        = {}", NUM_PROD_THREADS);
 
                    println!("DEBUG: Total num ops      = {}", NUM_PER_THREAD * NUM_PROD_THREADS);
 
                    println!("DEBUG: Final queue cap    = {}", queue.inner.data.lock_exclusive().data.cap());
 
                })
 
            };
 

	
 
            // Set up producer threads
 
            let mut handles = Vec::with_capacity(NUM_PROD_THREADS);
 
            for prod_idx in 0..NUM_PROD_THREADS {
 
                let prod_handle = producers.pop().unwrap();
 

	
 
                let handle = std::thread::spawn(move || {
 
                    let base_value = (prod_idx << 32) as u64;
 
                    for number in 0..NUM_PER_THREAD as u64 {
 
                        prod_handle.push(base_value + number);
 
                    }
 
                });
 

	
 
                handles.push(handle);
 
            }
 

	
 
            // Wait until all producers have finished, then release the held lock and
 
            // wait for the consumer to finish
 
            for handle in handles {
 
                handle.join().expect("clean producer exit");
 
            }
 

	
 
            drop(held_exit_lock);
 
            consume_handle.join().expect("clean consumer exit");
 
        }
 
    }
 
}
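A quick standalone check of the value-tagging scheme the stress test relies on: the producer index lives in the high 32 bits and the per-producer sequence number in the low 32 bits. The helper functions are copied from the test; the concrete values are arbitrary:

```rust
// Round-trip check of the stress test's u64 tagging scheme.
fn take_num_thread_idx(number: u64) -> u64 { (number >> 32) & 0xFFFF_FFFF }
fn take_num(number: u64) -> u64 { number & 0xFFFF_FFFF }

fn main() {
    let prod_idx: u64 = 3;
    let number: u64 = 41_237;
    let encoded = (prod_idx << 32) + number; // mirrors `base_value + number` in the test
    assert_eq!(take_num_thread_idx(encoded), prod_idx);
    assert_eq!(take_num(encoded), number);
}
```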
 
\ No newline at end of file