Changeset - 9b5ea2f879a4
[Not reviewed]
0 2 0
mh - 3 years ago 2022-01-06 21:19:48
contact@maxhenger.nl
Implement MPSC queue

Multiple producer, single consumer queue with the purpose of acting
as the inbox for components.
2 files changed with 16 insertions and 16 deletions:
0 comments (0 inline, 0 general)
src/collections/raw_array.rs
Show inline comments
 
@@ -154,31 +154,31 @@ mod tests {
 
        check(&array, INIT_SIZE);
 
    }
 

	
 
    #[test]
 
    fn decrease_size() {
 
        const INIT_SIZE: usize = 16;
 
        const FINAL_SIZE: usize = 8;
 

	
 
        let mut array = RawArray::new();
 
        array.resize(INIT_SIZE);
 
        fill(&mut array, INIT_SIZE);
 
        array.resize(FINAL_SIZE);
 
        check(&array, FINAL_SIZE);
 
    }
 

	
 
    #[test]
 
    fn increase_and_decrease_size() {
 
        let sizes = [12, 8, 6, 150, 128, 32, 16, 90, 4, 18, 27];
 
        let min_size = *sizes.iter().min().unwrap();
 
        let max_size = *sizes.iter().max().unwrap();
 

	
 
        let mut array = RawArray::new();
 
        array.resize(max_size);
 
        fill(&mut array, max_size);
 
        for size in sizes {
 
        for size in sizes.iter().copied() {
 
            array.resize(size);
 
            assert_eq!(array.cap(), size);
 
        }
 
        check(&array, min_size);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/queue_mpsc.rs
Show inline comments
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
use crate::collections::RawArray;
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
/// consumer. To retrieve access to the producer-side: call `producer()`.
 
///
 
/// This is a queue that will resize (indefinitely) if it becomes full, and will
 
/// not shrink. So probably a temporary thing.
 
///
 
/// In debug mode we'll make sure that there are no producers when the queue is
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
// NOTE: Addendum to the above remark, not true if the thread owning the
 
// consumer sides crashes, unwinds, and drops the `Box` with it. Question is: do
 
// I want to take that into account?
 
pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
}
 

	
 
// One may move around the queue between threads, as long as there is only one
 
// instance of it.
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
 

	
 
/// Shared data between queue consumer and the queue producers
 
struct Shared<T> {
 
    data: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicU32,
 
    write_head: AtomicU32,
 
    limit_head: AtomicU32,
 
    #[cfg(debug_assertions)] dbg: AtomicU32,
 
}
 

	
 
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 
/// inner data array is resized.
 
struct Inner<T> {
 
    data: RawArray<T>,
 
    compare_mask: u32,
 
@@ -68,52 +71,53 @@ impl<T> QueueDynMpsc<T> {
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
 
    }
 

	
 
    #[inline]
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(self);
 
    }
 

	
 
    /// Perform an attempted read from the queue. It might be that some producer
 
    /// is putting something in the queue while this function is executing, and
 
    /// we don't get the consume it.
 
    pub fn pop(&mut self) -> Option<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap() as u32;
 

	
 
        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
            // responsible for dropping it.
 
            unsafe {
 
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
 
                let value = std::ptr::read(source);
 
                // We can perform a store since we're the only ones modifying
 
                // the atomic.
 
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
 
                return Some(std::ptr::read(source));
 
                return Some(value);
 
            }
 
        } else {
 
            return None;
 
        }
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynMpsc<T> {
 
    fn drop(&mut self) {
 
        // There should be no more `QueueDynProducer` pointers to this queue
 
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
 
        // And so the limit head should be equal to the write head
 
        let data_lock = self.inner.data.lock_shared();
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
 
        // have its destructor called. We immediately apply the
 
        // increment-by-size trick and wait until we've hit the write head.
 
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
 
        read_index += data_lock.data.cap() as u32;
 
        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
 
@@ -122,141 +126,142 @@ impl<T> Drop for QueueDynMpsc<T> {
 
            read_index += 1;
 
        }
 
    }
 
}
 

	
 
pub struct QueueDynProducer<T> {
 
    queue: *const Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
 
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
 
            return Self{ queue };
 
        }
 
    }
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let mut data_lock = queue.data.lock_shared();
 
        let mut write_index = queue.write_head.load(Ordering::Acquire); // note that we need to update this index in the loop
 
        let mut write_index = queue.write_head.load(Ordering::Acquire);
 

	
 
        'attempt_write: loop {
 
            let read_index = queue.read_head.load(Ordering::Acquire);
 

	
 
            if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
                // Need to resize, try loading read/write index afterwards
 
                let expected_capacity = data_lock.data.cap();
 
                data_lock = self.resize(data_lock, expected_capacity);
 
                write_index = queue.write_head.load(Ordering::Acquire);
 
                continue 'attempt_write;
 
            }
 

	
 
            // If here try to advance write index
 
            let new_write_index = (write_index + 1) & data_lock.compare_mask;
 
            if let Err(actual_write_index) = queue.write_head.compare_exchange(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Acquire
 
            ) {
 
                write_index = actual_write_index;
 
                continue 'attempt_write;
 
            }
 

	
 
            // We're now allowed to write at `write_index`
 
            unsafe {
 
                std::ptr::write(data_lock.data.get((write_index & data_lock.read_mask) as usize), value);
 
            }
 

	
 
            // Update limit head to let reader obtain the written value in a
 
            // CAS-loop
 
            while let Err(_) = queue.limit_head.compare_exchange_weak(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed
 
                write_index, new_write_index,
 
                Ordering::AcqRel, Ordering::Relaxed
 
            ) {}
 

	
 
            return;
 
        }
 
    }
 

	
 
    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
 
        drop(shared_lock);
 
        let queue = unsafe{ &*self.queue };
 

	
 
        {
 
            let mut exclusive_lock = queue.data.lock_exclusive();
 

	
 
            // We hold the exclusive lock, but someone else might have done the resizing, and so:
 
            if exclusive_lock.data.cap() == expected_capacity {
 
                let old_capacity = expected_capacity;
 
                let new_capacity = 2 * old_capacity;
 
                assert_correct_capacity(new_capacity);
 

	
 
                // Resize by a factor of two, and make the two halves identical.
 
                exclusive_lock.data.resize(new_capacity);
 
                for idx in old_capacity..new_capacity {
 
                    unsafe {
 
                        let target = exclusive_lock.data.get(idx);
 
                        let source = exclusive_lock.data.get(idx - old_capacity);
 
                        std::ptr::write(target, std::ptr::read(source));
 
                    }
 
                }
 

	
 
                // Modify all atomics to reflect that we just resized the
 
                // underlying buffer. We have that everything between the read
 
                // index and the write index is readable. And the following
 
                // preserves that property, while increasing the size from
 
                // `old_capacity` to `new_capacity`.
 
                // Note that the addition of `new_capacity` to `write_head` is
 
                // to ensure the ringbuffer can distinguish the cases where the
 
                // ringbuffer is full, and when it is empty.
 
                let mut read_index = queue.read_head.load(Ordering::Acquire);
 
                let mut write_index = queue.write_head.load(Ordering::Acquire);
 
                debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access
 

	
 
                let is_full = read_index == write_index; // before bitwise AND-mask
 
                read_index &= exclusive_lock.read_mask;
 
                write_index &= exclusive_lock.read_mask;
 

	
 
                let new_capacity = new_capacity as u32;
 
                if read_index < write_index {
 
                if read_index <= write_index && !is_full { // which means: (read index < write_index) || buffer_is_empty
 
                    // The readable elements do not wrap around the ringbuffer
 
                    write_index += new_capacity;
 
                } else {
 
                    // The readable elements do wrap around the ringbuffer
 
                    write_index += old_capacity as u32;
 
                    write_index += new_capacity;
 
                }
 

	
 
                queue.read_head.store(read_index, Ordering::Release);
 
                queue.limit_head.store(write_index, Ordering::Release);
 
                queue.write_head.store(write_index, Ordering::Release);
 

	
 
                // Update the masks
 
                exclusive_lock.read_mask = new_capacity - 1;
 
                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
 

	
 
                println!("DEBUG: Resized from {:10} to {:10}", old_capacity, new_capacity)
 
            }
 
        }
 

	
 
        // Reacquire shared lock
 
        return queue.data.lock_shared();
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
 
    }
 
}
 

	
 
// producer end is `Send`, because in debug mode we make sure that there are no
 
// more producers when the queue is destroyed. But is not sync, because that
 
// would circumvent our atomic counter shenanigans.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 

	
 
#[inline]
 
fn assert_correct_capacity(capacity: usize) {
 
    assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
}
 

	
 
@@ -340,103 +345,98 @@ mod tests {
 
                prod_idx += 1;
 
                prod_idx %= NUM_PROD;
 
                prod.push(idx);
 
            }
 

	
 
            // Retrieve and check again
 
            for idx in 0..INIT_SIZE {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
            }
 

	
 
            assert!(cons.pop().is_none());
 
            assert_eq!(queue_size(&cons), INIT_SIZE);
 
        }
 
    }
 

	
 
    #[test]
 
    fn multithreaded_production_and_consumption() {
 
        use std::sync::{Arc, Mutex};
 

	
 
        // Rather randomized test. Kind of a stress test. We let the producers
 
        // produce `u64` values with the high bits containing their identifier.
 
        // The consumer will try receive as fast as possible until each thread
 
        // has produced the expected number of values.
 
        const NUM_STRESS_TESTS: usize = 1;
 
        const NUM_PER_THREAD: usize = 4096*32;
 
        const NUM_PROD_THREADS: usize = 1;
 
        const NUM_STRESS_TESTS: usize = 2;
 
        const NUM_PER_THREAD: usize = 4096;
 
        const NUM_PROD_THREADS: usize = 4;
 

	
 
        fn take_num_thread_idx(number: u64) -> u64 { return (number >> 32) & 0xFFFFFFFF; }
 
        fn take_num(number: u64) -> u64 { return number & 0xFFFFFFFF; }
 

	
 
        // Span queue and producers
 
        for _stress_idx in 0..NUM_STRESS_TESTS {
 
            let mut queue = QueueDynMpsc::new(4);
 
            let mut producers = Vec::with_capacity(NUM_PROD_THREADS);
 
            for _idx in 0..NUM_PROD_THREADS {
 
                producers.push(queue.producer());
 
            }
 

	
 
            // Start up consume thread and let it spin immediately. Note that it
 
            // must die last.
 
            let can_exit_lock = Arc::new(Mutex::new(false));
 
            let mut held_exit_lock = can_exit_lock.lock().unwrap();
 

	
 
            let consume_handle = {
 
                let can_exit_lock = can_exit_lock.clone();
 
                std::thread::spawn(move || {
 
                    let mut counters = [0u64; NUM_PROD_THREADS];
 
                    let mut num_done = 0;
 
                    while num_done != NUM_PROD_THREADS {
 
                        // Spin until we get something
 
                        let new_value = loop {
 
                            if let Some(value) = queue.pop() {
 
                                break value;
 
                            }
 
                        };
 

	
 
                        let thread_idx = take_num_thread_idx(new_value);
 
                        let counter = &mut counters[thread_idx as usize];
 
                        assert_eq!(*counter, take_num(new_value)); // values per thread arrive in order
 

	
 
                        *counter += 1;
 
                        if *counter == NUM_PER_THREAD as u64 {
 
                            // Finished this one
 
                            num_done += 1;
 
                        }
 
                    }
 

	
 
                    let _exit_guard = can_exit_lock.lock().unwrap();
 

	
 
                    println!("DEBUG: Num ops per thread = {}", NUM_PER_THREAD);
 
                    println!("DEBUG: Num threads        = {}", NUM_PROD_THREADS);
 
                    println!("DEBUG: Total num ops      = {}", NUM_PER_THREAD * NUM_PROD_THREADS);
 
                    println!("DEBUG: Final queue cap    = {}", queue.inner.data.lock_exclusive().data.cap());
 
                })
 
            };
 

	
 
            // Set up producer threads
 
            let mut handles = Vec::with_capacity(NUM_PROD_THREADS);
 
            for prod_idx in 0..NUM_PROD_THREADS {
 
                let prod_handle = producers.pop().unwrap();
 

	
 
                let handle = std::thread::spawn(move || {
 
                    let base_value = (prod_idx << 32) as u64;
 
                    let base_value = (prod_idx as u64) << 32;
 
                    for number in 0..NUM_PER_THREAD as u64 {
 
                        prod_handle.push(base_value + number);
 
                    }
 
                });
 

	
 
                handles.push(handle);
 
            }
 

	
 
            // Wait until all producers finished, then we unlock our held lock and
 
            // we wait until the consumer finishes
 
            for handle in handles {
 
                handle.join().expect("clean producer exit");
 
            }
 

	
 
            drop(held_exit_lock);
 
            consume_handle.join().expect("clean consumer exit");
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)