Changeset - 9b5ea2f879a4
[Not reviewed]
0 2 0
mh - 3 years ago 2022-01-06 21:19:48
contact@maxhenger.nl
Implement MPSC queue

Multiple producer, single consumer queue with the purpose of acting
as the inbox for components.
2 files changed with 16 insertions and 16 deletions:
0 comments (0 inline, 0 general)
src/collections/raw_array.rs
Show inline comments
 
@@ -175,7 +175,7 @@ mod tests {
 
        let mut array = RawArray::new();
 
        array.resize(max_size);
 
        fill(&mut array, max_size);
 
        for size in sizes {
 
        for size in sizes.iter().copied() {
 
            array.resize(size);
 
            assert_eq!(array.cap(), size);
 
        }
src/runtime2/store/queue_mpsc.rs
Show inline comments
 
@@ -14,6 +14,9 @@ use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
// NOTE: Addendum to the above remark, not true if the thread owning the
 
// consumer sides crashes, unwinds, and drops the `Box` with it. Question is: do
 
// I want to take that into account?
 
pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
@@ -89,10 +92,11 @@ impl<T> QueueDynMpsc<T> {
 
            // responsible for dropping it.
 
            unsafe {
 
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
 
                let value = std::ptr::read(source);
 
                // We can perform a store since we're the only ones modifying
 
                // the atomic.
 
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
 
                return Some(std::ptr::read(source));
 
                return Some(value);
 
            }
 
        } else {
 
            return None;
 
@@ -143,10 +147,11 @@ impl<T> QueueDynProducer<T> {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let mut data_lock = queue.data.lock_shared();
 
        let mut write_index = queue.write_head.load(Ordering::Acquire); // note that we need to update this index in the loop
 
        let mut write_index = queue.write_head.load(Ordering::Acquire);
 

	
 
        'attempt_write: loop {
 
            let read_index = queue.read_head.load(Ordering::Acquire);
 

	
 
            if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
                // Need to resize, try loading read/write index afterwards
 
                let expected_capacity = data_lock.data.cap();
 
@@ -172,7 +177,8 @@ impl<T> QueueDynProducer<T> {
 
            // Update limit head to let reader obtain the written value in a
 
            // CAS-loop
 
            while let Err(_) = queue.limit_head.compare_exchange_weak(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed
 
                write_index, new_write_index,
 
                Ordering::AcqRel, Ordering::Relaxed
 
            ) {}
 

	
 
            return;
 
@@ -214,11 +220,12 @@ impl<T> QueueDynProducer<T> {
 
                let mut write_index = queue.write_head.load(Ordering::Acquire);
 
                debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access
 

	
 
                let is_full = read_index == write_index; // before bitwise AND-mask
 
                read_index &= exclusive_lock.read_mask;
 
                write_index &= exclusive_lock.read_mask;
 

	
 
                let new_capacity = new_capacity as u32;
 
                if read_index < write_index {
 
                if read_index <= write_index && !is_full { // which means: (read index < write_index) || buffer_is_empty
 
                    // The readable elements do not wrap around the ringbuffer
 
                    write_index += new_capacity;
 
                } else {
 
@@ -234,8 +241,6 @@ impl<T> QueueDynProducer<T> {
 
                // Update the masks
 
                exclusive_lock.read_mask = new_capacity - 1;
 
                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
 

	
 
                println!("DEBUG: Resized from {:10} to {:10}", old_capacity, new_capacity)
 
            }
 
        }
 

	
 
@@ -361,9 +366,9 @@ mod tests {
 
        // produce `u64` values with the high bits containing their identifier.
 
        // The consumer will try receive as fast as possible until each thread
 
        // has produced the expected number of values.
 
        const NUM_STRESS_TESTS: usize = 1;
 
        const NUM_PER_THREAD: usize = 4096*32;
 
        const NUM_PROD_THREADS: usize = 1;
 
        const NUM_STRESS_TESTS: usize = 2;
 
        const NUM_PER_THREAD: usize = 4096;
 
        const NUM_PROD_THREADS: usize = 4;
 

	
 
        fn take_num_thread_idx(number: u64) -> u64 { return (number >> 32) & 0xFFFFFFFF; }
 
        fn take_num(number: u64) -> u64 { return number & 0xFFFFFFFF; }
 
@@ -406,11 +411,6 @@ mod tests {
 
                    }
 

	
 
                    let _exit_guard = can_exit_lock.lock().unwrap();
 

	
 
                    println!("DEBUG: Num ops per thread = {}", NUM_PER_THREAD);
 
                    println!("DEBUG: Num threads        = {}", NUM_PROD_THREADS);
 
                    println!("DEBUG: Total num ops      = {}", NUM_PER_THREAD * NUM_PROD_THREADS);
 
                    println!("DEBUG: Final queue cap    = {}", queue.inner.data.lock_exclusive().data.cap());
 
                })
 
            };
 

	
 
@@ -420,7 +420,7 @@ mod tests {
 
                let prod_handle = producers.pop().unwrap();
 

	
 
                let handle = std::thread::spawn(move || {
 
                    let base_value = (prod_idx << 32) as u64;
 
                    let base_value = (prod_idx as u64) << 32;
 
                    for number in 0..NUM_PER_THREAD as u64 {
 
                        prod_handle.push(base_value + number);
 
                    }
0 comments (0 inline, 0 general)