Changeset - 0e1a76667937
[Not reviewed]
0 7 4
MH - 4 years ago 2022-01-05 12:11:03
contact@maxhenger.nl
Started work on speculationless runtime
11 files changed with 1188 insertions and 250 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -12,32 +12,31 @@ edition = "2018"
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 
rand = "0.8.4"
 
rand_pcg = "0.3.1"
 

	
 
[lib]
 
crate-type = [
 
	"rlib", # for use as a Rust dependency.
 
]
 
\ No newline at end of file
src/protocol/eval/executor.rs
Show inline comments
 
@@ -175,49 +175,49 @@ impl Frame {
 
                    Literal::Tuple(value_expr_ids) => {
 
                        for value_expr_id in value_expr_ids {
 
                            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                            self.serialize_expression(heap, *value_expr_id);
 
                        }
 
                    }
 
                }
 
            },
 
            Expression::Cast(expr) => {
 
                self.serialize_expression(heap, expr.subject);
 
            }
 
            Expression::Call(expr) => {
 
                for arg_expr_id in &expr.arguments {
 
                    self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                    self.serialize_expression(heap, *arg_expr_id);
 
                }
 
            },
 
            Expression::Variable(_expr) => {
 
                // No subexpressions
 
            }
 
        }
 
    }
 
}
 

	
 
type EvalResult = Result<EvalContinuation, EvalError>;
 
pub type EvalResult = Result<EvalContinuation, EvalError>;
 

	
 
#[derive(Debug)]
 
pub enum EvalContinuation {
 
    // Returned in both sync and non-sync modes
 
    Stepping,
 
    // Returned only in sync mode
 
    BranchInconsistent,
 
    SyncBlockEnd,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, ValueGroup),
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
    SyncBlockStart,
 
    NewComponent(DefinitionId, i32, ValueGroup),
 
    NewChannel,
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
// we end up with valid "pointers" to heap regions.
 
#[derive(Debug, Clone)]
 
pub struct Prompt {
 
    pub(crate) frames: Vec<Frame>,
src/protocol/eval/mod.rs
Show inline comments
 
@@ -6,26 +6,26 @@
 
/// machine code is generated.
 
///
 
/// Code is always executed within a "frame". For Reowolf the first frame is
 
/// usually an executed component. All subsequent frames are function calls.
 
/// Simple values live on the "stack". Each variable/parameter has a place on
 
/// the stack where its values are stored. If the value is not a primitive, then
 
/// its value will be stored in the "heap". Expressions are treated differently
 
/// and use a separate "stack" for their evaluation.
 
///
 
/// Since this is a value-based language, most values are copied. One has to be
 
/// careful with values that reside in the "heap" and make sure that copies are
 
/// properly removed from the heap..
 
///
 
/// Just to reiterate: this is a temporary wasteful implementation. A proper
 
/// implementation would fully fill out the type table with alignment/size/
 
/// offset information and lay out bytecode.
 

	
 
pub(crate) mod value;
 
pub(crate) mod store;
 
pub(crate) mod executor;
 
pub(crate) mod error;
 

	
 
pub use error::EvalError;
 
pub use value::{PortId, Value, ValueGroup};
 
pub use executor::{EvalContinuation, Prompt};
 
pub use executor::{EvalContinuation, EvalResult, Prompt};
 

	
src/runtime2/communication.rs
Show inline comments
 
use super::runtime::*;
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortId(pub u32);
 

	
 
impl PortId {
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
pub enum PortState {
 
    Open,
 
    Closed,
 
}
 

	
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_id: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub local_peer_index: u32,
 
}
 

	
 
/// Public inbox: accessible by all threads. Essentially a MPSC channel
 
pub struct InboxPublic {
 

	
 
}
 
\ No newline at end of file
src/runtime2/component.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use super::runtime::*;
 
use super::scheduler::SchedulerCtx;
 
use super::communication::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub struct CompCtx {
 
    pub id: CompId,
 
    pub ports: Vec<Port>,
 
    pub peers: Vec<Peer>,
 
    pub messages: Vec<ValueGroup>, // same size as "ports"
 
}
 

	
 
impl CompCtx {
 
    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
 
        let old_value = &mut self.messages[port_id.0 as usize];
 
        if old_value.values.is_empty() {
 
            return None;
 
        }
 

	
 
        // Replace value in array with an empty one
 
        let mut message = ValueGroup::new_stack(Vec::new());
 
        std::mem::swap(old_value, &mut message);
 
        return Some(message);
 
    }
 

	
 
    fn find_peer(&self, port_id: PortId) -> &Peer {
 
        let port_info = &self.ports[port_id.0 as usize];
 
        let peer_info = &self.peers[port_info.local_peer_index as usize];
 
        return peer_info;
 
    }
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
        match self {
 
            ExecStmt::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
pub struct ExecCtx {
 
    stmt: ExecStmt,
 
}
 

	
 
impl RunContext for ExecCtx {
 
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedPut => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedGet(value) => return Some(value),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
 
        todo!("remove fires")
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum Mode {
 
    NonSync,
 
    Sync,
 
    BlockedGet,
 
    BlockedPut,
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    pub prompt: Prompt,
 
    pub exec_ctx: ExecCtx,
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt) -> Self {
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 

	
 
                let port_id = transform_port_id(port_id);
 
                if let Some(message) = comp_ctx.take_message(port_id) {
 
                    // We can immediately receive and continue
 
                    debug_assert!(self.exec_ctx.stmt.is_none());
 
                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 

	
 
                let port_id = transform_port_id(port_id);
 
                let peer = comp_ctx.find_peer(port_id);
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
            }
 
        }
 

	
 
        return Ok(CompScheduling::Sleep);
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 

	
 
    }
 

	
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 

	
 
    }
 
}
 

	
 
#[inline]
 
fn transform_port_id(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 

	
 
mod store;
 
mod runtime;
 
mod component;
 
mod communication;
 
\ No newline at end of file
 
mod communication;
 
mod scheduler;
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::mem::{size_of, align_of, transmute};
 
use std::alloc::{alloc, dealloc, Layout};
 
use std::sync::Arc;
 
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 

	
 
use super::component::{CompCtx, CompPDL};
 
use super::store::ComponentStore;
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 

	
 
/// Key to a component. Type system somewhat ensures that there can only be one
 
/// of these. Only with a key one may retrieve privately-accessible memory for
 
/// a component. Practically just a generational index, like `CompId` is.
 
#[derive(Copy, Clone)]
 
pub(crate) struct CompKey(CompId);
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct CompKey(u32);
 

	
 
/// Generational ID of a component
 
#[derive(Copy, Clone)]
 
pub(crate) struct CompId {
 
    pub index: u32,
 
    pub generation: u32,
 
impl CompKey {
 
    pub(crate) fn downgrade(&self) -> CompId {
 
        return CompId(self.0);
 
    }
 
}
 

	
 
impl PartialEq for CompId {
 
    fn eq(&self, other: &Self) -> bool {
 
        return self.index.eq(&other.index);
 
/// Generational ID of a component
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct CompId(u32);
 

	
 
impl CompId {
 
    pub(crate) fn new_invalid() -> CompId {
 
        return CompId(u32::MAX);
 
    }
 

	
 
    /// Upgrade component ID to component key. Unsafe because the caller needs
 
    /// to make sure that only one component key can exist at a time (to ensure
 
    /// a component can only be scheduled/executed by one thread).
 
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
 
        return CompKey(self.0);
 
    }
 
}
 
impl Eq for CompId {}
 

	
 
/// In-runtime storage of a component
 
pub(crate) struct RtComp {
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub private: CompPrivate,
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
pub(crate) struct CompPublic {
 
    pub sleeping: AtomicBool,
 
    pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances
 
}
 

	
 
/// Handle to public part of a component.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        return unsafe{ &*self.target };
 
    }
 
}
 

	
 
/// May contain non thread-safe fields. Accessed only by the scheduler which
 
/// will temporarily "own" the component.
 
pub(crate) struct CompPrivate {
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
type RuntimeHandle = Arc<Runtime>;
 
pub type RuntimeHandle = Arc<Runtime>;
 

	
 
/// Memory that is maintained by "the runtime". In practice it is maintained by
 
/// multiple schedulers, and this serves as the common interface to that memory.
 
pub struct Runtime {
 
    pub protocol: ProtocolDescription,
 
    components: ComponentStore<RuntimeComp>,
 
    work_queue: Mutex<VecDeque<CompKey>>,
 
    work_condvar: Condvar,
 
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        assert!(num_threads > 0, "need a thread to perform work");
 
        return Runtime{
 
            protocol: protocol_description,
 
            components: ComponentStore::new(128),
 
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
 
            work_condvar: Condvar::new(),
 
            active_elements: AtomicU32::new(0),
 
        };
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime containers
 
// -----------------------------------------------------------------------------
 

	
 
/// Component storage. Note that it shouldn't be polymorphic, but making it so
 
/// allows us to test it more easily. The container is essentially a
 
/// thread-safe freelist. The list always contains *all* free entries in the
 
/// storage array.
 
///
 
/// The freelist itself is implemented using a thread-safe ringbuffer. But there
 
/// are some very important properties we exploit in this specific
 
/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e.
 
/// adding to the freelist) corresponds to destroying a component, and reading
 
/// from the ringbuffer corresponds to creating a component. The aforementioned
 
/// properties are: one can never write more to the ringbuffer than has been
 
/// read from it (i.e. destroying more components than are created), we may
 
/// safely assume that when the `CompStore` is dropped that no thread can access
 
/// it (because they've all been shut down). This simplifies deallocation code.
 
///
 
/// Internally each individual instance of `T` will be (de)allocated. So we will
 
/// not store an array of `T`, but an array of `*T`. This keeps the storage of
 
/// `T` pointer-stable (as is required for the schedulers actually running the
 
/// components, because they'll fetch a component and then continue running it
 
/// while this component storage might get reallocated).
 
///
 
/// Note that there is still some unsafety here that is kept in check by the
 
/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that
 
/// only one mutable reference will ever be obtained, and potentially multiple
 
/// immutable references. But in practice the `&mut T` will be used to access
 
/// so-called "public" fields immutably, and "private" fields mutable. While the
 
/// `&T` will only be used to access the "public" fields immutably.
 
struct CompStore<T: Sized> {
 
    freelist: *mut u32,
 
    data: *mut *mut T,
 
    count: usize,
 
    mask: usize,
 
    byte_size: usize, // used for dealloc
 
    write_head: AtomicUsize,
 
    limit_head: AtomicUsize,
 
    read_head: AtomicUsize,
 
}
 

	
 
const fn compute_realloc_flag() -> usize {
 
    match size_of::<usize>() {
 
        4 => return 1 << 31, // 32-bit system
 
        8 => return 1 << 63, // 64-bit system
 
        _ => panic!("unexpected byte size for 'usize'")
 
    }
 
}
 

	
 
impl<T: Sized> CompStore<T> {
 
    const REALLOC_FLAG: usize = compute_realloc_flag();
 

	
 
    fn new(initial_count: usize) -> Self {
 
        // Allocate data
 
        debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
 
        let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);
 

	
 
        unsafe {
 
            // Init the freelist to all of the indices in the array of data
 
            let mut target = freelist;
 
            for idx in 0..initial_count as u32 {
 
                *target = idx;
 
                target += 1;
 
            }
 
    // Scheduling and retrieving work
 

	
 
            // And init the data such that they're all NULL pointers
 
            std::ptr::write_bytes(data, 0, initial_count);
 
    pub(crate) fn take_work(&self) -> Option<CompKey> {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
 
            lock = self.work_condvar.wait(lock).unwrap();
 
        }
 

	
 
        return CompStore{
 
            freelist, data,
 
            count: initial_count,
 
            mask: initial_count - 1,
 
            byte_size,
 
            write_head: AtomicUsize::new(initial_count),
 
            limit_head: AtomicUsize::new(initial_count),
 
            read_head: AtomicUsize::new(0),
 
        };
 
        return lock.pop_front();
 
    }
 

	
 
    fn get_index_from_freelist(&self) -> u32 {
 
        let compare_mask = (self.count * 2) - 1;
 
        let mut read_index = self.read_head.load(Ordering::Acquire); // read index first
 

	
 
        'try_loop: loop {
 
            let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second
 

	
 
            // By definition we always have `read_index <= limit_index` (if we would
 
            // have an infinite buffer, in reality we will wrap).
 
            if (read_index & compare_mask) == (limit_index & compare_mask) {
 
                // We need to create a bigger buffer. Note that no reader can
 
                // *ever* set the read index to beyond the limit index, and it
 
                // is currently equal. So we're certain that there is no other
 
                // reader currently updating the read_head.
 
                //
 
                // To test if we are supposed to resize the backing buffer we
 
                // try set the REALLOC_FLAG on the limit index. Note that the
 
                // stored indices are always in the range [0, 2*count). So if
 
                // we add REALLOC_FLAG to the limit index, then the masked
 
                // condition above still holds! Other potential readers will end
 
                // up here and are allowed to wait until we resized the backing
 
                // container.
 
                //
 
                // Furthermore, setting the limit index to this high value also
 
                // notifies the writer that any of it writes should be tried
 
                // again, as they're writing to a buffer that is going to get
 
                // trashed.
 
                todo!("finish reallocation code");
 
                match self.limit_head.compare_exchange(limit_index, limit_index | Self::REALLOC_FLAG, Ordering::SeqCst, Ordering::Acquire) {
 
                    Ok(_) => {
 
                        // Limit index has changed, so we're now the ones that
 
                        // are supposed to resize the
 
                    }
 
                }
 
            } else {
 
                // It seems we have space to read
 
                let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
 
                if let Err(new_read_index) = self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
 
                    // Failed to do the CAS, try again. We need to start at the
 
                    // start again because we might have had other readers that
 
                    // were successful, so at the very least, the preemptive
 
                    // read we did is no longer correct.
 
                    read_index = new_read_index;
 
                    continue 'try_loop;
 
                }
 

	
 
                // We now "own" the value at the read index
 
                return preemptive_read;
 
            }
 
        }
 
    pub(crate) fn enqueue_work(&self, key: CompKey) {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.work_condvar.notify_one();
 
    }
 

	
 
    fn put_back_index_into_freelist(&self, index: u32) {
 
        let mut compare_mask = (self.count * 2) - 1;
 
        let mut write_index = self.write_head.load(Ordering::Acquire);
 
        while let Err(new_write_index) = self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
 
            // Failed to do the CAS, try again
 
            write_index = new_write_index;
 
        }
 

	
 
        'try_write_loop: loop {
 
            // We are now the only ones that can write at `write_index`. Try to
 
            // do so
 
            unsafe { *self.freelist.add(write_index & self.mask) = index; }
 

	
 
            // But we still need to move the limit head. Only succesful writers
 
            // may move it so we expect it to move from the `write_index` to
 
            // `write_index + 1`, but we might have to spin to achieve it.
 
            // Furthermore, the `limit_head` is used by the index-retrieval
 
            // function to indicate that a read is in progress.
 
            'commit_to_write_loop: loop {
 
                match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
 
                    Ok(_) => break,
 
                    Err(new_value) => {
 
                        // Two options: the limit is not yet what we expect it
 
                        // to be. If so, just try again with the old values.
 
                        // But if it is very large (relatively) then this is the
 
                        // signal from the reader that the entire storage is
 
                        // being resized
 
                        if new_value & Self::REALLOC_FLAG != 0 {
 
                            // Someone is resizing, wait until that is no longer
 
                            // true.
 
                            while self.limit_head.load(Ordering::Acquire) & Self::REALLOC_FLAG != 0 {
 
                                // still resizing
 
                            }
 

	
 
                            // Not resizing anymore, try everything again, our
 
                            // old write has now become invalid. But our index
 
                            // hasn't! So we need to finish our write and our
 
                            // increment of the limit head
 
                            continue 'try_write_loop;
 
                        } else {
 
                            // Just try again
 
                            continue 'commit_to_write_loop;
 
                        }
 
                    }
 
    // Creating/destroying components
 

	
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey {
 
        let comp = RuntimeComp{
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
            },
 
            private: CompPrivate{
 
                code: comp,
 
                ctx: CompCtx{
 
                    id: CompId(0),
 
                    ports: Vec::new(),
 
                    peers: Vec::new(),
 
                    messages: Vec::new(),
 
                }
 
            }
 
        };
 

	
 
            // We updated the limit head, so we're done :)
 
            return;
 
        }
 
    }
 

	
 
    /// Retrieves a `&T` from the store. This should be retrieved using `create`
 
    /// and not yet given back by calling `destroy`.
 
    fn get(&self, index: u32) -> &T {
 
        let index = self.components.create(comp);
 

	
 
    }
 

	
 
    /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you
 
    /// know what you're doing :)
 
    fn get_mut(&self, index: u32) -> &mut T {
 
        // TODO: just do a reserve_index followed by a consume_index or something
 
        self.components.get_mut(index).private.ctx.id = CompId(index);
 

	
 
        return CompKey(index);
 
    }
 

	
 
    fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) {
 
        // Probably overkill considering the amount of memory that is needed to
 
        // exceed this number. But still: ensure `num` adheres to the
 
        // requirements needed for correct functioning of the store.
 
        assert!(
 
            num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(),
 
            "invalid allocation count for CompStore buffer"
 
        );
 

	
 
        // Compute byte size of freelist (so we assume alignment of `u32`)
 
        let mut byte_size = num * size_of::<u32>();
 

	
 
        // Align to `*mut T`, then reserve space for all of the pointers
 
        byte_size = Self::align_to(byte_size, align_of::<*mut T>());
 
        let byte_offset_data = byte_size;
 
        byte_size += num * size_of::<T>;
 

	
 
        unsafe {
 
            // Allocate, then retrieve pointers to allocated regions
 
            let layout = Self::layout_for(byte_size);
 
            let memory = alloc(layout);
 
            let base_free: *mut u32 = transmute(memory);
 
            let base_data: *mut *mut T = transmute(memory.add(byte_offset_data));
 

	
 
            return (base_free, base_data, byte_size);
 
        }
 
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
 
        let component = self.components.get_mut(key.0);
 
        return component;
 
    }
 

	
 
    fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) {
 
        // Note: we only did one allocation, freelist is at the front
 
        let layout = Self::layout_for(byte_size);
 
        unsafe {
 
            let base: *mut u8 = transmute(freelist);
 
            dealloc(base, layout);
 
        }
 
    pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic {
 
        let component = self.components.get(id.0);
 
        return &component.public;
 
    }
 

	
 
    fn layout_for(byte_size: usize) -> Layout {
 
        debug_assert!(byte_size % size_of::<u32>() == 0);
 
        return unsafe{ Layout::from_size_align_unchecked(byte_size, align_of::<u32>()) };
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
 
        self.components.destroy(key.0);
 
    }
 

	
 
    fn align_to(offset: usize, alignment: usize) -> usize {
 
        debug_assert!(alignment.is_power_of_two());
 
        let mask = alignment - 1;
 
        return (offset + mask) & !mask;
 
    }
 
}
 
\ No newline at end of file
 
    // Interacting with components
 
}
src/runtime2/scheduler.rs
Show inline comments
 
new file 100644
 
use std::sync::atomic::Ordering;
 

	
 
use super::component::*;
 
use super::runtime::*;
 

	
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: RuntimeHandle,
 
    scheduler_id: u32,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a Runtime,
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self {
 
        return Scheduler{ runtime, scheduler_id }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
            let comp_key = self.runtime.take_work();
 
            if comp_key.is_none() {
 
                break 'run_loop;
 
            }
 

	
 
            let comp_key = comp_key.unwrap();
 
            let comp_id = comp_key.downgrade();
 
            let component = self.runtime.get_component(comp_key);
 

	
 
            // Run the component until it no longer indicates that it needs to
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                new_scheduling = component.private.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); }
 
            }
 
        }
 
    }
 

	
 
    // local utilities
 

	
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
        debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key
 
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 

	
 
        component.public.sleeping.store(true, Ordering::Release);
 
        todo!("check for messages");
 
    }
 

	
 
    fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) {
 
        todo!("do something")
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/component.rs
Show inline comments
 
new file 100644
 
/*
 
 * Component Store
 
 *
 
 * Concurrent datastructure for creating/destroying/retrieving components using
 
 * their ID. It is essentially a variation on a concurrent freelist. We store an
 
 * array of (potentially null) pointers to data. Indices into this array that
 
 * are unused (but may be left allocated) are in a freelist. So creating a new
 
 * bit of data involves taking an index from this freelist. Destruction involves
 
 * putting the index back.
 
 *
 
 * This datastructure takes care of the threadsafe implementation of the
 
 * freelist and calling the data's destructor when needed. Note that it is not
 
 * completely safe (in Rust's sense of the word) because it is possible to
 
 * get more than one mutable reference to a piece of data. Likewise it is
 
 * possible to put back bogus indices into the freelist, which will destroy the
 
 * integrity of the datastructure.
 
 *
 
 * Some underlying assumptions that led to this design (note that I haven't
 
 * actually checked these conditions or performed any real profiling, yet):
 
 *  - Resizing the freelist should be very rare. The datastructure should grow
 
 *    to some kind of maximum size and stay at that size.
 
 *  - Creation should (preferably) be faster than deletion of data. Reason being
 
 *    that creation implies we're creating a component that has code to be
 
 *    executed. Better to quickly be able to execute code than being able to
 
 *    quickly tear down finished components.
 
 *  - Retrieval is much more likely than creation/destruction.
 
 *
 
 * Some obvious flaws with this implementation:
 
 *  - Because of the freelist implementation we will generally allocate all of
 
 *    the data pointers that are available (i.e. if we have a buffer of size
 
 *    64, but we generally use 33 elements, than we'll have 64 elements
 
 *    allocated), which might be wasteful at larger array sizes (which are
 
 *    always powers of two).
 
 *  - A lot of concurrent operations are not necessary: we may move some of the
 
 *    access to the global concurrent datastructure by an initial access to some
 
 *    kind of thread-local datastructure.
 
 */
 

	
 
use std::mem::transmute;
 
use std::alloc::{alloc, dealloc, Layout};
 
use std::ptr;
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
pub struct ComponentStore<T: Sized> {
 
    inner: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicUsize,
 
    write_head: AtomicUsize,
 
    limit_head: AtomicUsize,
 
}
 

	
 
unsafe impl<T: Sized> Send for ComponentStore<T>{}
 
unsafe impl<T: Sized> Sync for ComponentStore<T>{}
 

	
 
struct Inner<T: Sized> {
 
    freelist: Vec<u32>,
 
    data: Vec<*mut T>,
 
    size: usize,
 
    compare_mask: usize,
 
    index_mask: usize,
 
}
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T: Sized> ComponentStore<T> {
 
    pub fn new(initial_size: usize) -> Self {
 
        Self::assert_valid_size(initial_size);
 

	
 
        // Fill initial freelist and preallocate data array
 
        let mut initial_freelist = Vec::with_capacity(initial_size);
 
        for idx in 0..initial_size {
 
            initial_freelist.push(idx as u32)
 
        }
 

	
 
        let mut initial_data = Vec::new();
 
        initial_data.resize(initial_size, ptr::null_mut());
 

	
 
        // Return initial store
 
        return Self{
 
            inner: UnfairSeLock::new(Inner{
 
                freelist: initial_freelist,
 
                data: initial_data,
 
                size: initial_size,
 
                compare_mask: 2*initial_size - 1,
 
                index_mask: initial_size - 1,
 
            }),
 
            read_head: AtomicUsize::new(0),
 
            write_head: AtomicUsize::new(initial_size),
 
            limit_head: AtomicUsize::new(initial_size),
 
        };
 
    }
 

	
 
    /// Creates a new element initialized to the provided `value`. This returns
 
    /// the index at which the element can be retrieved.
 
    pub fn create(&self, value: T) -> u32 {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        self.initialize_at_index(lock, index, value);
 
        return index;
 
    }
 

	
 
    /// Destroys an element at the provided `index`. The caller must make sure
 
    /// that it does not use any previously received references to the data at
 
    /// this index, and that no more calls to `get` are performed using this
 
    /// index. This is allowed again if the index has been reacquired using
 
    /// `create`.
 
    pub fn destroy(&self, index: u32) {
 
        let lock = self.inner.lock_shared();
 
        self.destruct_at_index(&lock, index);
 
        self.push_freelist_index(&lock, index);
 
    }
 

	
 
    /// Retrieves an element by reference
 
    pub fn get(&self, index: u32) -> &T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &*value;
 
        }
 
    }
 

	
 
    /// Retrieves an element by mutable reference. The caller should ensure that
 
    /// use of that mutability is thread-safe
 
    pub fn get_mut(&self, index: u32) -> &mut T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &mut *value;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn pop_freelist_index<'a>(&'a self, mut read_lock: InnerRead<'a, T>) -> (InnerRead<'a, T>, u32) {
 
        'attempt_read: loop {
 
            // Load indices and check for reallocation condition
 
            let current_size = read_lock.size;
 
            let mut read_index = self.read_head.load(Ordering::Relaxed);
 
            let limit_index = self.limit_head.load(Ordering::Acquire);
 

	
 
            if read_index == limit_index {
 
                read_lock = self.reallocate(current_size, read_lock);
 
                continue 'attempt_read;
 
            }
 

	
 
            loop {
 
                let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask];
 
                if let Err(actual_read_index) = self.read_head.compare_exchange(
 
                    read_index, (read_index + 1) & read_lock.compare_mask,
 
                    Ordering::AcqRel, Ordering::Acquire
 
                ) {
 
                    // We need to try again
 
                    read_index = actual_read_index;
 
                    continue 'attempt_read;
 
                }
 

	
 
                // If here then we performed the read
 
                return (read_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(&self, read_lock: InnerRead<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
            if target_ptr.is_null() {
 
                let layout = Layout::for_value(&value);
 
                target_ptr = std::alloc::alloc(layout).cast();
 
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
 
                *rewrite.add(index as usize) = target_ptr;
 
            }
 

	
 
            std::ptr::write(target_ptr, value);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn push_freelist_index(&self, read_lock: &InnerRead<T>, index_to_put_back: u32) {
 
        // Acquire an index in the freelist to which we can write
 
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
 
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        while let Err(actual_write_index) = self.write_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Acquire
 
        ) {
 
            cur_write_index = actual_write_index;
 
            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        }
 

	
 
        // We own the data at the index, write to it and notify reader through
 
        // limit_head that it can be read from. Note that we cheat around the
 
        // rust mutability system here :)
 
        unsafe {
 
            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
 
            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
 
        }
 

	
 
        // Essentially spinlocking, relaxed failure ordering because the logic
 
        // is that a write first moves the `write_head`, then the `limit_head`.
 
        while let Err(_) = self.limit_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Relaxed
 
        ) {};
 
    }
 

	
 
    #[inline]
 
    fn destruct_at_index(&self, read_lock: &InnerRead<T>, index: u32) {
 
        let target_ptr = read_lock.data[index as usize];
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
 
        drop(inner);
 
        {
 
            // After dropping read lock, acquire write lock
 
            let mut lock = self.inner.lock_exclusive();
 

	
 
            if old_size == lock.size {
 
                // We are the thread that is supposed to reallocate
 
                let new_size = old_size * 2;
 
                Self::assert_valid_size(new_size);
 

	
 
                // Note that the atomic indices are in the range [0, new_size)
 
                // already, so we need to be careful
 
                let new_index_mask = new_size - 1;
 
                let new_compare_mask = (2 * new_size) - 1;
 
                lock.data.resize(new_size, ptr::null_mut());
 
                lock.freelist.resize(new_size, 0);
 
                for idx in 0..old_size {
 
                    lock.freelist[old_size + idx] = lock.freelist[idx];
 
                }
 

	
 
                // We need to fill the freelist with the indices of all of the
 
                // new elements that we have just created.
 
                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
 
                let old_read_index = self.read_head.load(Ordering::SeqCst);
 
                let old_write_index = self.write_head.load(Ordering::SeqCst);
 

	
 
                if old_read_index > old_write_index {
 
                    // Read index wraps, so keep it as-is and fill
 
                    let new_read_index = old_read_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (new_read_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    self.read_head.store(new_read_index, Ordering::SeqCst);
 
                    debug_assert!(new_read_index < 2*new_size);
 
                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
 
                } else {
 
                    // No wrapping, so increment write index
 
                    let new_write_index = old_write_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (old_write_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    // Update write/limit heads
 
                    self.write_head.store(new_write_index, Ordering::SeqCst);
 
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
 
                    debug_assert!(new_write_index < 2*new_size);
 
                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
 
                }
 

	
 
                // Update sizes and masks
 
                lock.size = new_size;
 
                lock.compare_mask = new_compare_mask;
 
                lock.index_mask = new_index_mask;
 
            } // else: someone else allocated, so we don't have to
 
        }
 

	
 
        // We've dropped the write lock, acquire the read lock again
 
        return self.inner.lock_shared();
 
    }
 

	
 
    #[inline]
 
    fn assert_valid_size(size: usize) {
 
        // Condition the size needs to adhere to. Some are a bit excessive, but
 
        // we don't hit this check very often
 
        assert!(
 
            size.is_power_of_two() &&
 
                size >= 4 &&
 
                size <= usize::MAX / 2 &&
 
                size <= u32::MAX as usize
 
        );
 
    }
 
}
 

	
 
impl<T: Sized> Drop for ComponentStore<T> {
 
    fn drop(&mut self) {
 
        let value_layout = Layout::from_size_align(
 
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
 
        ).unwrap();
 

	
 
        // Note that if the indices exist in the freelist then the destructor
 
        // has already been called. So handle them first
 
        let mut lock = self.inner.lock_exclusive();
 

	
 
        let read_index = self.read_head.load(Ordering::Acquire);
 
        let write_index = self.write_head.load(Ordering::Acquire);
 
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
 

	
 
        let mut index = read_index;
 
        while index != write_index {
 
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
 
            let target_ptr = lock.data[dealloc_index];
 

	
 
            unsafe {
 
                dealloc(target_ptr.cast(), value_layout);
 
                lock.data[dealloc_index] = ptr::null_mut();
 
            }
 

	
 
            index += 1;
 
            index &= lock.compare_mask;
 
        }
 

	
 
        // With all of those set to null, we'll just iterate through all
 
        // pointers and destruct+deallocate the ones not set to null yet
 
        for target_ptr in lock.data.iter().copied() {
 
            if !target_ptr.is_null() {
 
                unsafe {
 
                    ptr::drop_in_place(target_ptr);
 
                    dealloc(target_ptr.cast(), value_layout);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    pub struct Resource {
 
        dtor: Arc<AtomicU64>,
 
        val: u64,
 
    }
 

	
 
    impl Resource {
 
        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
 
            ctor.fetch_add(1, Ordering::SeqCst);
 
            return Self{ dtor, val };
 
        }
 
    }
 

	
 
    impl Drop for Resource {
 
        fn drop(&mut self) {
 
            self.dtor.fetch_add(1, Ordering::SeqCst);
 
        }
 
    }
 

	
 
    fn seeds() -> Vec<[u8;16]> {
 
        return vec![
 
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
 
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
 
            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
 
            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
 
            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
 
        ]
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_unthreaded() {
 
        const NUM_ROUNDS: usize = 5;
 
        const NUM_ELEMENTS: usize = 1024;
 

	
 
        let store = ComponentStore::new(32);
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
 
        for _round_index in 0..NUM_ROUNDS {
 
            // Creation round
 
            for value in 0..NUM_ELEMENTS {
 
                let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
 
                let new_index = store.create(new_resource);
 
                indices.push(new_index);
 
            }
 

	
 
            // Checking round
 
            for el_index in indices.iter().copied() {
 
                let element = store.get(el_index);
 
                assert_eq!(element.val, el_index as u64);
 
            }
 

	
 
            // Destruction round
 
            for el_index in indices.iter().copied() {
 
                store.destroy(el_index);
 
            }
 

	
 
            indices.clear();
 
        }
 

	
 
        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
        assert_eq!(num_ctor_calls, num_dtor_calls);
 
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64);
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_threaded() {
 
        const MAX_SIZE: usize = 1024;
 
        const NUM_THREADS: usize = 4;
 
        const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS;
 
        const NUM_ROUNDS: usize = 4;
 

	
 
        assert!(MAX_SIZE % NUM_THREADS == 0);
 

	
 
        let store = Arc::new(ComponentStore::new(16));
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
        let mut threads = Vec::with_capacity(NUM_THREADS);
 
        for thread_index in 0..NUM_THREADS {
 
            // Setup local clones to move into the thread
 
            let store = store.clone();
 
            let first_index = thread_index * NUM_PER_THREAD;
 
            let last_index = (thread_index + 1) * NUM_PER_THREAD;
 
            let ctor_counter = ctor_counter.clone();
 
            let dtor_counter = dtor_counter.clone();
 

	
 
            let handle = std::thread::spawn(move || {
 
                let mut indices = Vec::with_capacity(last_index - first_index);
 
                for _round_index in 0..NUM_ROUNDS {
 
                    // Creation round
 
                    for value in first_index..last_index {
 
                        let el_index = store.create(Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64));
 
                        indices.push(el_index);
 
                    }
 

	
 
                    // Checking round
 
                    for (value_offset, el_index) in indices.iter().copied().enumerate() {
 
                        let element = store.get(el_index);
 
                        assert_eq!(element.val, (first_index + value_offset) as u64);
 
                    }
 

	
 
                    // Destruction round
 
                    for el_index in indices.iter().copied() {
 
                        store.destroy(el_index);
 
                    }
 

	
 
                    indices.clear();
 
                }
 
            });
 
            threads.push(handle);
 
        }
 

	
 
        for thread in threads {
 
            thread.join().expect("clean exit");
 
        }
 

	
 
        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
        assert_eq!(num_ctor_calls, num_dtor_calls);
 
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * MAX_SIZE) as u64);
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_random_threaded() {
 
        const NUM_ROUNDS: usize = 4;
 
        const NUM_THREADS: usize = 4;
 
        const NUM_OPERATIONS: usize = 1024;
 
        const NUM_OPS_PER_THREAD: usize = NUM_OPERATIONS / NUM_THREADS;
 
        const NUM_OPS_PER_ROUND: usize = NUM_OPS_PER_THREAD / NUM_ROUNDS;
 
        const NUM_STORED_PER_THREAD: usize = 32;
 

	
 
        assert!(NUM_OPERATIONS % NUM_THREADS == 0);
 
        assert!(NUM_OPS_PER_THREAD / 2 > NUM_STORED_PER_THREAD);
 

	
 
        let seeds = seeds();
 
        for seed_index in 0..seeds.len() {
 
            // Setup store, counters and threads
 
            let store = Arc::new(ComponentStore::new(16));
 
            let ctor_counter = Arc::new(AtomicU64::new(0));
 
            let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
            let mut threads = Vec::with_capacity(NUM_THREADS);
 
            for thread_index in 0..NUM_THREADS {
 
                // Setup local clones to move into the thread
 
                let store = store.clone();
 
                let ctor_counter = ctor_counter.clone();
 
                let dtor_counter = dtor_counter.clone();
 

	
 
                // Setup local rng
 
                let mut seed = seeds[seed_index];
 
                for seed_val_idx in 0..16 {
 
                    seed[seed_val_idx] ^= thread_index as u8; // blegh
 
                }
 
                let mut rng = Pcg32::from_seed(seed);
 

	
 
                let handle = std::thread::spawn(move || {
 
                    let mut stored = Vec::with_capacity(NUM_STORED_PER_THREAD);
 

	
 
                    for _round_index in 0..NUM_ROUNDS {
 
                        // Modify store elements in the store randomly, for some
 
                        // silly definition of random
 
                        for _op_index in 0..NUM_OPS_PER_ROUND {
 
                            // Perform a single operation, depending on current
 
                            // size of the number of values owned by this thread
 
                            let new_value = rng.next_u64();
 
                            let should_create = rng.next_u32() % 2 == 0;
 
                            let is_empty = stored.is_empty();
 
                            let is_full = stored.len() == NUM_STORED_PER_THREAD;
 

	
 
                            if is_empty || (!is_full && should_create) {
 
                                // Must create
 
                                let el_index = store.create(Resource::new(
 
                                    ctor_counter.clone(), dtor_counter.clone(), new_value
 
                                ));
 
                                stored.push((el_index, new_value));
 
                            } else {
 
                                // Must destroy
 
                                let stored_index = new_value as usize % stored.len();
 
                                let (el_index, el_value) = stored.remove(stored_index);
 
                                store.destroy(el_index);
 
                            }
 
                        }
 

	
 
                        // Checking if the values we own still make sense
 
                        for (el_index, value) in stored.iter().copied() {
 
                            let gotten = store.get(el_index);
 
                            assert_eq!(value, gotten.val, "failed at thread {} value {}", thread_index, el_index);
 
                        }
 
                    }
 

	
 
                    return stored.len(); // return number of remaining elements
 
                });
 
                threads.push(handle);
 
            }
 

	
 
            // Done with the current round
 
            let mut total_left_allocated = 0;
 
            for thread in threads {
 
                let num_still_stored = thread.join().unwrap();
 
                total_left_allocated += num_still_stored as u64;
 
            }
 

	
 
            // Before store is dropped
 
            let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
            assert_eq!(num_ctor_calls - total_left_allocated, num_dtor_calls);
 

	
 
            // After store is dropped
 
            drop(store);
 
            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
            assert_eq!(num_ctor_calls, num_dtor_calls);
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/mod.rs
Show inline comments
 
new file 100644
 
pub mod component;
 
pub mod unfair_se_lock;
 

	
 
pub(crate) use component::ComponentStore;
src/runtime2/store/unfair_se_lock.rs
Show inline comments
 
new file 100644
 
use std::cell::UnsafeCell;
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
/// An unfair shared/exclusive lock. One may quickly describe this to be an
 
/// unfair RwLock where a thread that wishes to write will get to write as fast
 
/// as possible (i.e. waiting for readers to finish), but others writers in line
 
/// may have to wait for another round of readers acquiring the lock.
 
///
 
/// However, this is NOT a read/write lock. It is a shared/exclusive lock. It is
 
/// used in concurrent datastructures (implemented with atomics), so particular
 
/// kinds of writing may still occur by the threads holding a shared lock. In
 
/// that case the programmer must make sure that these writes are coordinated
 
/// in a thread-safe manner.
 
///
 
/// It was designed with resizable (ring)buffers in mind: most often you have
 
/// the standard atomic pointers/indices moving around in the ringbuffer. But
 
/// when the buffer needs to be resized you need to be sure that no-one is
 
/// reading/writing the wrong/old/deallocated buffer pointer. Hence the
 
/// shared/exclusive terminology.
 
///
 
/// For this reason the `UnfairSeLock` was written assuming that exclusive locks
 
/// are only held sometime: shared locks are obtained most of the time.
 
// Note: preliminary benchmark batches shows this is ~2x faster than a RwLock
 
// when under some contention.
 
pub struct UnfairSeLock<T> {
 
    // Uses 31 bits to track number of shared locks, and the high bit is set if
 
    // an exclusive lock is supposed to be held. 31 bits is more than sufficient
 
    // because in this project shared locks will be held by individual threads.
 
    shared: AtomicU32,
 
    cell: UnsafeCell<T>,
 
}
 

	
 
// Exclusive bit is set in the atomic value when a thread wishes to hold an
 
// exclusive lock.
 
const EXCLUSIVE_BIT: u32 = 1 << 31;
 

	
 
impl<T> UnfairSeLock<T> {
 
    pub fn new(value: T) -> Self {
 
        return Self{
 
            shared: AtomicU32::new(0),
 
            cell: UnsafeCell::new(value),
 
        }
 
    }
 

	
 
    /// Get shared access to the underlying data.
 
    #[must_use]
 
    pub fn lock_shared(&self) -> UnfairSeLockSharedGuard<T> {
 
        let mut shared = self.shared.load(Ordering::Relaxed);
 
        loop {
 
            if shared & EXCLUSIVE_BIT != 0 {
 
                shared = self.wait_until_not_exclusive(shared);
 
            }
 

	
 
            // Spinlock until we've incremented. If we fail we need to check the
 
            // exclusive bit again.
 
            let new_shared = shared + 1;
 
            match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
 
                Ok(_) => return UnfairSeLockSharedGuard::new(self, new_shared),
 
                Err(actual_value) => { shared = actual_value; },
 
            }
 
        }
 
    }
 

	
 
    /// Get exclusive access to the underlying data.
 
    #[must_use]
 
    pub fn lock_exclusive(&self) -> UnfairSeLockExclusiveGuard<T> {
 
        let mut shared = self.shared.load(Ordering::Relaxed);
 
        loop {
 
            if shared & EXCLUSIVE_BIT != 0 {
 
                shared = self.wait_until_not_exclusive(shared);
 
            }
 

	
 
            // We want to set the write bit
 
            let new_shared = shared | EXCLUSIVE_BIT;
 
            match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
 
                Ok(_) => {
 
                    // We've acquired the write lock, but we still might have
 
                    // to wait until the reader count is at 0.
 
                    shared = new_shared;
 
                    if shared != EXCLUSIVE_BIT {
 
                        shared = self.wait_until_not_shared(shared);
 
                    }
 

	
 
                    return UnfairSeLockExclusiveGuard::new(self);
 
                },
 
                Err(actual_value) => { shared = actual_value; }
 
            }
 
        }
 
    }
 

	
 
    fn wait_until_not_exclusive(&self, mut shared: u32) -> u32 {
 
        // Assume this is only called when the EXCLUSIVE_BIT is set
 
        debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
 
        loop {
 
            // So spin until no longer held
 
            shared = self.shared.load(Ordering::Acquire);
 
            if shared & EXCLUSIVE_BIT == 0 {
 
                return shared;
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn wait_until_not_shared(&self, mut shared: u32) -> u32 {
 
        // This is only called when someone has signaled the exclusive bit, but
 
        // there are still threads holding the shared lock.
 
        loop {
 
            debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
 
            if shared == EXCLUSIVE_BIT {
 
                // shared count is 0
 
                return shared;
 
            }
 

	
 
            shared = self.shared.load(Ordering::Acquire);
 
        }
 
    }
 
}
 

	
 
/// A guard signifying that the owner has shared access to the underlying
 
/// `UnfairSeLock`.
 
pub struct UnfairSeLockSharedGuard<'a, T> {
 
    lock: &'a UnfairSeLock<T>,
 
    initial_value: u32,
 
}
 

	
 
impl<'a, T> UnfairSeLockSharedGuard<'a, T> {
 
    fn new(lock: &'a UnfairSeLock<T>, initial_value: u32) -> Self {
 
        return Self{ lock, initial_value };
 
    }
 

	
 
    /// Force retrieval of the underlying type `T` in the mutable sense. Note
 
    /// that the caller is now responsible for ensuring that concurrent mutable
 
    /// access takes place in a correct fashion.
 
    #[inline]
 
    pub unsafe fn get_mut(&self) -> &mut T {
 
        return unsafe{ &mut *self.lock.cell.get() };
 
    }
 
}
 

	
 
impl<'a, T> Drop for UnfairSeLockSharedGuard<'a, T> {
 
    fn drop(&mut self) {
 
        // Spinlock until we've decremented the number of shared locks.
 
        let mut value = self.initial_value;
 
        while let Err(actual_value) = self.lock.shared.compare_exchange_weak(
 
            value, value - 1, Ordering::AcqRel, Ordering::Acquire
 
        ) {
 
            value = actual_value;
 
        }
 
    }
 
}
 

	
 
impl<'a, T> std::ops::Deref for UnfairSeLockSharedGuard<'a, T> {
 
    type Target = T;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        return unsafe{ &*self.lock.cell.get() };
 
    }
 
}
 

	
 
/// A guard signifying that the owner has exclusive access to the underlying
 
/// `UnfairSeLock`.
 
pub struct UnfairSeLockExclusiveGuard<'a, T> {
 
    lock: &'a UnfairSeLock<T>,
 
}
 

	
 
impl<'a, T> UnfairSeLockExclusiveGuard<'a, T> {
 
    fn new(lock: &'a UnfairSeLock<T>) -> Self {
 
        return Self{ lock };
 
    }
 
}
 

	
 
impl<'a, T> Drop for UnfairSeLockExclusiveGuard<'a, T> {
 
    fn drop(&mut self) {
 
        // We have the exclusive bit set, and this type was constructed when
 
        // the number of shared locks was at 0, we can safely store a `0` into
 
        // the atomic
 
        debug_assert_eq!(self.lock.shared.load(Ordering::Relaxed), EXCLUSIVE_BIT); // relaxed because we acquired it before
 
        self.lock.shared.store(0, Ordering::Release);
 
    }
 
}
 

	
 
impl<'a, T> std::ops::Deref for UnfairSeLockExclusiveGuard<'a, T> {
 
    type Target = T;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        return unsafe{ &*self.lock.cell.get() };
 
    }
 
}
 

	
 
impl<'a, T> std::ops::DerefMut for UnfairSeLockExclusiveGuard<'a, T> {
 
    fn deref_mut(&mut self) -> &mut Self::Target {
 
        return unsafe{ &mut *self.lock.cell.get() };
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)