Location: CSY/reowolf/src/runtime2/store/queue_mpsc.rs
WIP: Busy with MPSC channel
Will act as backing buffer for messages (for now).
use std::sync::atomic::{AtomicU32, Ordering};
use crate::collections::RawArray;
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
/// Multiple-producer single-consumer queue. Generally used in the publicly
/// accessible fields of a component. The holder of this struct should be the
/// consumer. To obtain access to the producer side, call `producer()`.
///
/// This is a queue that will resize (indefinitely) if it becomes full, and will
/// not shrink. So probably a temporary thing.
///
/// In debug mode we'll make sure that there are no producers when the queue is
/// dropped. We don't do this in release mode because the runtime is written
/// such that components always remain alive (hence, this queue will remain
/// accessible) while there are references to it.
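///
/// A minimal usage sketch (illustrative only, since `push` is still
/// unfinished in this WIP commit):
///
/// ```ignore
/// let mut queue = QueueDynMpsc::<u32>::new(4);
/// let producer = queue.producer();
///
/// // Some producer thread pushes a value...
/// producer.push(5);
///
/// // ...and the single consumer pops it once it becomes visible.
/// assert_eq!(queue.pop(), Some(5));
/// assert_eq!(queue.pop(), None);
/// ```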
pub struct QueueDynMpsc<T> {
// Entire contents are boxed up such that we can create producers that have
// a pointer to the contents.
inner: Box<Shared<T>>
}
// One may move around the queue between threads, as long as there is only one
// instance of it.
unsafe impl<T> Send for QueueDynMpsc<T>{}
/// Shared data between queue consumer and the queue producers
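///
/// Note on the index scheme (as used by `pop`, `push` and the `Drop` impl
/// below): `read_head`, `write_head` and `limit_head` all live in the range
/// `[0, 2*capacity)`, with the write/limit heads offset by `capacity`
/// relative to the read head. This way "empty" (`read + capacity == limit`)
/// and "full" (`write == read`) can be told apart without an extra flag,
/// while the actual slot index is obtained by masking with `capacity - 1`.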
struct Shared<T> {
data: UnfairSeLock<Inner<T>>,
read_head: AtomicU32,
write_head: AtomicU32,
limit_head: AtomicU32,
#[cfg(debug_assertions)] dbg: AtomicU32,
}
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
/// inner data array is resized.
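///
/// `compare_mask` is `2*capacity - 1` (head values are kept within
/// `[0, 2*capacity)`) and `read_mask` is `capacity - 1` (turns a head value
/// into a slot index); both are set up in `QueueDynMpsc::new`.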
struct Inner<T> {
data: RawArray<T>,
compare_mask: u32,
read_mask: u32,
}
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
impl<T> QueueDynMpsc<T> {
/// Constructs a new MPSC queue. Note that the initial capacity is always
/// increased to the next power of 2 (if it isn't already).
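    /// For example, `new(6)` rounds the capacity up to 8.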
pub fn new(initial_capacity: usize) -> Self {
let initial_capacity = initial_capacity.next_power_of_two();
Self::assert_correct_size(initial_capacity);
let mut data = RawArray::new();
data.resize(initial_capacity);
let initial_capacity = initial_capacity as u32;
return Self{
inner: Box::new(Shared {
data: UnfairSeLock::new(Inner{
data,
compare_mask: (2 * initial_capacity) - 1,
read_mask: initial_capacity - 1,
}),
read_head: AtomicU32::new(0),
write_head: AtomicU32::new(initial_capacity),
limit_head: AtomicU32::new(initial_capacity),
#[cfg(debug_assertions)] dbg: AtomicU32::new(0),
}),
};
}
#[inline]
pub fn producer(&self) -> QueueDynProducer<T> {
return QueueDynProducer::new(self);
}
    /// Performs an attempted read from the queue. It might be that some
    /// producer is putting something into the queue while this function is
    /// executing, in which case we don't get to consume it.
pub fn pop(&mut self) -> Option<T> {
let data_lock = self.inner.data.lock_shared();
let cur_read = self.inner.read_head.load(Ordering::Acquire);
let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
let buf_size = data_lock.data.cap();
if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
// Make a bitwise copy of the value and return it. The receiver is
// responsible for dropping it.
unsafe {
let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
// We can perform a store since we're the only ones modifying
// the atomic.
self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
return Some(std::ptr::read(source));
}
} else {
return None;
}
}
#[inline]
fn assert_correct_size(capacity: usize) {
assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
}
}
impl<T> Drop for QueueDynMpsc<T> {
fn drop(&mut self) {
// There should be no more `QueueDynProducer` pointers to this queue
dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
// And so the limit head should be equal to the write head
let data_lock = self.inner.data.lock_shared();
let write_index = self.inner.write_head.load(Ordering::Acquire);
assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
// Every item that has not yet been taken out of the queue needs to
// have its destructor called. We immediately apply the
// increment-by-size trick and wait until we've hit the write head.
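        // (Adding the capacity puts `read_index` in the same offset-by-capacity
        // frame as `write_head`, so equality modulo `2*capacity` means every
        // pending item has been visited.)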
let mut read_index = self.inner.read_head.load(Ordering::Acquire);
read_index += data_lock.data.cap();
while read_index & data_lock.compare_mask != write_index {
unsafe {
let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
std::ptr::drop_in_place(target);
}
read_index += 1;
}
}
}
pub struct QueueDynProducer<T> {
queue: *const Shared<T>,
}
impl<T> QueueDynProducer<T> {
fn new(consumer: &QueueDynMpsc<T>) -> Self {
dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
unsafe {
// If you only knew the power of the dark side! Obi-Wan never told
// you what happened to your father!
let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
return Self{ queue };
}
}
pub fn push(&self, value: T) {
let queue = unsafe{ &*self.queue };
let data_lock = queue.data.lock_shared();
let read_index = queue.read_head.load(Ordering::Acquire);
let write_index = queue.write_head.load(Ordering::Acquire);
if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
let expected_capacity = data_lock.data.cap();
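            // NOTE: `push` is unfinished in this WIP commit. Presumably the
            // full-queue branch should hand `expected_capacity` to `resize`
            // below, and the common path still has to reserve a slot via
            // `write_head`, write `value` into it, and then advance
            // `limit_head` so that `pop` can observe the new entry.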
}
}
fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
drop(shared_lock);
let queue = unsafe{ &*self.queue };
{
            let mut exclusive_lock = queue.data.lock_exclusive();
// We hold the exclusive lock, but someone else might have done the resizing, and so:
if exclusive_lock.data.cap() == expected_capacity {
let old_capacity = expected_capacity;
let new_capacity = 2 * old_capacity;
// Resize by a factor of two, and make the two halves identical.
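                // (Duplicating the contents keeps every live element reachable:
                // a head value in [0, 2*old_capacity) indexes the same element
                // whether it is masked with the old or the doubled read mask.)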
exclusive_lock.data.resize(new_capacity);
for idx in old_capacity..new_capacity {
unsafe {
let target = exclusive_lock.data.get(idx);
let source = exclusive_lock.data.get(idx - old_capacity);
std::ptr::write(target, std::ptr::read(source));
}
}
                // Modify all atomics to reflect that we just resized the
                // underlying buffer. (The updates to the masks and the heads
                // are still missing in this WIP commit.)
}
}
// Reacquire shared lock
return queue.data.lock_shared();
}
}
impl<T> Drop for QueueDynProducer<T> {
fn drop(&mut self) {
dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
}
}
// The producer end is `Send`, because in debug mode we make sure that there
// are no more producers when the queue is destroyed. But it is not `Sync`,
// because that would circumvent our atomic counter shenanigans.
unsafe impl<T> Send for QueueDynProducer<T>{}