Changeset - 9b5ea2f879a4
mh <contact@maxhenger.nl> - 2022-01-06 21:19:48
Implement MPSC queue

A multiple-producer, single-consumer queue, intended to serve as the
inbox for components.
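The intended usage pattern, sketched with std::sync::mpsc as a stand-in (QueueDynMpsc lives in this repository and is not a published crate, so the sketch only mirrors the shape of the API: one consumer end per component, producer ends for everyone else):

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        // Each component owns the consumer end of its inbox; other
        // components hold producer ends and push messages into it.
        let (producer, inbox) = mpsc::channel::<u64>();

        let handles: Vec<_> = (0..4u64)
            .map(|id| {
                let producer = producer.clone();
                thread::spawn(move || producer.send(id).unwrap())
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        drop(producer); // close the channel so the drain below terminates

        let mut received: Vec<u64> = inbox.iter().collect();
        received.sort();
        assert_eq!(received, vec![0, 1, 2, 3]);
    }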
2 files changed with 16 insertions and 16 deletions:
src/collections/raw_array.rs
@@ -172,13 +172,13 @@ mod tests {
         let min_size = *sizes.iter().min().unwrap();
         let max_size = *sizes.iter().max().unwrap();
 
         let mut array = RawArray::new();
         array.resize(max_size);
         fill(&mut array, max_size);
-        for size in sizes {
+        for size in sizes.iter().copied() {
             array.resize(size);
             assert_eq!(array.cap(), size);
         }
         check(&array, min_size);
     }
 }
\ No newline at end of file
src/runtime2/store/queue_mpsc.rs
@@ -11,12 +11,15 @@ use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 /// not shrink. So probably a temporary thing.
 ///
 /// In debug mode we'll make sure that there are no producers when the queue is
 /// dropped. We don't do this in release mode because the runtime is written
 /// such that components always remain alive (hence, this queue will remain
 /// accessible) while there are references to it.
+// NOTE: Addendum to the above remark: it does not hold if the thread owning
+// the consumer side crashes, unwinds, and drops the `Box` with it. Question
+// is: do I want to take that into account?
 pub struct QueueDynMpsc<T> {
     // Entire contents are boxed up such that we can create producers that have
     // a pointer to the contents.
     inner: Box<Shared<T>>
 }
 
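A minimal sketch (not part of the patch; `Shared` and `Handle` are illustrative stand-ins for `Shared<T>` and `QueueDynMpsc<T>`) of why the contents are boxed: moving the owning handle moves only the Box pointer, while the heap allocation keeps its address, so a producer's pointer to the contents stays valid.

    struct Shared { value: u32 }
    struct Handle { inner: Box<Shared> }

    fn main() {
        let handle = Handle { inner: Box::new(Shared { value: 42 }) };
        let before: *const Shared = &*handle.inner; // producer-style pointer
        let moved = handle;                         // move the owning handle
        let after: *const Shared = &*moved.inner;
        assert_eq!(before, after); // the heap allocation did not move
        assert_eq!(moved.inner.value, 42);
    }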
@@ -86,16 +89,17 @@ impl<T> QueueDynMpsc<T> {
 
         if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
             // Make a bitwise copy of the value and return it. The receiver is
             // responsible for dropping it.
             unsafe {
                 let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
+                let value = std::ptr::read(source);
                 // We can perform a store since we're the only ones modifying
                 // the atomic.
                 self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
-                return Some(std::ptr::read(source));
+                return Some(value);
             }
         } else {
             return None;
         }
     }
 }
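A worked example of the index scheme, as I read it from this hunk and the push path below (the concrete numbers are illustrative): indices live in [0, 2*capacity), `read_mask` selects the slot, `compare_mask` wraps the index, and the write/limit heads run one `capacity` ahead of the read head, which keeps "empty" distinguishable from "full" without an extra flag.

    fn main() {
        let capacity: u32 = 4;               // assumed value of buf_size above
        let read_mask = capacity - 1;        // 0b011
        let compare_mask = 2 * capacity - 1; // 0b111

        // Empty queue: the limit head sits exactly `capacity` ahead of the
        // read head, so the check above compares equal and pop returns None.
        let read: u32 = 5;
        let mut limit = (read + capacity) & compare_mask; // == 1
        assert_eq!((read + capacity) & compare_mask, limit);

        // One published element: the limit head is one slot further, the
        // check compares unequal, and pop reads slot `read & read_mask`.
        limit = (limit + 1) & compare_mask;
        assert_ne!((read + capacity) & compare_mask, limit);
        assert_eq!(read & read_mask, 1);
    }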
@@ -140,16 +144,17 @@ impl<T> QueueDynProducer<T> {
     }
 
     pub fn push(&self, value: T) {
         let queue = unsafe{ &*self.queue };
 
         let mut data_lock = queue.data.lock_shared();
-        let mut write_index = queue.write_head.load(Ordering::Acquire); // note that we need to update this index in the loop
+        let mut write_index = queue.write_head.load(Ordering::Acquire);
 
         'attempt_write: loop {
             let read_index = queue.read_head.load(Ordering::Acquire);
 
             if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
                 // Need to resize, try loading read/write index afterwards
                 let expected_capacity = data_lock.data.cap();
                 data_lock = self.resize(data_lock, expected_capacity);
+                write_index = queue.write_head.load(Ordering::Acquire);
                 continue 'attempt_write;
@@ -169,13 +174,14 @@ impl<T> QueueDynProducer<T> {
                 std::ptr::write(data_lock.data.get((write_index & data_lock.read_mask) as usize), value);
             }
 
             // Update limit head to let reader obtain the written value in a
             // CAS-loop
             while let Err(_) = queue.limit_head.compare_exchange_weak(
-                write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed
+                write_index, new_write_index,
+                Ordering::AcqRel, Ordering::Relaxed
             ) {}
 
             return;
         }
     }
 
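A simplified sketch (my reading of the publication step above; `publish` is a hypothetical helper) of why the CAS-loop is needed: each producer has already reserved its own `write_index`, and it makes the slot visible by moving `limit_head` from that index to the next one. The loop therefore forces producers to publish strictly in reservation order: a producer spins until the producer of the preceding slot has published.

    use std::sync::atomic::{AtomicU32, Ordering};

    fn publish(limit_head: &AtomicU32, reserved: u32, next: u32) {
        while limit_head
            .compare_exchange_weak(reserved, next, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop(); // an earlier slot is not yet published
        }
    }

    fn main() {
        let limit_head = AtomicU32::new(0);
        publish(&limit_head, 0, 1); // producer that reserved slot 0 goes first
        publish(&limit_head, 1, 2); // then the one that reserved slot 1
        assert_eq!(limit_head.load(Ordering::Acquire), 2);
    }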
@@ -211,17 +217,18 @@ impl<T> QueueDynProducer<T> {
 
                 // to ensure the ringbuffer can distinguish the cases where the
                 // ringbuffer is full, and when it is empty.
                 let mut read_index = queue.read_head.load(Ordering::Acquire);
                 let mut write_index = queue.write_head.load(Ordering::Acquire);
                 debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access
 
+                let is_full = read_index == write_index; // before bitwise AND-mask
                 read_index &= exclusive_lock.read_mask;
                 write_index &= exclusive_lock.read_mask;
 
                 let new_capacity = new_capacity as u32;
-                if read_index < write_index {
+                if read_index <= write_index && !is_full { // which means: (read_index < write_index) || buffer_is_empty
                     // The readable elements do not wrap around the ringbuffer
                     write_index += new_capacity;
                 } else {
                     // The readable elements do wrap around the ringbuffer
-                    write_index += old_capacity as u32;
+                    write_index += new_capacity;
@@ -231,14 +238,12 @@ impl<T> QueueDynProducer<T> {
                 queue.limit_head.store(write_index, Ordering::Release);
                 queue.write_head.store(write_index, Ordering::Release);
 
                 // Update the masks
                 exclusive_lock.read_mask = new_capacity - 1;
                 exclusive_lock.compare_mask = (2 * new_capacity) - 1;
-
-                println!("DEBUG: Resized from {:10} to {:10}", old_capacity, new_capacity)
             }
         }
 
         // Reacquire shared lock
         return queue.data.lock_shared();
     }
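A small standalone check of the mask arithmetic above (it assumes capacities are powers of two, which this scheme requires): `index & (capacity - 1)` coincides with `index % capacity` only for power-of-two capacities.

    fn main() {
        let new_capacity: u32 = 8;
        let read_mask = new_capacity - 1;          // 0b0111
        let compare_mask = (2 * new_capacity) - 1; // 0b1111
        for index in 0..4 * new_capacity {
            assert_eq!(index & read_mask, index % new_capacity);
            assert_eq!(index & compare_mask, index % (2 * new_capacity));
        }
    }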
@@ -358,15 +363,15 @@ mod tests {
         use std::sync::{Arc, Mutex};
 
         // Rather randomized test. Kind of a stress test. We let the producers
         // produce `u64` values with the high bits containing their identifier.
         // The consumer will try to receive as fast as possible until each thread
         // has produced the expected number of values.
-        const NUM_STRESS_TESTS: usize = 1;
-        const NUM_PER_THREAD: usize = 4096*32;
-        const NUM_PROD_THREADS: usize = 1;
+        const NUM_STRESS_TESTS: usize = 2;
+        const NUM_PER_THREAD: usize = 4096;
+        const NUM_PROD_THREADS: usize = 4;
 
         fn take_num_thread_idx(number: u64) -> u64 { return (number >> 32) & 0xFFFFFFFF; }
         fn take_num(number: u64) -> u64 { return number & 0xFFFFFFFF; }
 
         // Spawn the queue and producers
         for _stress_idx in 0..NUM_STRESS_TESTS {
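A quick check of the value encoding this test relies on: the producer's thread index goes into the high 32 bits and the sequence number into the low 32 bits, matching `take_num_thread_idx` and `take_num` above.

    fn main() {
        let base_value = 3u64 << 32; // thread index 3
        let value = base_value + 17; // sequence number 17
        assert_eq!((value >> 32) & 0xFFFFFFFF, 3); // take_num_thread_idx
        assert_eq!(value & 0xFFFFFFFF, 17);        // take_num
    }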
@@ -403,27 +408,22 @@ mod tests {
                            // Finished this one
                            num_done += 1;
                        }
                    }
 
                    let _exit_guard = can_exit_lock.lock().unwrap();
-
-                    println!("DEBUG: Num ops per thread = {}", NUM_PER_THREAD);
-                    println!("DEBUG: Num threads        = {}", NUM_PROD_THREADS);
-                    println!("DEBUG: Total num ops      = {}", NUM_PER_THREAD * NUM_PROD_THREADS);
-                    println!("DEBUG: Final queue cap    = {}", queue.inner.data.lock_exclusive().data.cap());
                })
            };
 
            // Set up producer threads
            let mut handles = Vec::with_capacity(NUM_PROD_THREADS);
            for prod_idx in 0..NUM_PROD_THREADS {
                let prod_handle = producers.pop().unwrap();
 
                let handle = std::thread::spawn(move || {
-                    let base_value = (prod_idx << 32) as u64;
+                    let base_value = (prod_idx as u64) << 32;
                    for number in 0..NUM_PER_THREAD as u64 {
                        prod_handle.push(base_value + number);
                    }
                });
 
                handles.push(handle);