Files @ 75e767adaaf7
Branch filter:

Location: CSY/reowolf/src/runtime2/store/queue_mpsc.rs - annotation

75e767adaaf7 4.9 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
Add a new RawArray implementation

Needed to implement ringbuffer queues in a neat way. Will deprecate the
old RawVec implementation
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
use std::sync::atomic::{AtomicU32, Ordering};

use super::unfair_se_lock::{UnfairSeLock};

/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To retrieve access to the producer-side: call `producer()`.
///
/// This is a queue that will resize (indefinitely) if it becomes full, and will
/// not shrink. So probably a temporary thing.
///
/// In debug mode we'll make sure that there are no producers when the queue is
/// dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
pub struct QueueDynMpsc<T: 'static> {
    // Entire contents are boxed up such that we can create producers that have
    // a pointer to the contents.
    inner: Box<Shared<T>>
}

// One may move around the queue between threads, as long as there is only one
// instance of it.
unsafe impl<T> Send for QueueDynMpsc<T>{}

/// Shared data between queue consumer and the queue producers
struct Shared<T: 'static> {
    data: UnfairSeLock<Inner<T>>,
    read_head: u32,
    write_head: AtomicU32,
    limit_head: AtomicU32,
    #[cfg(debug_assertions)] dbg: AtomicU32,
}

/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
/// inner data array is resized.
struct Inner<T> {
    data: Vec<T>,
    compare_mask: u32,
    read_mask: u32,
}

impl<T> QueueDynMpsc<T> {
    /// Constructs a new MPSC queue. Note that the initial capacity is always
    /// increased to the next power of 2 (if it isn't already).
    pub fn new(initial_capacity: usize) -> Self {
        let initial_capacity = initial_capacity.next_power_of_two();
        Self::assert_correct_size(initial_capacity);

        return Self{
            inner: Box::new(Shared {
                data: UnfairSeLock::new(Inner{
                    data: Vec::with_capacity(initial_capacity),
                    compare_mask: (2 * initial_capacity as u32) - 1,
                    read_mask: initial_capacity as u32 - 1,
                }),
                read_head: 0,
                write_head: AtomicU32::new(0),
                limit_head: AtomicU32::new(0),
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
            }),
        };
    }

    #[inline]
    pub fn producer(&self) -> QueueDynProducer<T> {
        return QueueDynProducer::new(self);
    }

    /// Perform an attempted read from the queue. It might be that some producer
    /// is putting something in the queue while this function is executing, and
    /// we don't get the consume it.
    pub fn read(&mut self) -> Option<T> {
        let cur_read = self.inner.read_head;
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
        let data_lock = self.inner.data.lock_shared();

        if cur_read != cur_limit {
            // Make a bitwise copy of the value and return it. The receiver is
            // responsible for dropping it.
            unsafe {
                let source = data_lock.data.as_ptr().add((cur_read & data_lock.read_mask) as usize);
                self.inner.read_head += 1;
                return Some(std::ptr::read(source));
            }
        } else {
            return None;
        }
    }

    #[inline]
    fn assert_correct_size(capacity: usize) {
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
    }
}

impl<T> Drop for QueueDynMpsc<T> {
    fn drop(&mut self) {
        // There should be no more `QueueDynProducer` pointers to this queue
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
        // And so the limit head should be equal to the write head
        let write_index = self.inner.write_head.load(Ordering::Acquire);
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);

        // Every item that has not yet been taken out of the queue needs to
        // have
        while self.inner.read_head != write_index {

        }
    }
}

pub struct QueueDynProducer<T: 'static> {
    queue: &'static Shared<T>,
}

impl<T> QueueDynProducer<T> {
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
        unsafe {
            // If you only knew the power of the dark side! Obi-Wan never told
            // you what happened to your father!
            let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
            return Self{ queue };
        }
    }
}

impl<T> Drop for QueueDynProducer<T> {
    fn drop(&mut self) {
        dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
    }
}

// producer end is `Send`, because in debug mode we make sure that there are no
// more producers when the queue is destroyed. But is not sync, because that
// would circumvent our atomic counter shenanigans.
unsafe impl<T> Send for QueueDynProducer<T>{}