Files @ fafdf8723ee3
Branch filter:

Location: CSY/reowolf/src/runtime2/store/queue_mpsc.rs - annotation

fafdf8723ee3 7.6 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
WIP: Busy with MPSC channel

Will act as backing buffer for messages (for now).
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
fafdf8723ee3
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
75e767adaaf7
use std::sync::atomic::{AtomicU32, Ordering};

use crate::collections::RawArray;
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};

/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To retrieve access to the producer-side: call `producer()`.
///
/// This is a queue that will resize (indefinitely) if it becomes full, and will
/// not shrink. So probably a temporary thing.
///
/// In debug mode we'll make sure that there are no producers when the queue is
/// dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
pub struct QueueDynMpsc<T> {
    // Entire contents are boxed up such that we can create producers that have
    // a pointer to the contents.
    inner: Box<Shared<T>>
}

// One may move around the queue between threads, as long as there is only one
// instance of it.
unsafe impl<T> Send for QueueDynMpsc<T>{}

/// Shared data between queue consumer and the queue producers
struct Shared<T> {
    data: UnfairSeLock<Inner<T>>,
    read_head: AtomicU32,
    write_head: AtomicU32,
    limit_head: AtomicU32,
    #[cfg(debug_assertions)] dbg: AtomicU32,
}

/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
/// inner data array is resized.
struct Inner<T> {
    data: RawArray<T>,
    compare_mask: u32,
    read_mask: u32,
}

type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;

impl<T> QueueDynMpsc<T> {
    /// Constructs a new MPSC queue. Note that the initial capacity is always
    /// increased to the next power of 2 (if it isn't already).
    pub fn new(initial_capacity: usize) -> Self {
        let initial_capacity = initial_capacity.next_power_of_two();
        Self::assert_correct_size(initial_capacity);

        let mut data = RawArray::new();
        data.resize(initial_capacity);

        let initial_capacity = initial_capacity as u32;

        return Self{
            inner: Box::new(Shared {
                data: UnfairSeLock::new(Inner{
                    data,
                    compare_mask: (2 * initial_capacity) - 1,
                    read_mask: initial_capacity - 1,
                }),
                read_head: AtomicU32::new(0),
                write_head: AtomicU32::new(initial_capacity),
                limit_head: AtomicU32::new(initial_capacity),
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
            }),
        };
    }

    #[inline]
    pub fn producer(&self) -> QueueDynProducer<T> {
        return QueueDynProducer::new(self);
    }

    /// Perform an attempted read from the queue. It might be that some producer
    /// is putting something in the queue while this function is executing, and
    /// we don't get the consume it.
    pub fn pop(&mut self) -> Option<T> {
        let data_lock = self.inner.data.lock_shared();
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
        let buf_size = data_lock.data.cap();

        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
            // Make a bitwise copy of the value and return it. The receiver is
            // responsible for dropping it.
            unsafe {
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
                // We can perform a store since we're the only ones modifying
                // the atomic.
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
                return Some(std::ptr::read(source));
            }
        } else {
            return None;
        }
    }

    #[inline]
    fn assert_correct_size(capacity: usize) {
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
    }
}

impl<T> Drop for QueueDynMpsc<T> {
    fn drop(&mut self) {
        // There should be no more `QueueDynProducer` pointers to this queue
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
        // And so the limit head should be equal to the write head
        let data_lock = self.inner.data.lock_shared();
        let write_index = self.inner.write_head.load(Ordering::Acquire);
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);

        // Every item that has not yet been taken out of the queue needs to
        // have its destructor called. We immediately apply the
        // increment-by-size trick and wait until we've hit the write head.
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
        read_index += data_lock.data.cap();
        while read_index & data_lock.compare_mask != write_index {
            unsafe {
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
                std::ptr::drop_in_place(target);
            }
            read_index += 1;
        }
    }
}

pub struct QueueDynProducer<T> {
    queue: *const Shared<T>,
}

impl<T> QueueDynProducer<T> {
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
        unsafe {
            // If you only knew the power of the dark side! Obi-Wan never told
            // you what happened to your father!
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
            return Self{ queue };
        }
    }

    pub fn push(&self, value: T) {
        let queue = unsafe{ &*self.queue };

        let data_lock = queue.data.lock_shared();
        let read_index = queue.read_head.load(Ordering::Acquire);
        let write_index = queue.write_head.load(Ordering::Acquire);
        if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
            let expected_capacity = data_lock.data.cap();
        }
    }

    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
        drop(shared_lock);
        let queue = unsafe{ &*self.queue };

        {
            let exclusive_lock = self.queue.data.lock_exclusive();

            // We hold the exclusive lock, but someone else might have done the resizing, and so:
            if exclusive_lock.data.cap() == expected_capacity {
                let old_capacity = expected_capacity;
                let new_capacity = 2 * old_capacity;

                // Resize by a factor of two, and make the two halves identical.
                exclusive_lock.data.resize(new_capacity);
                for idx in old_capacity..new_capacity {
                    unsafe {
                        let target = exclusive_lock.data.get(idx);
                        let source = exclusive_lock.data.get(idx - old_capacity);
                        std::ptr::write(target, std::ptr::read(source));
                    }
                }

                // Modify all atomics to reflect that we just resized the
                // underlying buffer.
                
            }
        }

        // Reacquire shared lock
        return queue.data.lock_shared();
    }
}

impl<T> Drop for QueueDynProducer<T> {
    fn drop(&mut self) {
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
    }
}

// producer end is `Send`, because in debug mode we make sure that there are no
// more producers when the queue is destroyed. But is not sync, because that
// would circumvent our atomic counter shenanigans.
unsafe impl<T> Send for QueueDynProducer<T>{}