Files @ fafdf8723ee3
Branch filter:

Location: CSY/reowolf/src/runtime2/store/queue_mpsc.rs

fafdf8723ee3 7.6 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
WIP: Busy with MPSC channel

Will act as backing buffer for messages (for now).
use std::sync::atomic::{AtomicU32, Ordering};

use crate::collections::RawArray;
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};

/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To retrieve access to the producer-side: call `producer()`.
///
/// This is a queue that will resize (indefinitely) if it becomes full, and will
/// not shrink. So probably a temporary thing.
///
/// In debug mode we'll make sure that there are no producers when the queue is
/// dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
pub struct QueueDynMpsc<T> {
    // Entire contents are boxed up such that we can create producers that have
    // a pointer to the contents.
    inner: Box<Shared<T>>
}

// One may move around the queue between threads, as long as there is only one
// instance of it.
unsafe impl<T> Send for QueueDynMpsc<T>{}

/// Shared data between queue consumer and the queue producers
struct Shared<T> {
    data: UnfairSeLock<Inner<T>>,
    read_head: AtomicU32,
    write_head: AtomicU32,
    limit_head: AtomicU32,
    #[cfg(debug_assertions)] dbg: AtomicU32,
}

/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
/// inner data array is resized.
struct Inner<T> {
    data: RawArray<T>,
    compare_mask: u32,
    read_mask: u32,
}

type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;

impl<T> QueueDynMpsc<T> {
    /// Constructs a new MPSC queue. Note that the initial capacity is always
    /// increased to the next power of 2 (if it isn't already).
    pub fn new(initial_capacity: usize) -> Self {
        let initial_capacity = initial_capacity.next_power_of_two();
        Self::assert_correct_size(initial_capacity);

        let mut data = RawArray::new();
        data.resize(initial_capacity);

        let initial_capacity = initial_capacity as u32;

        return Self{
            inner: Box::new(Shared {
                data: UnfairSeLock::new(Inner{
                    data,
                    compare_mask: (2 * initial_capacity) - 1,
                    read_mask: initial_capacity - 1,
                }),
                read_head: AtomicU32::new(0),
                write_head: AtomicU32::new(initial_capacity),
                limit_head: AtomicU32::new(initial_capacity),
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
            }),
        };
    }

    #[inline]
    pub fn producer(&self) -> QueueDynProducer<T> {
        return QueueDynProducer::new(self);
    }

    /// Perform an attempted read from the queue. It might be that some producer
    /// is putting something in the queue while this function is executing, and
    /// we don't get the consume it.
    pub fn pop(&mut self) -> Option<T> {
        let data_lock = self.inner.data.lock_shared();
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
        let buf_size = data_lock.data.cap();

        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
            // Make a bitwise copy of the value and return it. The receiver is
            // responsible for dropping it.
            unsafe {
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
                // We can perform a store since we're the only ones modifying
                // the atomic.
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
                return Some(std::ptr::read(source));
            }
        } else {
            return None;
        }
    }

    #[inline]
    fn assert_correct_size(capacity: usize) {
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
    }
}

impl<T> Drop for QueueDynMpsc<T> {
    fn drop(&mut self) {
        // There should be no more `QueueDynProducer` pointers to this queue
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
        // And so the limit head should be equal to the write head
        let data_lock = self.inner.data.lock_shared();
        let write_index = self.inner.write_head.load(Ordering::Acquire);
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);

        // Every item that has not yet been taken out of the queue needs to
        // have its destructor called. We immediately apply the
        // increment-by-size trick and wait until we've hit the write head.
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
        read_index += data_lock.data.cap();
        while read_index & data_lock.compare_mask != write_index {
            unsafe {
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
                std::ptr::drop_in_place(target);
            }
            read_index += 1;
        }
    }
}

pub struct QueueDynProducer<T> {
    queue: *const Shared<T>,
}

impl<T> QueueDynProducer<T> {
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
        unsafe {
            // If you only knew the power of the dark side! Obi-Wan never told
            // you what happened to your father!
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
            return Self{ queue };
        }
    }

    pub fn push(&self, value: T) {
        let queue = unsafe{ &*self.queue };

        let data_lock = queue.data.lock_shared();
        let read_index = queue.read_head.load(Ordering::Acquire);
        let write_index = queue.write_head.load(Ordering::Acquire);
        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
            let expected_capacity = data_lock.data.cap();
        }
    }

    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
        drop(shared_lock);
        let queue = unsafe{ &*self.queue };

        {
            let exclusive_lock = self.queue.data.lock_exclusive();

            // We hold the exclusive lock, but someone else might have done the resizing, and so:
            if exclusive_lock.data.cap() == expected_capacity {
                let old_capacity = expected_capacity;
                let new_capacity = 2 * old_capacity;

                // Resize by a factor of two, and make the two halves identical.
                exclusive_lock.data.resize(new_capacity);
                for idx in old_capacity..new_capacity {
                    unsafe {
                        let target = exclusive_lock.data.get(idx);
                        let source = exclusive_lock.data.get(idx - old_capacity);
                        std::ptr::write(target, std::ptr::read(source));
                    }
                }

                // Modify all atomics to reflect that we just resized the
                // underlying buffer.
                
            }
        }

        // Reacquire shared lock
        return queue.data.lock_shared();
    }
}

impl<T> Drop for QueueDynProducer<T> {
    fn drop(&mut self) {
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
    }
}

// producer end is `Send`, because in debug mode we make sure that there are no
// more producers when the queue is destroyed. But is not sync, because that
// would circumvent our atomic counter shenanigans.
unsafe impl<T> Send for QueueDynProducer<T>{}