use std::sync::atomic::{AtomicU32, Ordering};

use crate::collections::RawArray;
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};

/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To retrieve access to the producer-side: call `producer()`.
///
/// This is a queue that will resize (indefinitely) if it becomes full, and
/// will not shrink. So probably a temporary thing.
///
/// In debug mode we'll make sure that there are no producers when the queue is
/// dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
// NOTE: Addendum to the above remark: it is not true if the thread owning the
// consumer side crashes, unwinds, and drops the `Box` with it. Question is: do
// I want to take that into account?
pub struct QueueDynMpsc<T> {
    // Entire contents are boxed up such that we can create producers that have
    // a pointer to the contents.
    inner: Box<Shared<T>>
}

// One may move the queue between threads, as long as there is only one
// instance of it.
unsafe impl<T> Send for QueueDynMpsc<T>{}

/// Shared data between the queue consumer and the queue producers.
struct Shared<T> {
    data: UnfairSeLock<Inner<T>>,
    read_head: AtomicU32,
    write_head: AtomicU32,
    limit_head: AtomicU32,
    #[cfg(debug_assertions)] dbg: AtomicU32,
}

/// Protected by an exclusive/shared lock. The exclusive lock is only obtained
/// when the inner data array is resized.
struct Inner<T> {
    data: RawArray<T>,
    compare_mask: u32,
    read_mask: u32,
}

type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;

impl<T> QueueDynMpsc<T> {
    /// Constructs a new MPSC queue. Note that the initial capacity is always
    /// increased to the next power of 2 (if it isn't already).
    pub fn new(initial_capacity: usize) -> Self {
        let initial_capacity = initial_capacity.next_power_of_two();
        assert_correct_capacity(initial_capacity);

        let mut data = RawArray::new();
        data.resize(initial_capacity);

        let initial_capacity = initial_capacity as u32;

        return Self{
            inner: Box::new(Shared {
                data: UnfairSeLock::new(Inner{
                    data,
                    compare_mask: (2 * initial_capacity) - 1,
                    read_mask: initial_capacity - 1,
                }),
                read_head: AtomicU32::new(0),
                write_head: AtomicU32::new(initial_capacity),
                limit_head: AtomicU32::new(initial_capacity),
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
            }),
        };
    }

    #[inline]
    pub fn producer(&self) -> QueueDynProducer<T> {
        return QueueDynProducer::new(self);
    }

    /// Performs an attempted read from the queue. It might be that some
    /// producer is putting something into the queue while this function is
    /// executing, in which case we don't get to consume it.
    pub fn pop(&mut self) -> Option<T> {
        let data_lock = self.inner.data.lock_shared();
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
        let buf_size = data_lock.data.cap() as u32;

        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
            // Make a bitwise copy of the value and return it. The receiver is
            // responsible for dropping it.
            unsafe {
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
                let value = std::ptr::read(source);

                // We can perform a store since we're the only ones modifying
                // the atomic.
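                // Note that the new read head stays within [0, 2*capacity)
                // because of `compare_mask`; that extra capacity's worth of
                // range is what lets the readability check above distinguish
                // a completely full queue from an empty one.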
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);

                return Some(value);
            }
        } else {
            return None;
        }
    }
}

impl<T> Drop for QueueDynMpsc<T> {
    fn drop(&mut self) {
        // There should be no more `QueueDynProducer` pointers to this queue,
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
        // and so the limit head should be equal to the write head.
        let data_lock = self.inner.data.lock_shared();
        let write_index = self.inner.write_head.load(Ordering::Acquire);
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);

        // Every item that has not yet been taken out of the queue needs to
        // have its destructor called. We immediately apply the
        // increment-by-size trick and wait until we've hit the write head.
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
        read_index += data_lock.data.cap() as u32;
        while read_index & data_lock.compare_mask != write_index {
            unsafe {
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
                std::ptr::drop_in_place(target);
            }
            read_index += 1;
        }
    }
}

pub struct QueueDynProducer<T> {
    queue: *const Shared<T>,
}

impl<T> QueueDynProducer<T> {
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
        unsafe {
            // If you only knew the power of the dark side! Obi-Wan never told
            // you what happened to your father!
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
            return Self{ queue };
        }
    }

    pub fn push(&self, value: T) {
        let queue = unsafe{ &*self.queue };
        let mut data_lock = queue.data.lock_shared();
        let mut write_index = queue.write_head.load(Ordering::Acquire);

        'attempt_write: loop {
            let read_index = queue.read_head.load(Ordering::Acquire);
            if write_index == read_index {
                // Queue is full (both heads are stored in [0, 2*capacity), so
                // we can check equality without bitwise ANDing). Resize, then
                // reload the write index afterwards.
                let expected_capacity = data_lock.data.cap();
                data_lock = self.resize(data_lock, expected_capacity);
                write_index = queue.write_head.load(Ordering::Acquire);
                continue 'attempt_write;
            }

            // If we get here, try to advance the write index.
            let new_write_index = (write_index + 1) & data_lock.compare_mask;
            if let Err(actual_write_index) = queue.write_head.compare_exchange(
                write_index, new_write_index, Ordering::AcqRel, Ordering::Acquire
            ) {
                write_index = actual_write_index;
                continue 'attempt_write;
            }

            // We're now allowed to write at `write_index`.
            unsafe {
                std::ptr::write(data_lock.data.get((write_index & data_lock.read_mask) as usize), value);
            }

            // Update the limit head in a CAS loop to let the reader obtain
            // the written value.
            while let Err(_) = queue.limit_head.compare_exchange_weak(
                write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed
            ) {}

            return;
        }
    }

    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
        drop(shared_lock);
        let queue = unsafe{ &*self.queue };

        {
            let mut exclusive_lock = queue.data.lock_exclusive();

            // We hold the exclusive lock, but someone else might have already
            // done the resizing, and so:
            if exclusive_lock.data.cap() == expected_capacity {
                let old_capacity = expected_capacity;
                let new_capacity = 2 * old_capacity;
                assert_correct_capacity(new_capacity);

                // Resize by a factor of two, and make the two halves
                // identical.
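                // E.g. growing from capacity 4 to 8: [a, b, c, d] becomes
                // [a, b, c, d, a, b, c, d]. Any readable run that wrapped
                // around the end of the old buffer is therefore contiguous
                // somewhere in the doubled buffer, which is what the index
                // fix-up below relies on.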
                exclusive_lock.data.resize(new_capacity);
                for idx in old_capacity..new_capacity {
                    unsafe {
                        let target = exclusive_lock.data.get(idx);
                        let source = exclusive_lock.data.get(idx - old_capacity);
                        std::ptr::write(target, std::ptr::read(source));
                    }
                }

                // Modify all atomics to reflect that we just resized the
                // underlying buffer. Everything between the read index and
                // the write index is readable, and the following preserves
                // that property while increasing the size from `old_capacity`
                // to `new_capacity`.
                // Note that the addition of `new_capacity` to `write_head` is
                // there to ensure the ringbuffer can distinguish between the
                // cases where it is full and where it is empty.
                let mut read_index = queue.read_head.load(Ordering::Acquire);
                let mut write_index = queue.write_head.load(Ordering::Acquire);
                debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access

                let is_full = read_index == write_index; // before applying the bitwise AND-mask
                read_index &= exclusive_lock.read_mask;
                write_index &= exclusive_lock.read_mask;
                let new_capacity = new_capacity as u32;

                if read_index <= write_index && !is_full {
                    // Which means: (read_index < write_index) || buffer_is_empty.
                    // The readable elements do not wrap around the ringbuffer.
                    write_index += new_capacity;
                } else {
                    // The readable elements do wrap around the ringbuffer.
                    write_index += old_capacity as u32;
                    write_index += new_capacity;
                }

                queue.read_head.store(read_index, Ordering::Release);
                queue.limit_head.store(write_index, Ordering::Release);
                queue.write_head.store(write_index, Ordering::Release);

                // Update the masks
                exclusive_lock.read_mask = new_capacity - 1;
                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
            }
        }

        // Reacquire the shared lock
        return queue.data.lock_shared();
    }
}

impl<T> Drop for QueueDynProducer<T> {
    fn drop(&mut self) {
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
    }
}
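// A minimal usage sketch of the API above (illustrative addition, not from
// the original source; the tests below exercise the same API more thoroughly):
//
//     let mut queue = QueueDynMpsc::new(8); // held by the consumer
//     let producer = queue.producer();      // may be sent to another thread
//     producer.push(1u32);
//     assert_eq!(queue.pop(), Some(1));
//     drop(producer); // all producers must be dropped before the consumer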
// The producer end is `Send`, because in debug mode we make sure that there
// are no more producers when the queue is destroyed. But it is not `Sync`,
// because that would circumvent our atomic counter shenanigans.
unsafe impl<T> Send for QueueDynProducer<T>{}

#[inline]
fn assert_correct_capacity(capacity: usize) {
    assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
}

#[cfg(test)]
mod tests {
    use super::*;

    fn queue_size<T>(queue: &QueueDynMpsc<T>) -> usize {
        let lock = queue.inner.data.lock_exclusive();
        return lock.data.cap();
    }

    #[test]
    fn single_threaded_fixed_size_push_pop() {
        const INIT_SIZE: usize = 16;

        let mut cons = QueueDynMpsc::new(INIT_SIZE);
        let prod = cons.producer();

        for _round in 0..3 {
            // Fill up with indices
            for idx in 0..INIT_SIZE {
                prod.push(idx);
            }

            // Take out indices and check
            for idx in 0..INIT_SIZE {
                let gotten = cons.pop().unwrap();
                assert_eq!(idx, gotten);
            }

            assert!(cons.pop().is_none()); // nothing left in queue
            assert_eq!(queue_size(&cons), INIT_SIZE); // queue still of the same size
        }
    }

    #[test]
    fn single_threaded_resizing_push_pop() {
        const INIT_SIZE: usize = 8;
        const NUM_RESIZE: usize = 3; // note: each resize increases capacity by a factor of two

        let mut cons = QueueDynMpsc::new(INIT_SIZE);
        let prod = cons.producer();

        for resize_idx in 0..NUM_RESIZE {
            // Fill up with twice the current capacity in indices, forcing a
            // single resize per round
            let cur_size = INIT_SIZE << resize_idx;
            let new_size = cur_size << 1;
            for idx in 0..new_size {
                prod.push(idx);
            }

            for idx in 0..new_size {
                let gotten = cons.pop().unwrap();
                assert_eq!(idx, gotten);
            }

            assert!(cons.pop().is_none());
            assert_eq!(queue_size(&cons), new_size);
        }

        assert_eq!(queue_size(&cons), INIT_SIZE << NUM_RESIZE);
    }

    #[test]
    fn single_threaded_alternating_push_pop() {
        const INIT_SIZE: usize = 32;
        const NUM_PROD: usize = 4;
        assert!(INIT_SIZE % NUM_PROD == 0);

        let mut cons = QueueDynMpsc::new(INIT_SIZE);
        let mut prods = Vec::with_capacity(NUM_PROD);
        for _ in 0..NUM_PROD {
            prods.push(cons.producer());
        }

        for _round_idx in 0..4 {
            // Fill up, alternating per producer
            let mut prod_idx = 0;
            for idx in 0..INIT_SIZE {
                let prod = &prods[prod_idx];
                prod_idx += 1;
                prod_idx %= NUM_PROD;
                prod.push(idx);
            }

            // Retrieve and check again
            for idx in 0..INIT_SIZE {
                let gotten = cons.pop().unwrap();
                assert_eq!(idx, gotten);
            }

            assert!(cons.pop().is_none());
            assert_eq!(queue_size(&cons), INIT_SIZE);
        }
    }

    #[test]
    fn multithreaded_production_and_consumption() {
        use std::sync::{Arc, Mutex};

        // Rather nondeterministic test. Kind of a stress test. We let the
        // producers produce `u64` values with the high bits containing their
        // identifier. The consumer will try to receive as fast as possible
        // until each thread has produced the expected number of values.
        const NUM_STRESS_TESTS: usize = 2;
        const NUM_PER_THREAD: usize = 4096;
        const NUM_PROD_THREADS: usize = 4;

        fn take_num_thread_idx(number: u64) -> u64 {
            return (number >> 32) & 0xFFFFFFFF;
        }

        fn take_num(number: u64) -> u64 {
            return number & 0xFFFFFFFF;
        }

        // Set up the queue and the producers
        for _stress_idx in 0..NUM_STRESS_TESTS {
            let mut queue = QueueDynMpsc::new(4);
            let mut producers = Vec::with_capacity(NUM_PROD_THREADS);
            for _idx in 0..NUM_PROD_THREADS {
                producers.push(queue.producer());
            }

            // Start up the consume thread and let it spin immediately. Note
            // that it must die last.
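            // The consumer owns the queue while the producers only hold
            // pointers into it, so the consumer thread may only drop the
            // queue once every producer handle is gone (the debug counter
            // checked in `Drop for QueueDynMpsc` verifies exactly this). The
            // mutex below acts as a gate: the consumer grabs it right before
            // exiting, and the main thread only releases it after joining all
            // producer threads.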
            let can_exit_lock = Arc::new(Mutex::new(false));
            let held_exit_lock = can_exit_lock.lock().unwrap();

            let consume_handle = {
                let can_exit_lock = can_exit_lock.clone();
                std::thread::spawn(move || {
                    let mut counters = [0u64; NUM_PROD_THREADS];
                    let mut num_done = 0;

                    while num_done != NUM_PROD_THREADS {
                        // Spin until we get something
                        let new_value = loop {
                            if let Some(value) = queue.pop() {
                                break value;
                            }
                        };

                        let thread_idx = take_num_thread_idx(new_value);
                        let counter = &mut counters[thread_idx as usize];
                        assert_eq!(*counter, take_num(new_value)); // values per thread arrive in order
                        *counter += 1;

                        if *counter == NUM_PER_THREAD as u64 {
                            // Finished this one
                            num_done += 1;
                        }
                    }

                    let _exit_guard = can_exit_lock.lock().unwrap();
                })
            };

            // Set up the producer threads
            let mut handles = Vec::with_capacity(NUM_PROD_THREADS);
            for prod_idx in 0..NUM_PROD_THREADS {
                let prod_handle = producers.pop().unwrap();
                let handle = std::thread::spawn(move || {
                    let base_value = (prod_idx as u64) << 32;
                    for number in 0..NUM_PER_THREAD as u64 {
                        prod_handle.push(base_value + number);
                    }
                });
                handles.push(handle);
            }

            // Wait until all producers have finished, then release our held
            // lock and wait until the consumer finishes as well.
            for handle in handles {
                handle.join().expect("clean producer exit");
            }
            drop(held_exit_lock);
            consume_handle.join().expect("clean consumer exit");
        }
    }
}
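
// A small additional sketch (not part of the original test suite): the `Drop`
// implementation of `QueueDynMpsc` is supposed to run the destructor of every
// item that was pushed but never popped. `Arc`'s strong count is used here as
// a stand-in observer for those destructor calls.
#[cfg(test)]
mod drop_sketch {
    use std::sync::Arc;
    use super::*;

    #[test]
    fn dropping_queue_drops_unconsumed_items() {
        let marker = Arc::new(());

        {
            let mut cons = QueueDynMpsc::new(4);
            let prod = cons.producer();
            for _ in 0..3 {
                prod.push(marker.clone());
            }

            // Consume one item, leave two sitting in the queue.
            assert!(cons.pop().is_some());

            // All producers must be gone before the consumer is dropped.
            drop(prod);
        } // `cons` dropped here; the two remaining clones must be dropped too.

        assert_eq!(Arc::strong_count(&marker), 1);
    }
}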