From 0e1a766679374661a3cf1b2782a7806767623445 2022-01-05 12:11:03 From: MH Date: 2022-01-05 12:11:03 Subject: [PATCH] Started work on speculationless runtime --- diff --git a/Cargo.toml b/Cargo.toml index 0b87c1b7d064cbab7cbaa1a5dfba4f21f38b11d4..5063ffecb153a716769bc356287a37871a7598fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,9 +33,8 @@ libc = { version = "^0.2", optional = true } os_socketaddr = { version = "0.1.0", optional = true } [dev-dependencies] -# test-generator = "0.3.0" -crossbeam-utils = "0.7.2" -lazy_static = "1.4.0" +rand = "0.8.4" +rand_pcg = "0.3.1" [lib] crate-type = [ diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index feab9c135698e49f53dba5985a928d1644f48478..00e18b174b087d514ebe4b29e8760a104de3c0bc 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -196,7 +196,7 @@ impl Frame { } } -type EvalResult = Result; +pub type EvalResult = Result; #[derive(Debug)] pub enum EvalContinuation { diff --git a/src/protocol/eval/mod.rs b/src/protocol/eval/mod.rs index a1bc818d7e5d762b872506922c8f7e2bdcdf3273..0425444e482048420bdc13dd7449620babb0474e 100644 --- a/src/protocol/eval/mod.rs +++ b/src/protocol/eval/mod.rs @@ -27,5 +27,5 @@ pub(crate) mod error; pub use error::EvalError; pub use value::{PortId, Value, ValueGroup}; -pub use executor::{EvalContinuation, Prompt}; +pub use executor::{EvalContinuation, EvalResult, Prompt}; diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..f3397eeba71f6fff803b9329a782911f1e2652ee 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -0,0 +1,38 @@ +use super::runtime::*; + +#[derive(Copy, Clone)] +pub struct PortId(pub u32); + +impl PortId { + pub fn new_invalid() -> Self { + return Self(u32::MAX); + } +} + +pub struct Peer { + pub id: CompId, + pub(crate) handle: CompHandle, +} + +pub enum PortKind { + Putter, + Getter, +} + +pub enum PortState { + Open, + Closed, +} + +pub struct Port { + pub self_id: PortId, + pub peer_id: PortId, + pub kind: PortKind, + pub state: PortState, + pub local_peer_index: u32, +} + +/// Public inbox: accessible by all threads. 
Essentially a MPSC channel +pub struct InboxPublic { + +} \ No newline at end of file diff --git a/src/runtime2/component.rs b/src/runtime2/component.rs index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..654c6ce2d4b6af1b9d4de3130ee304b4e0620985 100644 --- a/src/runtime2/component.rs +++ b/src/runtime2/component.rs @@ -0,0 +1,216 @@ +use crate::protocol::*; +use crate::protocol::eval::{ + PortId as EvalPortId, Prompt, + ValueGroup, Value, + EvalContinuation, EvalResult, EvalError +}; + +use super::runtime::*; +use super::scheduler::SchedulerCtx; +use super::communication::*; + +pub enum CompScheduling { + Immediate, + Requeue, + Sleep, + Exit, +} + +pub struct CompCtx { + pub id: CompId, + pub ports: Vec, + pub peers: Vec, + pub messages: Vec, // same size as "ports" +} + +impl CompCtx { + fn take_message(&mut self, port_id: PortId) -> Option { + let old_value = &mut self.messages[port_id.0 as usize]; + if old_value.values.is_empty() { + return None; + } + + // Replace value in array with an empty one + let mut message = ValueGroup::new_stack(Vec::new()); + std::mem::swap(old_value, &mut message); + return Some(message); + } + + fn find_peer(&self, port_id: PortId) -> &Peer { + let port_info = &self.ports[port_id.0 as usize]; + let peer_info = &self.peers[port_info.local_peer_index as usize]; + return peer_info; + } +} + +pub enum ExecStmt { + CreatedChannel((Value, Value)), + PerformedPut, + PerformedGet(ValueGroup), + None, +} + +impl ExecStmt { + fn take(&mut self) -> ExecStmt { + let mut value = ExecStmt::None; + std::mem::swap(self, &mut value); + return value; + } + + fn is_none(&self) -> bool { + match self { + ExecStmt::None => return true, + _ => return false, + } + } +} + +pub struct ExecCtx { + stmt: ExecStmt, +} + +impl RunContext for ExecCtx { + fn performed_put(&mut self, _port: EvalPortId) -> bool { + match self.stmt.take() { + ExecStmt::None => return false, + ExecStmt::PerformedPut => return true, + _ => unreachable!(), + } + } + + fn performed_get(&mut self, _port: EvalPortId) -> Option { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::PerformedGet(value) => return Some(value), + _ => unreachable!(), + } + } + + fn fires(&mut self, _port: EvalPortId) -> Option { + todo!("remove fires") + } + + fn performed_fork(&mut self) -> Option { + todo!("remove fork") + } + + fn created_channel(&mut self) -> Option<(Value, Value)> { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::CreatedChannel(ports) => return Some(ports), + _ => unreachable!(), + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum Mode { + NonSync, + Sync, + BlockedGet, + BlockedPut, +} + +pub(crate) struct CompPDL { + pub mode: Mode, + pub mode_port: PortId, // when blocked on a port + pub mode_value: ValueGroup, // when blocked on a put + pub prompt: Prompt, + pub exec_ctx: ExecCtx, +} + +impl CompPDL { + pub(crate) fn new(initial_state: Prompt) -> Self { + return Self{ + mode: Mode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + prompt: initial_state, + exec_ctx: ExecCtx{ + stmt: ExecStmt::None, + } + } + } + + pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + use EvalContinuation as EC; + + let run_result = self.execute_prompt(&sched_ctx)?; + + match run_result { + EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned + EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), + // Results that can be returned 
in sync mode + EC::SyncBlockEnd => { + debug_assert_eq!(self.mode, Mode::Sync); + self.handle_sync_end(sched_ctx, comp_ctx); + }, + EC::BlockGet(port_id) => { + debug_assert_eq!(self.mode, Mode::Sync); + + let port_id = transform_port_id(port_id); + if let Some(message) = comp_ctx.take_message(port_id) { + // We can immediately receive and continue + debug_assert!(self.exec_ctx.stmt.is_none()); + self.exec_ctx.stmt = ExecStmt::PerformedGet(message); + return Ok(CompScheduling::Immediate); + } else { + // We need to wait + self.mode = Mode::BlockedGet; + self.mode_port = port_id; + return Ok(CompScheduling::Sleep); + } + }, + EC::Put(port_id, value) => { + debug_assert_eq!(self.mode, Mode::Sync); + + let port_id = transform_port_id(port_id); + let peer = comp_ctx.find_peer(port_id); + }, + // Results that can be returned outside of sync mode + EC::ComponentTerminated => { + debug_assert_eq!(self.mode, Mode::NonSync); + + }, + EC::SyncBlockStart => { + debug_assert_eq!(self.mode, Mode::NonSync); + self.handle_sync_start(sched_ctx, comp_ctx); + }, + EC::NewComponent(definition_id, monomorph_idx, arguments) => { + debug_assert_eq!(self.mode, Mode::NonSync); + + }, + EC::NewChannel => { + debug_assert_eq!(self.mode, Mode::NonSync); + + } + } + + return Ok(CompScheduling::Sleep); + } + + fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { + let mut step_result = EvalContinuation::Stepping; + while let EvalContinuation::Stepping = step_result { + step_result = self.prompt.step( + &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, + &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx, + )?; + } + + return Ok(step_result) + } + + fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + + } + + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + + } +} + +#[inline] +fn transform_port_id(port_id: EvalPortId) -> PortId { + return PortId(port_id.id); +} \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 29342214db784707f350f2fc57a43d559c71cf80..880f89fa5743b93ed09697b852bbc932d88652ac 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1,4 +1,5 @@ - +mod store; mod runtime; mod component; -mod communication; \ No newline at end of file +mod communication; +mod scheduler; \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 767088eb2a1eed792272aab2326cf3e5881f29c4..0d8f13909ebaee5567dfdb99a6022e5d3bc58ba0 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,10 +1,12 @@ -use std::mem::{size_of, align_of, transmute}; -use std::alloc::{alloc, dealloc, Layout}; -use std::sync::Arc; -use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, Condvar}; +use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; +use std::collections::VecDeque; use crate::protocol::*; +use super::component::{CompCtx, CompPDL}; +use super::store::ComponentStore; + // ----------------------------------------------------------------------------- // Component // ----------------------------------------------------------------------------- @@ -12,37 +14,77 @@ use crate::protocol::*; /// Key to a component. Type system somewhat ensures that there can only be one /// of these. Only with a key one may retrieve privately-accessible memory for /// a component. Practically just a generational index, like `CompId` is. 
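To make the intended discipline concrete, here is a usage sketch (not part of this patch) of the key/id split, written against the pub(crate) Runtime methods added further down in this patch; the function itself is hypothetical and only illustrates the contract.

    // Sketch only: a CompKey is handed out exclusively (by the work queue or the
    // unsafe upgrade), so holding one justifies mutable access to the component.
    // A CompId is freely copyable and only grants access to the public part.
    fn sketch_key_discipline(runtime: &Runtime) {
        if let Some(key) = runtime.take_work() {
            let comp = runtime.get_component(key);          // &mut access, justified by the key
            let id = key.downgrade();                       // CompId may be copied and shared
            let public = runtime.get_component_public(id);  // shared access only
            let _ = (comp, public);
            runtime.enqueue_work(key);                      // hand exclusivity back to the queue
        }
    }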
-#[derive(Copy, Clone)] -pub(crate) struct CompKey(CompId); +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) struct CompKey(u32); -/// Generational ID of a component -#[derive(Copy, Clone)] -pub(crate) struct CompId { - pub index: u32, - pub generation: u32, +impl CompKey { + pub(crate) fn downgrade(&self) -> CompId { + return CompId(self.0); + } } -impl PartialEq for CompId { - fn eq(&self, other: &Self) -> bool { - return self.index.eq(&other.index); +/// Generational ID of a component +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct CompId(u32); + +impl CompId { + pub(crate) fn new_invalid() -> CompId { + return CompId(u32::MAX); + } + + /// Upgrade component ID to component key. Unsafe because the caller needs + /// to make sure that only one component key can exist at a time (to ensure + /// a component can only be scheduled/executed by one thread). + pub(crate) unsafe fn upgrade(&self) -> CompKey { + return CompKey(self.0); } } -impl Eq for CompId {} /// In-runtime storage of a component -pub(crate) struct RtComp { +pub(crate) struct RuntimeComp { + pub public: CompPublic, + pub private: CompPrivate, +} + +/// Should contain everything that is accessible in a thread-safe manner +pub(crate) struct CompPublic { + pub sleeping: AtomicBool, + pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances +} +/// Handle to public part of a component. +pub(crate) struct CompHandle { + target: *const CompPublic, +} + +impl std::ops::Deref for CompHandle { + type Target = CompPublic; + + fn deref(&self) -> &Self::Target { + return unsafe{ &*self.target }; + } +} + +/// May contain non thread-safe fields. Accessed only by the scheduler which +/// will temporarily "own" the component. +pub(crate) struct CompPrivate { + pub code: CompPDL, + pub ctx: CompCtx, } // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- -type RuntimeHandle = Arc; +pub type RuntimeHandle = Arc; /// Memory that is maintained by "the runtime". In practice it is maintained by /// multiple schedulers, and this serves as the common interface to that memory. pub struct Runtime { + pub protocol: ProtocolDescription, + components: ComponentStore, + work_queue: Mutex>, + work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } @@ -50,253 +92,71 @@ impl Runtime { pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { assert!(num_threads > 0, "need a thread to perform work"); return Runtime{ + protocol: protocol_description, + components: ComponentStore::new(128), + work_queue: Mutex::new(VecDeque::with_capacity(128)), + work_condvar: Condvar::new(), active_elements: AtomicU32::new(0), }; } -} - -// ----------------------------------------------------------------------------- -// Runtime containers -// ----------------------------------------------------------------------------- - -/// Component storage. Note that it shouldn't be polymorphic, but making it so -/// allows us to test it more easily. The container is essentially a -/// thread-safe freelist. The list always contains *all* free entries in the -/// storage array. -/// -/// The freelist itself is implemented using a thread-safe ringbuffer. But there -/// are some very important properties we exploit in this specific -/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e. 
-/// adding to the freelist) corresponds to destroying a component, and reading -/// from the ringbuffer corresponds to creating a component. The aforementioned -/// properties are: one can never write more to the ringbuffer than has been -/// read from it (i.e. destroying more components than are created), we may -/// safely assume that when the `CompStore` is dropped that no thread can access -/// it (because they've all been shut down). This simplifies deallocation code. -/// -/// Internally each individual instance of `T` will be (de)allocated. So we will -/// not store an array of `T`, but an array of `*T`. This keeps the storage of -/// `T` pointer-stable (as is required for the schedulers actually running the -/// components, because they'll fetch a component and then continue running it -/// while this component storage might get reallocated). -/// -/// Note that there is still some unsafety here that is kept in check by the -/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that -/// only one mutable reference will ever be obtained, and potentially multiple -/// immutable references. But in practice the `&mut T` will be used to access -/// so-called "public" fields immutably, and "private" fields mutable. While the -/// `&T` will only be used to access the "public" fields immutably. -struct CompStore { - freelist: *mut u32, - data: *mut *mut T, - count: usize, - mask: usize, - byte_size: usize, // used for dealloc - write_head: AtomicUsize, - limit_head: AtomicUsize, - read_head: AtomicUsize, -} - -const fn compute_realloc_flag() -> usize { - match size_of::() { - 4 => return 1 << 31, // 32-bit system - 8 => return 1 << 63, // 64-bit system - _ => panic!("unexpected byte size for 'usize'") - } -} - -impl CompStore { - const REALLOC_FLAG: usize = compute_realloc_flag(); - - fn new(initial_count: usize) -> Self { - // Allocate data - debug_assert!(size_of::() > 0); // No ZST during testing (and definitely not in production) - let (freelist, data, byte_size) = Self::alloc_buffer(initial_count); - unsafe { - // Init the freelist to all of the indices in the array of data - let mut target = freelist; - for idx in 0..initial_count as u32 { - *target = idx; - target += 1; - } + // Scheduling and retrieving work - // And init the data such that they're all NULL pointers - std::ptr::write_bytes(data, 0, initial_count); + pub(crate) fn take_work(&self) -> Option { + let mut lock = self.work_queue.lock().unwrap(); + while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 { + lock = self.work_condvar.wait(lock).unwrap(); } - return CompStore{ - freelist, data, - count: initial_count, - mask: initial_count - 1, - byte_size, - write_head: AtomicUsize::new(initial_count), - limit_head: AtomicUsize::new(initial_count), - read_head: AtomicUsize::new(0), - }; + return lock.pop_front(); } - fn get_index_from_freelist(&self) -> u32 { - let compare_mask = (self.count * 2) - 1; - let mut read_index = self.read_head.load(Ordering::Acquire); // read index first - - 'try_loop: loop { - let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second - - // By definition we always have `read_index <= limit_index` (if we would - // have an infinite buffer, in reality we will wrap). - if (read_index & compare_mask) == (limit_index & compare_mask) { - // We need to create a bigger buffer. Note that no reader can - // *ever* set the read index to beyond the limit index, and it - // is currently equal. 
So we're certain that there is no other - // reader currently updating the read_head. - // - // To test if we are supposed to resize the backing buffer we - // try set the REALLOC_FLAG on the limit index. Note that the - // stored indices are always in the range [0, 2*count). So if - // we add REALLOC_FLAG to the limit index, then the masked - // condition above still holds! Other potential readers will end - // up here and are allowed to wait until we resized the backing - // container. - // - // Furthermore, setting the limit index to this high value also - // notifies the writer that any of it writes should be tried - // again, as they're writing to a buffer that is going to get - // trashed. - todo!("finish reallocation code"); - match self.limit_head.compare_exchange(limit_index, limit_index | Self::REALLOC_FLAG, Ordering::SeqCst, Ordering::Acquire) { - Ok(_) => { - // Limit index has changed, so we're now the ones that - // are supposed to resize the - } - } - } else { - // It seems we have space to read - let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) }; - if let Err(new_read_index) = self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) { - // Failed to do the CAS, try again. We need to start at the - // start again because we might have had other readers that - // were successful, so at the very least, the preemptive - // read we did is no longer correct. - read_index = new_read_index; - continue 'try_loop; - } - - // We now "own" the value at the read index - return preemptive_read; - } - } + pub(crate) fn enqueue_work(&self, key: CompKey) { + let mut lock = self.work_queue.lock().unwrap(); + lock.push_back(key); + self.work_condvar.notify_one(); } - fn put_back_index_into_freelist(&self, index: u32) { - let mut compare_mask = (self.count * 2) - 1; - let mut write_index = self.write_head.load(Ordering::Acquire); - while let Err(new_write_index) = self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) { - // Failed to do the CAS, try again - write_index = new_write_index; - } - - 'try_write_loop: loop { - // We are now the only ones that can write at `write_index`. Try to - // do so - unsafe { *self.freelist.add(write_index & self.mask) = index; } - - // But we still need to move the limit head. Only succesful writers - // may move it so we expect it to move from the `write_index` to - // `write_index + 1`, but we might have to spin to achieve it. - // Furthermore, the `limit_head` is used by the index-retrieval - // function to indicate that a read is in progress. - 'commit_to_write_loop: loop { - match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) { - Ok(_) => break, - Err(new_value) => { - // Two options: the limit is not yet what we expect it - // to be. If so, just try again with the old values. - // But if it is very large (relatively) then this is the - // signal from the reader that the entire storage is - // being resized - if new_value & Self::REALLOC_FLAG != 0 { - // Someone is resizing, wait until that is no longer - // true. - while self.limit_head.load(Ordering::Acquire) & Self::REALLOC_FLAG != 0 { - // still resizing - } - - // Not resizing anymore, try everything again, our - // old write has now become invalid. But our index - // hasn't! 
So we need to finish our write and our - // increment of the limit head - continue 'try_write_loop; - } else { - // Just try again - continue 'commit_to_write_loop; - } - } + // Creating/destroying components + + pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey { + let comp = RuntimeComp{ + public: CompPublic{ + sleeping: AtomicBool::new(initially_sleeping), + num_handles: AtomicU32::new(1), // the component itself acts like a handle + }, + private: CompPrivate{ + code: comp, + ctx: CompCtx{ + id: CompId(0), + ports: Vec::new(), + peers: Vec::new(), + messages: Vec::new(), } } + }; - // We updated the limit head, so we're done :) - return; - } - } - - /// Retrieves a `&T` from the store. This should be retrieved using `create` - /// and not yet given back by calling `destroy`. - fn get(&self, index: u32) -> &T { + let index = self.components.create(comp); - } - - /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you - /// know what you're doing :) - fn get_mut(&self, index: u32) -> &mut T { + // TODO: just do a reserve_index followed by a consume_index or something + self.components.get_mut(index).private.ctx.id = CompId(index); + return CompKey(index); } - fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) { - // Probably overkill considering the amount of memory that is needed to - // exceed this number. But still: ensure `num` adheres to the - // requirements needed for correct functioning of the store. - assert!( - num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(), - "invalid allocation count for CompStore buffer" - ); - - // Compute byte size of freelist (so we assume alignment of `u32`) - let mut byte_size = num * size_of::(); - - // Align to `*mut T`, then reserve space for all of the pointers - byte_size = Self::align_to(byte_size, align_of::<*mut T>()); - let byte_offset_data = byte_size; - byte_size += num * size_of::; - - unsafe { - // Allocate, then retrieve pointers to allocated regions - let layout = Self::layout_for(byte_size); - let memory = alloc(layout); - let base_free: *mut u32 = transmute(memory); - let base_data: *mut *mut T = transmute(memory.add(byte_offset_data)); - - return (base_free, base_data, byte_size); - } + pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { + let component = self.components.get_mut(key.0); + return component; } - fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) { - // Note: we only did one allocation, freelist is at the front - let layout = Self::layout_for(byte_size); - unsafe { - let base: *mut u8 = transmute(freelist); - dealloc(base, layout); - } + pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic { + let component = self.components.get(id.0); + return &component.public; } - fn layout_for(byte_size: usize) -> Layout { - debug_assert!(byte_size % size_of::() == 0); - return unsafe{ Layout::from_size_align_unchecked(byte_size, align_of::()) }; + pub(crate) fn destroy_component(&self, key: CompKey) { + self.components.destroy(key.0); } - fn align_to(offset: usize, alignment: usize) -> usize { - debug_assert!(alignment.is_power_of_two()); - let mask = alignment - 1; - return (offset + mask) & !mask; - } -} \ No newline at end of file + // Interacting with components +} diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs new file mode 100644 index 0000000000000000000000000000000000000000..047cd25f8717f6f22e55404210a125a5d2ad56ef --- /dev/null +++ 
b/src/runtime2/scheduler.rs @@ -0,0 +1,67 @@ +use std::sync::atomic::Ordering; + +use super::component::*; +use super::runtime::*; + +/// Data associated with a scheduler thread +pub(crate) struct Scheduler { + runtime: RuntimeHandle, + scheduler_id: u32, +} + +pub(crate) struct SchedulerCtx<'a> { + pub runtime: &'a Runtime, +} + +impl Scheduler { + // public interface to thread + + pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self { + return Scheduler{ runtime, scheduler_id } + } + + pub fn run(&mut self) { + let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; + + 'run_loop: loop { + // Wait until we have something to do (or need to quit) + let comp_key = self.runtime.take_work(); + if comp_key.is_none() { + break 'run_loop; + } + + let comp_key = comp_key.unwrap(); + let comp_id = comp_key.downgrade(); + let component = self.runtime.get_component(comp_key); + + // Run the component until it no longer indicates that it needs to + // be re-executed immediately. + let mut new_scheduling = CompScheduling::Immediate; + while let CompScheduling::Immediate = new_scheduling { + new_scheduling = component.private.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error"); + } + + // Handle the new scheduling + match new_scheduling { + CompScheduling::Immediate => unreachable!(), + CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); }, + CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, + CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); } + } + } + } + + // local utilities + + fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) { + debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key + debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping + + component.public.sleeping.store(true, Ordering::Release); + todo!("check for messages"); + } + + fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) { + todo!("do something") + } +} \ No newline at end of file diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs new file mode 100644 index 0000000000000000000000000000000000000000..12c56123b5ca25e899ce0ddd1ad8bb72747f72de --- /dev/null +++ b/src/runtime2/store/component.rs @@ -0,0 +1,559 @@ +/* + * Component Store + * + * Concurrent datastructure for creating/destroying/retrieving components using + * their ID. It is essentially a variation on a concurrent freelist. We store an + * array of (potentially null) pointers to data. Indices into this array that + * are unused (but may be left allocated) are in a freelist. So creating a new + * bit of data involves taking an index from this freelist. Destruction involves + * putting the index back. + * + * This datastructure takes care of the threadsafe implementation of the + * freelist and calling the data's destructor when needed. Note that it is not + * completely safe (in Rust's sense of the word) because it is possible to + * get more than one mutable reference to a piece of data. Likewise it is + * possible to put back bogus indices into the freelist, which will destroy the + * integrity of the datastructure. + * + * Some underlying assumptions that led to this design (note that I haven't + * actually checked these conditions or performed any real profiling, yet): + * - Resizing the freelist should be very rare. 
The datastructure should grow + * to some kind of maximum size and stay at that size. + * - Creation should (preferably) be faster than deletion of data. Reason being + * that creation implies we're creating a component that has code to be + * executed. Better to quickly be able to execute code than being able to + * quickly tear down finished components. + * - Retrieval is much more likely than creation/destruction. + * + * Some obvious flaws with this implementation: + * - Because of the freelist implementation we will generally allocate all of + * the data pointers that are available (i.e. if we have a buffer of size + * 64, but we generally use 33 elements, than we'll have 64 elements + * allocated), which might be wasteful at larger array sizes (which are + * always powers of two). + * - A lot of concurrent operations are not necessary: we may move some of the + * access to the global concurrent datastructure by an initial access to some + * kind of thread-local datastructure. + */ + +use std::mem::transmute; +use std::alloc::{alloc, dealloc, Layout}; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard}; + +pub struct ComponentStore { + inner: UnfairSeLock>, + read_head: AtomicUsize, + write_head: AtomicUsize, + limit_head: AtomicUsize, +} + +unsafe impl Send for ComponentStore{} +unsafe impl Sync for ComponentStore{} + +struct Inner { + freelist: Vec, + data: Vec<*mut T>, + size: usize, + compare_mask: usize, + index_mask: usize, +} + +type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner>; + +impl ComponentStore { + pub fn new(initial_size: usize) -> Self { + Self::assert_valid_size(initial_size); + + // Fill initial freelist and preallocate data array + let mut initial_freelist = Vec::with_capacity(initial_size); + for idx in 0..initial_size { + initial_freelist.push(idx as u32) + } + + let mut initial_data = Vec::new(); + initial_data.resize(initial_size, ptr::null_mut()); + + // Return initial store + return Self{ + inner: UnfairSeLock::new(Inner{ + freelist: initial_freelist, + data: initial_data, + size: initial_size, + compare_mask: 2*initial_size - 1, + index_mask: initial_size - 1, + }), + read_head: AtomicUsize::new(0), + write_head: AtomicUsize::new(initial_size), + limit_head: AtomicUsize::new(initial_size), + }; + } + + /// Creates a new element initialized to the provided `value`. This returns + /// the index at which the element can be retrieved. + pub fn create(&self, value: T) -> u32 { + let lock = self.inner.lock_shared(); + let (lock, index) = self.pop_freelist_index(lock); + self.initialize_at_index(lock, index, value); + return index; + } + + /// Destroys an element at the provided `index`. The caller must make sure + /// that it does not use any previously received references to the data at + /// this index, and that no more calls to `get` are performed using this + /// index. This is allowed again if the index has been reacquired using + /// `create`. + pub fn destroy(&self, index: u32) { + let lock = self.inner.lock_shared(); + self.destruct_at_index(&lock, index); + self.push_freelist_index(&lock, index); + } + + /// Retrieves an element by reference + pub fn get(&self, index: u32) -> &T { + let lock = self.inner.lock_shared(); + let value = lock.data[index as usize]; + unsafe { + debug_assert!(!value.is_null()); + return &*value; + } + } + + /// Retrieves an element by mutable reference. 
The caller should ensure that + /// use of that mutability is thread-safe + pub fn get_mut(&self, index: u32) -> &mut T { + let lock = self.inner.lock_shared(); + let value = lock.data[index as usize]; + unsafe { + debug_assert!(!value.is_null()); + return &mut *value; + } + } + + #[inline] + fn pop_freelist_index<'a>(&'a self, mut read_lock: InnerRead<'a, T>) -> (InnerRead<'a, T>, u32) { + 'attempt_read: loop { + // Load indices and check for reallocation condition + let current_size = read_lock.size; + let mut read_index = self.read_head.load(Ordering::Relaxed); + let limit_index = self.limit_head.load(Ordering::Acquire); + + if read_index == limit_index { + read_lock = self.reallocate(current_size, read_lock); + continue 'attempt_read; + } + + loop { + let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask]; + if let Err(actual_read_index) = self.read_head.compare_exchange( + read_index, (read_index + 1) & read_lock.compare_mask, + Ordering::AcqRel, Ordering::Acquire + ) { + // We need to try again + read_index = actual_read_index; + continue 'attempt_read; + } + + // If here then we performed the read + return (read_lock, preemptive_read); + } + } + } + + #[inline] + fn initialize_at_index(&self, read_lock: InnerRead, index: u32, value: T) { + let mut target_ptr = read_lock.data[index as usize]; + + unsafe { + if target_ptr.is_null() { + let layout = Layout::for_value(&value); + target_ptr = std::alloc::alloc(layout).cast(); + let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr()); + *rewrite.add(index as usize) = target_ptr; + } + + std::ptr::write(target_ptr, value); + } + } + + #[inline] + fn push_freelist_index(&self, read_lock: &InnerRead, index_to_put_back: u32) { + // Acquire an index in the freelist to which we can write + let mut cur_write_index = self.write_head.load(Ordering::Relaxed); + let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask; + while let Err(actual_write_index) = self.write_head.compare_exchange( + cur_write_index, new_write_index, + Ordering::AcqRel, Ordering::Acquire + ) { + cur_write_index = actual_write_index; + new_write_index = (cur_write_index + 1) & read_lock.compare_mask; + } + + // We own the data at the index, write to it and notify reader through + // limit_head that it can be read from. Note that we cheat around the + // rust mutability system here :) + unsafe { + let target: *mut u32 = transmute(read_lock.freelist.as_ptr()); + *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back; + } + + // Essentially spinlocking, relaxed failure ordering because the logic + // is that a write first moves the `write_head`, then the `limit_head`. 
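+        // Each writer has reserved a unique slot by advancing `write_head`, and
+        // may only bump `limit_head` from its own reserved index to the next one.
+        // So this CAS succeeds only once all earlier writers have published their
+        // slots, and `pop_freelist_index` never reads a freelist entry at or
+        // beyond `limit_head`, i.e. one whose write might still be in flight.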
+ while let Err(_) = self.limit_head.compare_exchange( + cur_write_index, new_write_index, + Ordering::AcqRel, Ordering::Relaxed + ) {}; + } + + #[inline] + fn destruct_at_index(&self, read_lock: &InnerRead, index: u32) { + let target_ptr = read_lock.data[index as usize]; + unsafe{ ptr::drop_in_place(target_ptr); } + } + + fn reallocate(&self, old_size: usize, inner: InnerRead) -> InnerRead { + drop(inner); + { + // After dropping read lock, acquire write lock + let mut lock = self.inner.lock_exclusive(); + + if old_size == lock.size { + // We are the thread that is supposed to reallocate + let new_size = old_size * 2; + Self::assert_valid_size(new_size); + + // Note that the atomic indices are in the range [0, new_size) + // already, so we need to be careful + let new_index_mask = new_size - 1; + let new_compare_mask = (2 * new_size) - 1; + lock.data.resize(new_size, ptr::null_mut()); + lock.freelist.resize(new_size, 0); + for idx in 0..old_size { + lock.freelist[old_size + idx] = lock.freelist[idx]; + } + + // We need to fill the freelist with the indices of all of the + // new elements that we have just created. + debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst)); + let old_read_index = self.read_head.load(Ordering::SeqCst); + let old_write_index = self.write_head.load(Ordering::SeqCst); + + if old_read_index > old_write_index { + // Read index wraps, so keep it as-is and fill + let new_read_index = old_read_index + old_size; + for index in 0..old_size { + let target_idx = (new_read_index + index) & new_index_mask; + lock.freelist[target_idx] = (old_size + index) as u32; + } + + self.read_head.store(new_read_index, Ordering::SeqCst); + debug_assert!(new_read_index < 2*new_size); + debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size); + } else { + // No wrapping, so increment write index + let new_write_index = old_write_index + old_size; + for index in 0..old_size { + let target_idx = (old_write_index + index) & new_index_mask; + lock.freelist[target_idx] = (old_size + index) as u32; + } + + // Update write/limit heads + self.write_head.store(new_write_index, Ordering::SeqCst); + self.limit_head.store(new_write_index, Ordering::SeqCst); + debug_assert!(new_write_index < 2*new_size); + debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size); + } + + // Update sizes and masks + lock.size = new_size; + lock.compare_mask = new_compare_mask; + lock.index_mask = new_index_mask; + } // else: someone else allocated, so we don't have to + } + + // We've dropped the write lock, acquire the read lock again + return self.inner.lock_shared(); + } + + #[inline] + fn assert_valid_size(size: usize) { + // Condition the size needs to adhere to. Some are a bit excessive, but + // we don't hit this check very often + assert!( + size.is_power_of_two() && + size >= 4 && + size <= usize::MAX / 2 && + size <= u32::MAX as usize + ); + } +} + +impl Drop for ComponentStore { + fn drop(&mut self) { + let value_layout = Layout::from_size_align( + std::mem::size_of::(), std::mem::align_of::() + ).unwrap(); + + // Note that if the indices exist in the freelist then the destructor + // has already been called. 
So handle them first + let mut lock = self.inner.lock_exclusive(); + + let read_index = self.read_head.load(Ordering::Acquire); + let write_index = self.write_head.load(Ordering::Acquire); + debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire)); + + let mut index = read_index; + while index != write_index { + let dealloc_index = lock.freelist[index & lock.index_mask] as usize; + let target_ptr = lock.data[dealloc_index]; + + unsafe { + dealloc(target_ptr.cast(), value_layout); + lock.data[dealloc_index] = ptr::null_mut(); + } + + index += 1; + index &= lock.compare_mask; + } + + // With all of those set to null, we'll just iterate through all + // pointers and destruct+deallocate the ones not set to null yet + for target_ptr in lock.data.iter().copied() { + if !target_ptr.is_null() { + unsafe { + ptr::drop_in_place(target_ptr); + dealloc(target_ptr.cast(), value_layout); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use rand::prelude::*; + use rand_pcg::Pcg32; + + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + + pub struct Resource { + dtor: Arc, + val: u64, + } + + impl Resource { + fn new(ctor: Arc, dtor: Arc, val: u64) -> Self { + ctor.fetch_add(1, Ordering::SeqCst); + return Self{ dtor, val }; + } + } + + impl Drop for Resource { + fn drop(&mut self) { + self.dtor.fetch_add(1, Ordering::SeqCst); + } + } + + fn seeds() -> Vec<[u8;16]> { + return vec![ + [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172], + [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43], + [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70], + [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193], + [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144], + ] + } + + #[test] + fn test_ctor_dtor_simple_unthreaded() { + const NUM_ROUNDS: usize = 5; + const NUM_ELEMENTS: usize = 1024; + + let store = ComponentStore::new(32); + let ctor_counter = Arc::new(AtomicU64::new(0)); + let dtor_counter = Arc::new(AtomicU64::new(0)); + + let mut indices = Vec::with_capacity(NUM_ELEMENTS); + for _round_index in 0..NUM_ROUNDS { + // Creation round + for value in 0..NUM_ELEMENTS { + let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64); + let new_index = store.create(new_resource); + indices.push(new_index); + } + + // Checking round + for el_index in indices.iter().copied() { + let element = store.get(el_index); + assert_eq!(element.val, el_index as u64); + } + + // Destruction round + for el_index in indices.iter().copied() { + store.destroy(el_index); + } + + indices.clear(); + } + + let num_ctor_calls = ctor_counter.load(Ordering::Acquire); + let num_dtor_calls = dtor_counter.load(Ordering::Acquire); + assert_eq!(num_ctor_calls, num_dtor_calls); + assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64); + } + + #[test] + fn test_ctor_dtor_simple_threaded() { + const MAX_SIZE: usize = 1024; + const NUM_THREADS: usize = 4; + const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS; + const NUM_ROUNDS: usize = 4; + + assert!(MAX_SIZE % NUM_THREADS == 0); + + let store = Arc::new(ComponentStore::new(16)); + let ctor_counter = Arc::new(AtomicU64::new(0)); + let dtor_counter = Arc::new(AtomicU64::new(0)); + + let mut threads = Vec::with_capacity(NUM_THREADS); + for thread_index in 0..NUM_THREADS { + // Setup local clones to move into the thread + let store = store.clone(); + let first_index = thread_index * NUM_PER_THREAD; 
+ let last_index = (thread_index + 1) * NUM_PER_THREAD; + let ctor_counter = ctor_counter.clone(); + let dtor_counter = dtor_counter.clone(); + + let handle = std::thread::spawn(move || { + let mut indices = Vec::with_capacity(last_index - first_index); + for _round_index in 0..NUM_ROUNDS { + // Creation round + for value in first_index..last_index { + let el_index = store.create(Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64)); + indices.push(el_index); + } + + // Checking round + for (value_offset, el_index) in indices.iter().copied().enumerate() { + let element = store.get(el_index); + assert_eq!(element.val, (first_index + value_offset) as u64); + } + + // Destruction round + for el_index in indices.iter().copied() { + store.destroy(el_index); + } + + indices.clear(); + } + }); + threads.push(handle); + } + + for thread in threads { + thread.join().expect("clean exit"); + } + + let num_ctor_calls = ctor_counter.load(Ordering::Acquire); + let num_dtor_calls = dtor_counter.load(Ordering::Acquire); + assert_eq!(num_ctor_calls, num_dtor_calls); + assert_eq!(num_ctor_calls, (NUM_ROUNDS * MAX_SIZE) as u64); + } + + #[test] + fn test_ctor_dtor_random_threaded() { + const NUM_ROUNDS: usize = 4; + const NUM_THREADS: usize = 4; + const NUM_OPERATIONS: usize = 1024; + const NUM_OPS_PER_THREAD: usize = NUM_OPERATIONS / NUM_THREADS; + const NUM_OPS_PER_ROUND: usize = NUM_OPS_PER_THREAD / NUM_ROUNDS; + const NUM_STORED_PER_THREAD: usize = 32; + + assert!(NUM_OPERATIONS % NUM_THREADS == 0); + assert!(NUM_OPS_PER_THREAD / 2 > NUM_STORED_PER_THREAD); + + let seeds = seeds(); + for seed_index in 0..seeds.len() { + // Setup store, counters and threads + let store = Arc::new(ComponentStore::new(16)); + let ctor_counter = Arc::new(AtomicU64::new(0)); + let dtor_counter = Arc::new(AtomicU64::new(0)); + + let mut threads = Vec::with_capacity(NUM_THREADS); + for thread_index in 0..NUM_THREADS { + // Setup local clones to move into the thread + let store = store.clone(); + let ctor_counter = ctor_counter.clone(); + let dtor_counter = dtor_counter.clone(); + + // Setup local rng + let mut seed = seeds[seed_index]; + for seed_val_idx in 0..16 { + seed[seed_val_idx] ^= thread_index as u8; // blegh + } + let mut rng = Pcg32::from_seed(seed); + + let handle = std::thread::spawn(move || { + let mut stored = Vec::with_capacity(NUM_STORED_PER_THREAD); + + for _round_index in 0..NUM_ROUNDS { + // Modify store elements in the store randomly, for some + // silly definition of random + for _op_index in 0..NUM_OPS_PER_ROUND { + // Perform a single operation, depending on current + // size of the number of values owned by this thread + let new_value = rng.next_u64(); + let should_create = rng.next_u32() % 2 == 0; + let is_empty = stored.is_empty(); + let is_full = stored.len() == NUM_STORED_PER_THREAD; + + if is_empty || (!is_full && should_create) { + // Must create + let el_index = store.create(Resource::new( + ctor_counter.clone(), dtor_counter.clone(), new_value + )); + stored.push((el_index, new_value)); + } else { + // Must destroy + let stored_index = new_value as usize % stored.len(); + let (el_index, el_value) = stored.remove(stored_index); + store.destroy(el_index); + } + } + + // Checking if the values we own still make sense + for (el_index, value) in stored.iter().copied() { + let gotten = store.get(el_index); + assert_eq!(value, gotten.val, "failed at thread {} value {}", thread_index, el_index); + } + } + + return stored.len(); // return number of remaining elements + }); + 
threads.push(handle); + } + + // Done with the current round + let mut total_left_allocated = 0; + for thread in threads { + let num_still_stored = thread.join().unwrap(); + total_left_allocated += num_still_stored as u64; + } + + // Before store is dropped + let num_ctor_calls = ctor_counter.load(Ordering::Acquire); + let num_dtor_calls = dtor_counter.load(Ordering::Acquire); + assert_eq!(num_ctor_calls - total_left_allocated, num_dtor_calls); + + // After store is dropped + drop(store); + let num_dtor_calls = dtor_counter.load(Ordering::Acquire); + assert_eq!(num_ctor_calls, num_dtor_calls); + } + } +} \ No newline at end of file diff --git a/src/runtime2/store/mod.rs b/src/runtime2/store/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..5746a4d669dabde8713683eb0178160e1cb3c809 --- /dev/null +++ b/src/runtime2/store/mod.rs @@ -0,0 +1,4 @@ +pub mod component; +pub mod unfair_se_lock; + +pub(crate) use component::ComponentStore; diff --git a/src/runtime2/store/unfair_se_lock.rs b/src/runtime2/store/unfair_se_lock.rs new file mode 100644 index 0000000000000000000000000000000000000000..a89a3f3dc5b475afdcd8c52e64a2a28b72e72410 --- /dev/null +++ b/src/runtime2/store/unfair_se_lock.rs @@ -0,0 +1,194 @@ +use std::cell::UnsafeCell; +use std::sync::atomic::{AtomicU32, Ordering}; + +/// An unfair shared/exclusive lock. One may quickly describe this to be an +/// unfair RwLock where a thread that wishes to write will get to write as fast +/// as possible (i.e. waiting for readers to finish), but others writers in line +/// may have to wait for another round of readers acquiring the lock. +/// +/// However, this is NOT a read/write lock. It is a shared/exclusive lock. It is +/// used in concurrent datastructures (implemented with atomics), so particular +/// kinds of writing may still occur by the threads holding a shared lock. In +/// that case the programmer must make sure that these writes are coordinated +/// in a thread-safe manner. +/// +/// It was designed with resizable (ring)buffers in mind: most often you have +/// the standard atomic pointers/indices moving around in the ringbuffer. But +/// when the buffer needs to be resized you need to be sure that no-one is +/// reading/writing the wrong/old/deallocated buffer pointer. Hence the +/// shared/exclusive terminology. +/// +/// For this reason the `UnfairSeLock` was written assuming that exclusive locks +/// are only held sometime: shared locks are obtained most of the time. +// Note: preliminary benchmark batches shows this is ~2x faster than a RwLock +// when under some contention. +pub struct UnfairSeLock { + // Uses 31 bits to track number of shared locks, and the high bit is set if + // an exclusive lock is supposed to be held. 31 bits is more than sufficient + // because in this project shared locks will be held by individual threads. + shared: AtomicU32, + cell: UnsafeCell, +} + +// Exclusive bit is set in the atomic value when a thread wishes to hold an +// exclusive lock. +const EXCLUSIVE_BIT: u32 = 1 << 31; + +impl UnfairSeLock { + pub fn new(value: T) -> Self { + return Self{ + shared: AtomicU32::new(0), + cell: UnsafeCell::new(value), + } + } + + /// Get shared access to the underlying data. + #[must_use] + pub fn lock_shared(&self) -> UnfairSeLockSharedGuard { + let mut shared = self.shared.load(Ordering::Relaxed); + loop { + if shared & EXCLUSIVE_BIT != 0 { + shared = self.wait_until_not_exclusive(shared); + } + + // Spinlock until we've incremented. 
If we fail we need to check the + // exclusive bit again. + let new_shared = shared + 1; + match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) { + Ok(_) => return UnfairSeLockSharedGuard::new(self, new_shared), + Err(actual_value) => { shared = actual_value; }, + } + } + } + + /// Get exclusive access to the underlying data. + #[must_use] + pub fn lock_exclusive(&self) -> UnfairSeLockExclusiveGuard { + let mut shared = self.shared.load(Ordering::Relaxed); + loop { + if shared & EXCLUSIVE_BIT != 0 { + shared = self.wait_until_not_exclusive(shared); + } + + // We want to set the write bit + let new_shared = shared | EXCLUSIVE_BIT; + match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) { + Ok(_) => { + // We've acquired the write lock, but we still might have + // to wait until the reader count is at 0. + shared = new_shared; + if shared != EXCLUSIVE_BIT { + shared = self.wait_until_not_shared(shared); + } + + return UnfairSeLockExclusiveGuard::new(self); + }, + Err(actual_value) => { shared = actual_value; } + } + } + } + + fn wait_until_not_exclusive(&self, mut shared: u32) -> u32 { + // Assume this is only called when the EXCLUSIVE_BIT is set + debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT); + loop { + // So spin until no longer held + shared = self.shared.load(Ordering::Acquire); + if shared & EXCLUSIVE_BIT == 0 { + return shared; + } + } + } + + #[inline] + fn wait_until_not_shared(&self, mut shared: u32) -> u32 { + // This is only called when someone has signaled the exclusive bit, but + // there are still threads holding the shared lock. + loop { + debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT); + if shared == EXCLUSIVE_BIT { + // shared count is 0 + return shared; + } + + shared = self.shared.load(Ordering::Acquire); + } + } +} + +/// A guard signifying that the owner has shared access to the underlying +/// `UnfairSeLock`. +pub struct UnfairSeLockSharedGuard<'a, T> { + lock: &'a UnfairSeLock, + initial_value: u32, +} + +impl<'a, T> UnfairSeLockSharedGuard<'a, T> { + fn new(lock: &'a UnfairSeLock, initial_value: u32) -> Self { + return Self{ lock, initial_value }; + } + + /// Force retrieval of the underlying type `T` in the mutable sense. Note + /// that the caller is now responsible for ensuring that concurrent mutable + /// access takes place in a correct fashion. + #[inline] + pub unsafe fn get_mut(&self) -> &mut T { + return unsafe{ &mut *self.lock.cell.get() }; + } +} + +impl<'a, T> Drop for UnfairSeLockSharedGuard<'a, T> { + fn drop(&mut self) { + // Spinlock until we've decremented the number of shared locks. + let mut value = self.initial_value; + while let Err(actual_value) = self.lock.shared.compare_exchange_weak( + value, value - 1, Ordering::AcqRel, Ordering::Acquire + ) { + value = actual_value; + } + } +} + +impl<'a, T> std::ops::Deref for UnfairSeLockSharedGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + return unsafe{ &*self.lock.cell.get() }; + } +} + +/// A guard signifying that the owner has exclusive access to the underlying +/// `UnfairSeLock`. 
+pub struct UnfairSeLockExclusiveGuard<'a, T> { + lock: &'a UnfairSeLock, +} + +impl<'a, T> UnfairSeLockExclusiveGuard<'a, T> { + fn new(lock: &'a UnfairSeLock) -> Self { + return Self{ lock }; + } +} + +impl<'a, T> Drop for UnfairSeLockExclusiveGuard<'a, T> { + fn drop(&mut self) { + // We have the exclusive bit set, and this type was constructed when + // the number of shared locks was at 0, we can safely store a `0` into + // the atomic + debug_assert_eq!(self.lock.shared.load(Ordering::Relaxed), EXCLUSIVE_BIT); // relaxed because we acquired it before + self.lock.shared.store(0, Ordering::Release); + } +} + +impl<'a, T> std::ops::Deref for UnfairSeLockExclusiveGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + return unsafe{ &*self.lock.cell.get() }; + } +} + +impl<'a, T> std::ops::DerefMut for UnfairSeLockExclusiveGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + return unsafe{ &mut *self.lock.cell.get() }; + } +} \ No newline at end of file
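For reference, a minimal usage sketch of the shared/exclusive pattern this lock is designed for (the same pattern ComponentStore::reallocate follows). The type and method names below are illustrative and not part of the patch:

    // Sketch only: readers use the shared lock; resizing the backing Vec happens
    // under the exclusive lock, which waits until the reader count drops to zero.
    struct SketchLog {
        entries: UnfairSeLock<Vec<u64>>,
    }

    impl SketchLog {
        fn read(&self, index: usize) -> Option<u64> {
            let shared = self.entries.lock_shared(); // many readers may hold this at once
            return shared.get(index).copied();
        } // shared guard dropped here, decrementing the reader count

        fn grow(&self, new_size: usize) {
            // Waits for all shared guards to be dropped, so no reader can observe
            // the Vec while it is being resized/reallocated.
            let mut exclusive = self.entries.lock_exclusive();
            if new_size > exclusive.len() {
                exclusive.resize(new_size, 0);
            }
        }
    }

Unlike a plain RwLock, holders of the shared guard may still mutate the protected data (through atomics or the unsafe get_mut), provided they coordinate among themselves; the exclusive lock is only needed when the backing storage itself is replaced.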