Files
@ 75e767adaaf7
Branch filter:
Location: CSY/reowolf/src/runtime2/store/queue_mpsc.rs - annotation
75e767adaaf7
4.9 KiB
application/rls-services+xml
Add a new RawArray implementation
Needed to implement ringbuffer queues in a neat way. Will deprecate the
old RawVec implementation
Needed to implement ringbuffer queues in a neat way. Will deprecate the
old RawVec implementation
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 
75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 75e767adaaf7 | use std::sync::atomic::{AtomicU32, Ordering};
use super::unfair_se_lock::{UnfairSeLock};
/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To obtain a handle for the producer side, call `producer()`.
///
/// This is a queue that will resize (indefinitely) if it becomes full, and will
/// not shrink. So probably a temporary thing.
/// NOTE(review): the resizing logic is not visible in this part of the file.
///
/// In debug mode we'll make sure that there are no producers when the queue is
/// dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
pub struct QueueDynMpsc<T: 'static> {
// Entire contents are boxed up such that we can create producers that have
// a pointer to the contents: the heap allocation does not move when the
// `QueueDynMpsc` value itself is moved.
inner: Box<Shared<T>>
}
// One may move around the queue between threads, as long as there is only one
// instance of it.
// NOTE(review): this impl places no `T: Send` bound, so a queue of values that
// are not themselves `Send` can also be moved to another thread. Confirm that
// this is intended, or restrict the impl to `T: Send`.
unsafe impl<T> Send for QueueDynMpsc<T>{}
/// Shared data between queue consumer and the queue producers. The consumer
/// owns it through a `Box`; each `QueueDynProducer` holds a reference to it.
struct Shared<T: 'static> {
// Element storage plus its index masks, behind a shared/exclusive lock.
data: UnfairSeLock<Inner<T>>,
// Index of the next element `read()` will take. Plain (non-atomic) integer
// that is only modified through `&mut QueueDynMpsc`.
read_head: u32,
// Compared against `limit_head` when the queue is dropped; the code that
// advances it is not visible in this part of the file.
write_head: AtomicU32,
// `read()` treats the queue as empty once `read_head` equals this value.
limit_head: AtomicU32,
// Debug builds only: number of `QueueDynProducer` instances currently alive
// (incremented on creation, decremented on drop, asserted zero when the
// queue is dropped).
#[cfg(debug_assertions)] dbg: AtomicU32,
}
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
/// inner data array is resized.
struct Inner<T> {
// Backing allocation. `new()` only reserves capacity, and `read()` accesses
// slots through the raw pointer, so the `Vec`'s own length is not what
// tracks which slots hold live values.
data: Vec<T>,
// Initialised to `2 * capacity - 1`. Not used by the code visible here;
// presumably for comparing wrapped head indices -- TODO confirm.
compare_mask: u32,
// Initialised to `capacity - 1`; `read()` ANDs the read head with this
// mask to obtain the slot index.
read_mask: u32,
}
impl<T> QueueDynMpsc<T> {
    /// Creates an empty queue.
    ///
    /// `initial_capacity` is rounded up to the next power of two before use.
    ///
    /// # Panics
    ///
    /// Panics if the rounded capacity is rejected by `assert_correct_size`.
    pub fn new(initial_capacity: usize) -> Self {
        let capacity = initial_capacity.next_power_of_two();
        Self::assert_correct_size(capacity);

        // The size check above ensures the capacity (and twice the capacity)
        // fits in a `u32`, so the cast and the mask arithmetic cannot wrap.
        let capacity_u32 = capacity as u32;
        let storage = Inner {
            data: Vec::with_capacity(capacity),
            compare_mask: (2 * capacity_u32) - 1,
            read_mask: capacity_u32 - 1,
        };

        Self {
            inner: Box::new(Shared {
                data: UnfairSeLock::new(storage),
                read_head: 0,
                write_head: AtomicU32::new(0),
                limit_head: AtomicU32::new(0),
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
            }),
        }
    }

    /// Creates a producer handle that refers to this queue.
    #[inline]
    pub fn producer(&self) -> QueueDynProducer<T> {
        QueueDynProducer::new(self)
    }

    /// Attempts to take one element from the queue.
    ///
    /// Returns `None` when the read head has caught up with the limit head.
    /// A producer may be in the middle of putting something in the queue
    /// while this function executes, in which case we don't get to consume
    /// that element yet.
    pub fn read(&mut self) -> Option<T> {
        // Both heads are sampled before the shared lock is taken.
        let read_index = self.inner.read_head;
        let limit_index = self.inner.limit_head.load(Ordering::Acquire);
        let guard = self.inner.data.lock_shared();

        if read_index == limit_index {
            return None;
        }

        let slot = (read_index & guard.read_mask) as usize;
        unsafe {
            // Bitwise copy out of the slot: ownership of the value passes to
            // the caller, who becomes responsible for dropping it.
            let source = guard.data.as_ptr().add(slot);
            self.inner.read_head += 1;
            Some(std::ptr::read(source))
        }
    }

    /// Asserts that `capacity` is a power of two and smaller than half of
    /// `u32::MAX`.
    #[inline]
    fn assert_correct_size(capacity: usize) {
        assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
    }
}
impl<T> Drop for QueueDynMpsc<T> {
    /// Tears down the queue, running the destructor of every element that
    /// was never read.
    ///
    /// # Panics
    ///
    /// Panics if the limit head and write head differ. In debug builds it
    /// also panics if any `QueueDynProducer` for this queue is still alive.
    fn drop(&mut self) {
        // There should be no more `QueueDynProducer` pointers to this queue
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));

        // And so the limit head should be equal to the write head
        let write_index = self.inner.write_head.load(Ordering::Acquire);
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);

        // Every item that has not yet been taken out of the queue needs to
        // have its destructor called. `new()` only reserves capacity in the
        // backing `Vec`, so dropping that `Vec` will not do this for us.
        // `read()` advances the read head until it reaches the limit head,
        // which was just asserted to equal the write head, so this loop
        // terminates once the queue is empty.
        while let Some(item) = self.read() {
            drop(item);
        }
    }
}
/// Producer-side handle to a `QueueDynMpsc`, created through
/// `QueueDynMpsc::producer()`.
pub struct QueueDynProducer<T: 'static> {
// Reference to the queue's boxed shared state. The `'static` lifetime is
// manufactured in `new()`; it is only valid for as long as the originating
// `QueueDynMpsc` is alive.
queue: &'static Shared<T>,
}
impl<T> QueueDynProducer<T> {
fn new(consumer: &QueueDynMpsc<T>) -> Self {
dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
unsafe {
// If you only knew the power of the dark side! Obi-Wan never told
// you what happened to your father!
let queue: &'static _ = std::mem::transmute(consumer.inner.as_ref());
return Self{ queue };
}
}
}
impl<T> Drop for QueueDynProducer<T> {
// Debug builds only: decrement the queue's live-producer counter so that the
// queue's own `Drop` can assert that no producers remain.
fn drop(&mut self) {
dbg_code!(self.queue.dbg.fetch_sub(1, Ordering::AcqRel));
}
}
// The producer end is `Send`, because in debug mode we make sure that there
// are no more producers when the queue is destroyed. But it is not `Sync`,
// because that would circumvent our atomic counter shenanigans.
// NOTE(review): as with the consumer side, there is no `T: Send` bound on this
// impl -- confirm that is intended.
unsafe impl<T> Send for QueueDynProducer<T>{}
|