Location: CSY/reowolf/src/runtime2/store/queue_mpsc.rs @ 75e767adaaf7
MH: Add a new RawArray implementation

Needed to implement ringbuffer queues in a neat way. Will deprecate the
old RawVec implementation.

use std::sync::atomic::{AtomicU32, Ordering};

use super::unfair_se_lock::UnfairSeLock;

/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To retrieve access to the producer side, call `producer()`.
///
/// This queue resizes (indefinitely) when it becomes full and never shrinks,
/// so it is probably a temporary solution.
///
/// In debug mode we make sure that there are no producers left when the queue
/// is dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
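///
/// A minimal consumer-side usage sketch (using only the API in this file; the
/// producer-side `push` is not part of this revision):
///
/// ```ignore
/// let mut queue: QueueDynMpsc<u32> = QueueDynMpsc::new(16);
/// let producer = queue.producer(); // may be handed to another thread
/// assert!(queue.read().is_none()); // nothing has been published yet
/// drop(producer); // drop all producers before dropping the queue
/// ```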
pub struct QueueDynMpsc<T: 'static> {
    // Entire contents are boxed up such that we can create producers that have
    // a pointer to the contents.
    inner: Box<Shared<T>>
}

// The queue may be moved between threads, as long as there is only one
// instance of it and the items themselves may be sent between threads.
unsafe impl<T: Send> Send for QueueDynMpsc<T> {}

/// Shared data between the queue consumer and the queue producers.
struct Shared<T: 'static> {
    data: UnfairSeLock<Inner<T>>,
    read_head: u32,          // only touched by the single consumer
    write_head: AtomicU32,   // next slot a producer will reserve
    limit_head: AtomicU32,   // everything below this head is fully written
    #[cfg(debug_assertions)] dbg: AtomicU32, // number of live producers
}

/// Locked by a shared/exclusive lock. The exclusive lock is only obtained
/// when the inner data array is resized.
struct Inner<T> {
    data: Vec<T>,
    compare_mask: u32, // mask for comparing heads, modulo twice the capacity
    read_mask: u32,    // mask that maps a head onto a slot in `data`
}
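
// A worked example of the masks above (an illustration added here, sketching
// what the field names and `new` suggest; the producer side is not in this
// file): with an initial capacity of 4, `read_mask == 3` and
// `compare_mask == 7`. The heads are free-running counters: `head & read_mask`
// selects the physical slot, while comparing heads modulo twice the capacity
// keeps "full" distinguishable from "empty". After 4 unread writes both heads
// point at slot 0, but `write_head & compare_mask == 4` while
// `read_head & compare_mask == 0`.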

impl<T> QueueDynMpsc<T> {
    /// Constructs a new MPSC queue. Note that the initial capacity is always
    /// increased to the next power of 2 (if it isn't already).
    pub fn new(initial_capacity: usize) -> Self {
        let initial_capacity = initial_capacity.next_power_of_two();
        Self::assert_correct_size(initial_capacity);

        return Self{
            inner: Box::new(Shared {
                data: UnfairSeLock::new(Inner{
                    // Note: the `Vec` is used as raw storage. Its length
                    // stays 0; slots are accessed through raw pointers.
                    data: Vec::with_capacity(initial_capacity),
                    compare_mask: (2 * initial_capacity as u32) - 1,
                    read_mask: initial_capacity as u32 - 1,
                }),
                read_head: 0,
                write_head: AtomicU32::new(0),
                limit_head: AtomicU32::new(0),
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
            }),
        };
    }

    #[inline]
    pub fn producer(&self) -> QueueDynProducer<T> {
        return QueueDynProducer::new(self);
    }

    /// Performs an attempted read from the queue. It might be that some
    /// producer is putting something into the queue while this function is
    /// executing, in which case we don't get to consume it.
    pub fn read(&mut self) -> Option<T> {
        let cur_read = self.inner.read_head;
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
        let data_lock = self.inner.data.lock_shared();

        if cur_read != cur_limit {
            // Make a bitwise copy of the value and return it. The receiver is
            // responsible for dropping it.
            unsafe {
                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
                self.inner.read_head += 1;
                return Some(std::ptr::read(source));
            }
        } else {
            return None;
        }
    }

    #[inline]
    fn assert_correct_size(capacity: usize) {
        // Heads are `u32` counters that wrap modulo twice the capacity, so
        // the capacity must leave that factor-of-two of headroom in a `u32`.
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
    }
}
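
// Example (an added sketch, not part of the original file): the consumer is
// expected to poll `read` until it returns `None`; an item that a producer is
// publishing concurrently is simply picked up by a later call.
// `handle_message` is a hypothetical caller-supplied callback.
#[allow(dead_code)]
fn drain_example(queue: &mut QueueDynMpsc<u32>, mut handle_message: impl FnMut(u32)) {
    while let Some(message) = queue.read() {
        handle_message(message);
    }
}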

impl<T> Drop for QueueDynMpsc<T> {
    fn drop(&mut self) {
        // There should be no more `QueueDynProducer` pointers to this queue
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
        // And so the limit head should be equal to the write head
        let write_index = self.inner.write_head.load(Ordering::Acquire);
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);

        // Every item that has not yet been taken out of the queue needs to
        // have its destructor run: `read` hands out bitwise copies, so the
        // queue still owns everything between the read head and write head.
        let data_lock = self.inner.data.lock_shared();
        while self.inner.read_head != write_index {
            unsafe {
                let target = data_lock.data.as_ptr().add((self.inner.read_head & data_lock.read_mask) as usize);
                std::ptr::drop_in_place(target as *mut T);
            }
            self.inner.read_head += 1;
        }
    }
}

pub struct QueueDynProducer<T: 'static> {
    queue: &'static Shared<T>,
}

impl<T> QueueDynProducer<T> {
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
        unsafe {
            // If you only knew the power of the dark side! Obi-Wan never told
            // you what happened to your father!
            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
            return Self{ queue };
        }
    }
}
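
// What the producer side might look like (an added sketch, not part of this
// revision): the `write_head`/`limit_head` pair suggests a two-phase write in
// which a producer first reserves a slot, then publishes it. A real `push`
// would also grow the ringbuffer, under the exclusive side of the
// `UnfairSeLock`, when the queue is full; this sketch assumes there is room.
impl<T> QueueDynProducer<T> {
    #[allow(dead_code)]
    fn push_sketch(&self, value: T) {
        let data_lock = self.queue.data.lock_shared();

        // Phase 1: reserve a slot. Heads are free-running counters; only the
        // masked value addresses the backing array.
        let write_index = self.queue.write_head.fetch_add(1, Ordering::AcqRel);
        unsafe {
            let target = data_lock.data.as_ptr().add((write_index & data_lock.read_mask) as usize);
            std::ptr::write(target as *mut T, value);
        }

        // Phase 2: publish in reservation order. Wait until all earlier
        // reservations are published, then move the limit head past our slot
        // so the consumer's `read` can observe it.
        while self.queue.limit_head
            .compare_exchange_weak(write_index, write_index + 1, Ordering::AcqRel, Ordering::Acquire)
            .is_err() {}
    }
}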

impl<T> Drop for QueueDynProducer<T> {
    fn drop(&mut self) {
        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
    }
}

// The producer end is `Send`, because in debug mode we make sure that there
// are no producers left when the queue is destroyed. But it is not `Sync`,
// because that would circumvent our atomic counter shenanigans.
unsafe impl<T: Send> Send for QueueDynProducer<T> {}
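
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test of the consumer-side API in this file. A minimal sketch:
    // without a producer-side `push` (not part of this revision) the queue
    // stays empty.
    #[test]
    fn empty_queue_reads_none() {
        let mut queue: QueueDynMpsc<u32> = QueueDynMpsc::new(4);
        let producer = queue.producer();
        assert!(queue.read().is_none());
        drop(producer); // all producers must be gone before the queue drops
    }
}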