diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs
index edcdd72dd0e56436beeebba6a621652d0e102910..f178fd36896e5c2fd3508b09eb72032f29a21292 100644
--- a/src/runtime2/store/queue_mpsc.rs
+++ b/src/runtime2/store/queue_mpsc.rs
@@ -48,7 +48,7 @@ impl<T> QueueDynMpsc<T> {
     /// increased to the next power of 2 (if it isn't already).
     pub fn new(initial_capacity: usize) -> Self {
         let initial_capacity = initial_capacity.next_power_of_two();
-        Self::assert_correct_size(initial_capacity);
+        assert_correct_capacity(initial_capacity);
 
         let mut data = RawArray::new();
         data.resize(initial_capacity);
@@ -82,7 +82,7 @@ impl<T> QueueDynMpsc<T> {
         let data_lock = self.inner.data.lock_shared();
         let cur_read = self.inner.read_head.load(Ordering::Acquire);
         let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
-        let buf_size = data_lock.data.cap();
+        let buf_size = data_lock.data.cap() as u32;
 
         if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
             // Make a bitwise copy of the value and return it. The receiver is
@@ -98,11 +98,6 @@ impl<T> QueueDynMpsc<T> {
             return None;
         }
     }
-
-    #[inline]
-    fn assert_correct_size(capacity: usize) {
-        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
-    }
 }
 
 impl<T> Drop for QueueDynMpsc<T> {
@@ -118,7 +113,7 @@ impl<T> Drop for QueueDynMpsc<T> {
         // have its destructor called. We immediately apply the
         // increment-by-size trick and wait until we've hit the write head.
         let mut read_index = self.inner.read_head.load(Ordering::Acquire);
-        read_index += data_lock.data.cap();
+        read_index += data_lock.data.cap() as u32;
         while read_index & data_lock.compare_mask != write_index {
             unsafe {
                 let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
@@ -147,11 +142,40 @@ impl<T> QueueDynProducer<T> {
 
     pub fn push(&self, value: T) {
         let queue = unsafe{ &*self.queue };
-        let data_lock = queue.data.lock_shared();
-        let read_index = queue.read_head.load(Ordering::Acquire);
-        let write_index = queue.write_head.load(Ordering::Acquire);
-        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
-            let expected_capacity = data_lock.data.cap();
+        let mut data_lock = queue.data.lock_shared();
+        let mut write_index = queue.write_head.load(Ordering::Acquire); // note that we need to update this index in the loop
+
+        'attempt_write: loop {
+            let read_index = queue.read_head.load(Ordering::Acquire);
+            if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
+                // Need to resize, then reload the read/write indices afterwards
+                let expected_capacity = data_lock.data.cap();
+                data_lock = self.resize(data_lock, expected_capacity);
+                write_index = queue.write_head.load(Ordering::Acquire);
+                continue 'attempt_write;
+            }
+
+            // Otherwise, try to advance the write index
+            let new_write_index = (write_index + 1) & data_lock.compare_mask;
+            if let Err(actual_write_index) = queue.write_head.compare_exchange(
+                write_index, new_write_index, Ordering::AcqRel, Ordering::Acquire
+            ) {
+                write_index = actual_write_index;
+                continue 'attempt_write;
+            }
+
+            // We're now allowed to write at `write_index`
+            unsafe {
+                std::ptr::write(data_lock.data.get((write_index & data_lock.read_mask) as usize), value);
+            }
+
+            // Update the limit head in a CAS-loop, so that the reader can
+            // observe the written value
+            while let Err(_) = queue.limit_head.compare_exchange_weak(
+                write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed
+            ) {}
+
+            return;
         }
     }
 
@@ -160,12 +184,13 @@ impl<T> QueueDynProducer<T> {
         let queue = unsafe{ &*self.queue };
 
         {
-            let exclusive_lock = self.queue.data.lock_exclusive();
+            let mut exclusive_lock = queue.data.lock_exclusive();
 
             // We hold the exclusive lock, but someone else might have done the resizing, and so:
             if exclusive_lock.data.cap() == expected_capacity {
                 let old_capacity = expected_capacity;
                 let new_capacity = 2 * old_capacity;
+                assert_correct_capacity(new_capacity);
 
                 // Resize by a factor of two, and make the two halves identical.
                 exclusive_lock.data.resize(new_capacity);
@@ -178,8 +203,39 @@ impl<T> QueueDynProducer<T> {
                 }
 
                 // Modify all atomics to reflect that we just resized the
-                // underlying buffer.
-
+                // underlying buffer. Everything between the read index and the
+                // write index is readable, and the code below preserves that
+                // property while increasing the capacity from `old_capacity`
+                // to `new_capacity`.
+                // Note that adding `new_capacity` to `write_head` ensures the
+                // ringbuffer can still distinguish being full from being
+                // empty.
+                let mut read_index = queue.read_head.load(Ordering::Acquire);
+                let mut write_index = queue.write_head.load(Ordering::Acquire);
+                debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access
+
+                read_index &= exclusive_lock.read_mask;
+                write_index &= exclusive_lock.read_mask;
+
+                let new_capacity = new_capacity as u32;
+                if read_index < write_index {
+                    // The readable elements do not wrap around the ringbuffer
+                    write_index += new_capacity;
+                } else {
+                    // The readable elements do wrap around the ringbuffer
+                    write_index += old_capacity as u32;
+                    write_index += new_capacity;
+                }
+
+                queue.read_head.store(read_index, Ordering::Release);
+                queue.limit_head.store(write_index, Ordering::Release);
+                queue.write_head.store(write_index, Ordering::Release);
+
+                // Update the masks
+                exclusive_lock.read_mask = new_capacity - 1;
+                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
+
+                println!("DEBUG: Resized from {:10} to {:10}", old_capacity, new_capacity);
             }
         }
 
@@ -197,4 +253,190 @@ impl<T> Drop for QueueDynProducer<T> {
 // producer end is `Send`, because in debug mode we make sure that there are no
 // more producers when the queue is destroyed. But is not sync, because that
 // would circumvent our atomic counter shenanigans.
-unsafe impl<T> Send for QueueDynProducer<T>{}
\ No newline at end of file
+unsafe impl<T> Send for QueueDynProducer<T>{}
+
+#[inline]
+fn assert_correct_capacity(capacity: usize) {
+    assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn queue_size<T>(queue: &QueueDynMpsc<T>) -> usize {
+        let lock = queue.inner.data.lock_exclusive();
+        return lock.data.cap();
+    }
+
+    #[test]
+    fn single_threaded_fixed_size_push_pop() {
+        const INIT_SIZE: usize = 16;
+        let mut cons = QueueDynMpsc::new(INIT_SIZE);
+        let prod = cons.producer();
+
+        for _round in 0..3 {
+            // Fill up with indices
+            for idx in 0..INIT_SIZE {
+                prod.push(idx);
+            }
+
+            // Take out the indices and check them
+            for idx in 0..INIT_SIZE {
+                let gotten = cons.pop().unwrap();
+                assert_eq!(idx, gotten);
+            }
+
+            assert!(cons.pop().is_none()); // nothing left in the queue
+            assert_eq!(queue_size(&cons), INIT_SIZE); // queue still has the same capacity
+        }
+    }
+
+    #[test]
+    fn single_threaded_resizing_push_pop() {
+        const INIT_SIZE: usize = 8;
+        const NUM_RESIZE: usize = 3; // note: each resize doubles the capacity
+
+        let mut cons = QueueDynMpsc::new(INIT_SIZE);
+        let prod = cons.producer();
+
+        for resize_idx in 0..NUM_RESIZE {
+            // Fill up with twice the current capacity, forcing a resize
+            let cur_size = INIT_SIZE << resize_idx;
+            let new_size = cur_size << 1;
+            for idx in 0..new_size {
+                prod.push(idx);
+            }
+
+            for idx in 0..new_size {
+                let gotten = cons.pop().unwrap();
+                assert_eq!(idx, gotten);
+            }
+
+            assert!(cons.pop().is_none());
+            assert_eq!(queue_size(&cons), new_size);
+        }
+
+        assert_eq!(queue_size(&cons), INIT_SIZE << NUM_RESIZE);
+    }
+
+    #[test]
+    fn single_threaded_alternating_push_pop() {
+        const INIT_SIZE: usize = 32;
+        const NUM_PROD: usize = 4;
+        assert!(INIT_SIZE % NUM_PROD == 0);
+
+        let mut cons = QueueDynMpsc::new(INIT_SIZE);
+        let mut prods = Vec::with_capacity(NUM_PROD);
+        for _ in 0..NUM_PROD {
+            prods.push(cons.producer());
+        }
+
+        for _round_idx in 0..4 {
+            // Fill up, alternating between the producers
+            let mut prod_idx = 0;
+            for idx in 0..INIT_SIZE {
+                let prod = &prods[prod_idx];
+                prod_idx += 1;
+                prod_idx %= NUM_PROD;
+                prod.push(idx);
+            }
+
+            // Retrieve and check again
+            for idx in 0..INIT_SIZE {
+                let gotten = cons.pop().unwrap();
+                assert_eq!(idx, gotten);
+            }
+
+            assert!(cons.pop().is_none());
+            assert_eq!(queue_size(&cons), INIT_SIZE);
+        }
+    }
+
+    #[test]
+    fn multithreaded_production_and_consumption() {
+        use std::sync::{Arc, Mutex};
+
+        // Rather randomized stress test. The producers produce `u64` values
+        // with the high bits containing their identifier. The consumer tries
+        // to receive as fast as possible until each producer thread has
+        // produced the expected number of values.
+        const NUM_STRESS_TESTS: usize = 1;
+        const NUM_PER_THREAD: usize = 4096*32;
+        const NUM_PROD_THREADS: usize = 1;
+
+        fn take_num_thread_idx(number: u64) -> u64 { return (number >> 32) & 0xFFFFFFFF; }
+        fn take_num(number: u64) -> u64 { return number & 0xFFFFFFFF; }
+
+        // Spawn the queue and the producers
+        for _stress_idx in 0..NUM_STRESS_TESTS {
+            let mut queue = QueueDynMpsc::new(4);
+            let mut producers = Vec::with_capacity(NUM_PROD_THREADS);
+            for _idx in 0..NUM_PROD_THREADS {
+                producers.push(queue.producer());
+            }
+
+            // Start up the consumer thread and let it spin immediately. Note
+            // that it must die last.
+            let can_exit_lock = Arc::new(Mutex::new(false));
+            let held_exit_lock = can_exit_lock.lock().unwrap();
+
+            let consume_handle = {
+                let can_exit_lock = can_exit_lock.clone();
+                std::thread::spawn(move || {
+                    let mut counters = [0u64; NUM_PROD_THREADS];
+                    let mut num_done = 0;
+                    while num_done != NUM_PROD_THREADS {
+                        // Spin until we get something
+                        let new_value = loop {
+                            if let Some(value) = queue.pop() {
+                                break value;
+                            }
+                        };
+
+                        let thread_idx = take_num_thread_idx(new_value);
+                        let counter = &mut counters[thread_idx as usize];
+                        assert_eq!(*counter, take_num(new_value)); // values per thread arrive in order
+
+                        *counter += 1;
+                        if *counter == NUM_PER_THREAD as u64 {
+                            // Finished this one
+                            num_done += 1;
+                        }
+                    }
+
+                    let _exit_guard = can_exit_lock.lock().unwrap();
+
+                    println!("DEBUG: Num ops per thread = {}", NUM_PER_THREAD);
+                    println!("DEBUG: Num threads = {}", NUM_PROD_THREADS);
+                    println!("DEBUG: Total num ops = {}", NUM_PER_THREAD * NUM_PROD_THREADS);
+                    println!("DEBUG: Final queue cap = {}", queue.inner.data.lock_exclusive().data.cap());
+                })
+            };
+
+            // Set up the producer threads
+            let mut handles = Vec::with_capacity(NUM_PROD_THREADS);
+            for prod_idx in 0..NUM_PROD_THREADS {
+                let prod_handle = producers.pop().unwrap();
+
+                let handle = std::thread::spawn(move || {
+                    let base_value = (prod_idx as u64) << 32;
+                    for number in 0..NUM_PER_THREAD as u64 {
+                        prod_handle.push(base_value + number);
+                    }
+                });
+
+                handles.push(handle);
+            }
+
+            // Wait until all producers have finished, then unlock the held
+            // lock and wait until the consumer finishes
+            for handle in handles {
+                handle.join().expect("clean producer exit");
+            }
+
+            drop(held_exit_lock);
+            consume_handle.join().expect("clean consumer exit");
+        }
+    }
+}
\ No newline at end of file
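
Reviewer note (illustrative, not part of the patch): the push/pop paths above rely on all three heads living in [0, 2*capacity), so that a full ring (write_head == read_head) can be told apart from an empty one ((read_head + capacity) & compare_mask == limit_head), while the physical slot is always index & (capacity - 1). The following is a minimal, single-threaded, single-producer sketch of just that index scheme; the name RingModel, the Option<u32> slots, and the initial head values are invented for illustration, and the resize/copy path is left out.

struct RingModel {
    data: Vec<Option<u32>>,
    capacity: u32,     // power of two
    read_mask: u32,    // capacity - 1: maps a head to a physical slot
    compare_mask: u32, // 2 * capacity - 1: heads live in [0, 2 * capacity)
    read_head: u32,
    write_head: u32,
    limit_head: u32,
}

impl RingModel {
    fn new(capacity: u32) -> Self {
        assert!(capacity.is_power_of_two());
        RingModel {
            data: vec![None; capacity as usize],
            capacity,
            read_mask: capacity - 1,
            compare_mask: 2 * capacity - 1,
            // Start the read head one "capacity" ahead so the empty check in
            // `pop` holds for a fresh, element-free ring.
            read_head: capacity,
            write_head: 0,
            limit_head: 0,
        }
    }

    // Mirrors the full check in the patched `push`: the ring is full exactly
    // when the write head has caught up with the (offset) read head.
    fn push(&mut self, value: u32) -> bool {
        if self.write_head == self.read_head {
            return false; // full; the real queue resizes here instead
        }
        let slot = (self.write_head & self.read_mask) as usize;
        self.data[slot] = Some(value);
        self.write_head = (self.write_head + 1) & self.compare_mask;
        // Single producer: the limit head simply trails the write head.
        self.limit_head = self.write_head;
        true
    }

    // Mirrors the empty check in `pop`: nothing to read once
    // (read_head + capacity) wraps onto the limit head.
    fn pop(&mut self) -> Option<u32> {
        if (self.read_head + self.capacity) & self.compare_mask == self.limit_head {
            return None; // empty
        }
        let slot = (self.read_head & self.read_mask) as usize;
        let value = self.data[slot].take();
        self.read_head = (self.read_head + 1) & self.compare_mask;
        value
    }
}

fn main() {
    let mut ring = RingModel::new(4);
    for v in 0..4 {
        assert!(ring.push(v)); // fill every slot
    }
    assert!(!ring.push(99)); // write_head == read_head: full, not empty
    for v in 0..4 {
        assert_eq!(ring.pop(), Some(v)); // values come back in order
    }
    assert_eq!(ring.pop(), None); // (read_head + capacity) wrapped onto limit_head: empty
    println!("index-scheme model behaves as expected");
}

With a single producer the limit head can simply trail the write head, as above; the CAS loop on limit_head in the patch exists so that concurrent producers publish their claimed slots to the consumer in claim order, and the resize path only has to re-establish these same invariants with the doubled masks.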