diff --git a/src/collections/raw_array.rs b/src/collections/raw_array.rs index 6773f70e26ca5d08a298111c3a4ef0c8ba3f41cb..9f1e67f7e01065ff611b32b6057443c8f6badd0e 100644 --- a/src/collections/raw_array.rs +++ b/src/collections/raw_array.rs @@ -175,7 +175,7 @@ mod tests { let mut array = RawArray::new(); array.resize(max_size); fill(&mut array, max_size); - for size in sizes { + for size in sizes.iter().copied() { array.resize(size); assert_eq!(array.cap(), size); } diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs index f178fd36896e5c2fd3508b09eb72032f29a21292..e2aeaee7da9c36bc54896a7324d38f5f130f95ca 100644 --- a/src/runtime2/store/queue_mpsc.rs +++ b/src/runtime2/store/queue_mpsc.rs @@ -14,6 +14,9 @@ use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard}; /// dropped. We don't do this in release mode because the runtime is written /// such that components always remain alive (hence, this queue will remain /// accessible) while there are references to it. +// NOTE: Addendum to the above remark, not true if the thread owning the +// consumer side crashes, unwinds, and drops the `Box` with it. Question is: do +// I want to take that into account? pub struct QueueDynMpsc { // Entire contents are boxed up such that we can create producers that have // a pointer to the contents. @@ -89,10 +92,11 @@ impl QueueDynMpsc { // responsible for dropping it. unsafe { let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize); + let value = std::ptr::read(source); // We can perform a store since we're the only ones modifying // the atomic. 
self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release); - return Some(std::ptr::read(source)); + return Some(value); } } else { return None; @@ -143,10 +147,11 @@ impl QueueDynProducer { let queue = unsafe{ &*self.queue }; let mut data_lock = queue.data.lock_shared(); - let mut write_index = queue.write_head.load(Ordering::Acquire); // note that we need to update this index in the loop + let mut write_index = queue.write_head.load(Ordering::Acquire); 'attempt_write: loop { let read_index = queue.read_head.load(Ordering::Acquire); + if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing // Need to resize, try loading read/write index afterwards let expected_capacity = data_lock.data.cap(); @@ -172,7 +177,8 @@ impl QueueDynProducer { // Update limit head to let reader obtain the written value in a // CAS-loop while let Err(_) = queue.limit_head.compare_exchange_weak( - write_index, new_write_index, Ordering::AcqRel, Ordering::Relaxed + write_index, new_write_index, + Ordering::AcqRel, Ordering::Relaxed ) {} return; @@ -214,11 +220,12 @@ impl QueueDynProducer { let mut write_index = queue.write_head.load(Ordering::Acquire); debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access + let is_full = read_index == write_index; // before bitwise AND-mask read_index &= exclusive_lock.read_mask; write_index &= exclusive_lock.read_mask; let new_capacity = new_capacity as u32; - if read_index < write_index { + if read_index <= write_index && !is_full { // which means: (read index < write_index) || buffer_is_empty // The readable elements do not wrap around the ringbuffer write_index += new_capacity; } else { @@ -234,8 +241,6 @@ impl QueueDynProducer { // Update the masks exclusive_lock.read_mask = new_capacity - 1; exclusive_lock.compare_mask = (2 * new_capacity) - 1; - - println!("DEBUG: Resized from {:10} to {:10}", old_capacity, 
new_capacity) } } @@ -361,9 +366,9 @@ mod tests { // produce `u64` values with the high bits containing their identifier. // The consumer will try receive as fast as possible until each thread // has produced the expected number of values. - const NUM_STRESS_TESTS: usize = 1; - const NUM_PER_THREAD: usize = 4096*32; - const NUM_PROD_THREADS: usize = 1; + const NUM_STRESS_TESTS: usize = 2; + const NUM_PER_THREAD: usize = 4096; + const NUM_PROD_THREADS: usize = 4; fn take_num_thread_idx(number: u64) -> u64 { return (number >> 32) & 0xFFFFFFFF; } fn take_num(number: u64) -> u64 { return number & 0xFFFFFFFF; } @@ -406,11 +411,6 @@ mod tests { } let _exit_guard = can_exit_lock.lock().unwrap(); - - println!("DEBUG: Num ops per thread = {}", NUM_PER_THREAD); - println!("DEBUG: Num threads = {}", NUM_PROD_THREADS); - println!("DEBUG: Total num ops = {}", NUM_PER_THREAD * NUM_PROD_THREADS); - println!("DEBUG: Final queue cap = {}", queue.inner.data.lock_exclusive().data.cap()); }) }; @@ -420,7 +420,7 @@ mod tests { let prod_handle = producers.pop().unwrap(); let handle = std::thread::spawn(move || { - let base_value = (prod_idx << 32) as u64; + let base_value = (prod_idx as u64) << 32; for number in 0..NUM_PER_THREAD as u64 { prod_handle.push(base_value + number); }