use std::sync::atomic::{AtomicU32, Ordering}; use super::unfair_se_lock::{UnfairSeLock}; /// Multiple-producer single-consumer queue. Generally used in the publicly /// accessible fields of a component. The holder of this struct should be the /// consumer. To retrieve access to the producer-side: call `producer()`. /// /// This is a queue that will resize (indefinitely) if it becomes full, and will /// not shrink. So probably a temporary thing. /// /// In debug mode we'll make sure that there are no producers when the queue is /// dropped. We don't do this in release mode because the runtime is written /// such that components always remain alive (hence, this queue will remain /// accessible) while there are references to it. pub struct QueueDynMpsc { // Entire contents are boxed up such that we can create producers that have // a pointer to the contents. inner: Box> } // One may move around the queue between threads, as long as there is only one // instance of it. unsafe impl Send for QueueDynMpsc{} /// Shared data between queue consumer and the queue producers struct Shared { data: UnfairSeLock>, read_head: u32, write_head: AtomicU32, limit_head: AtomicU32, #[cfg(debug_assertions)] dbg: AtomicU32, } /// Locked by an exclusive/shared lock. Exclusive lock is obtained when the /// inner data array is resized. struct Inner { data: Vec, compare_mask: u32, read_mask: u32, } impl QueueDynMpsc { /// Constructs a new MPSC queue. Note that the initial capacity is always /// increased to the next power of 2 (if it isn't already). pub fn new(initial_capacity: usize) -> Self { let initial_capacity = initial_capacity.next_power_of_two(); Self::assert_correct_size(initial_capacity); return Self{ inner: Box::new(Shared { data: UnfairSeLock::new(Inner{ data: Vec::with_capacity(initial_capacity), compare_mask: (2 * initial_capacity as u32) - 1, read_mask: initial_capacity as u32 - 1, }), read_head: 0, write_head: AtomicU32::new(0), limit_head: AtomicU32::new(0), #[cfg(debug_assertions)] dbg: AtomicU32::new(0), }), }; } #[inline] pub fn producer(&self) -> QueueDynProducer { return QueueDynProducer::new(self); } /// Perform an attempted read from the queue. It might be that some producer /// is putting something in the queue while this function is executing, and /// we don't get the consume it. pub fn read(&mut self) -> Option { let cur_read = self.inner.read_head; let cur_limit = self.inner.limit_head.load(Ordering::Acquire); let data_lock = self.inner.data.lock_shared(); if cur_read != cur_limit { // Make a bitwise copy of the value and return it. The receiver is // responsible for dropping it. unsafe { let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize); self.inner.read_head += 1; return Some(std::ptr::read(source)); } } else { return None; } } #[inline] fn assert_correct_size(capacity: usize) { assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2); } } impl Drop for QueueDynMpsc { fn drop(&mut self) { // There should be no more `QueueDynProducer` pointers to this queue dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0)); // And so the limit head should be equal to the write head let write_index = self.inner.write_head.load(Ordering::Acquire); assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index); // Every item that has not yet been taken out of the queue needs to // have while self.inner.read_head != write_index { } } } pub struct QueueDynProducer { queue: &'static Shared, } impl QueueDynProducer { fn new(consumer: &QueueDynMpsc) -> Self { dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel)); unsafe { // If you only knew the power of the dark side! Obi-Wan never told // you what happened to your father! let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref()); return Self{ queue }; } } } impl Drop for QueueDynProducer { fn drop(&mut self) { dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel)); } } // producer end is `Send`, because in debug mode we make sure that there are no // more producers when the queue is destroyed. But is not sync, because that // would circumvent our atomic counter shenanigans. unsafe impl Send for QueueDynProducer{}