From a5ae8bee6918f55b1ff2d34c1fff24f59657ceb4 2021-12-23 10:32:34
From: mh
Date: 2021-12-23 10:32:34
Subject: [PATCH] Slight extension of component store

---
diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs
index a7a886bea822fc34356c6a58ab6695e5aa0ff92c..767088eb2a1eed792272aab2326cf3e5881f29c4 100644
--- a/src/runtime2/runtime.rs
+++ b/src/runtime2/runtime.rs
@@ -97,7 +97,17 @@ struct CompStore<T> {
     read_head: AtomicUsize,
 }
 
+const fn compute_realloc_flag() -> usize {
+    match size_of::<usize>() {
+        4 => return 1 << 31, // 32-bit system
+        8 => return 1 << 63, // 64-bit system
+        _ => panic!("unexpected byte size for 'usize'")
+    }
+}
+
 impl<T> CompStore<T> {
+    const REALLOC_FLAG: usize = compute_realloc_flag();
+
     fn new(initial_count: usize) -> Self {
         // Allocate data
         debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
@@ -128,9 +138,9 @@ impl<T> CompStore<T> {
 
     fn get_index_from_freelist(&self) -> u32 {
         let compare_mask = (self.count * 2) - 1;
+        let mut read_index = self.read_head.load(Ordering::Acquire); // read index first
         'try_loop: loop {
-            let mut read_index = self.read_head.load(Ordering::Acquire); // read index first
             let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second
 
             // By definition we always have `read_index <= limit_index` (if we would
@@ -142,11 +152,11 @@
                 // reader currently updating the read_head.
                 //
                 // To test if we are supposed to resize the backing buffer we
-                // try to increment the limit index by 2*count. Note that the
+                // try to set the REALLOC_FLAG on the limit index. Note that the
                 // stored indices are always in the range [0, 2*count). So if
-                // we add 2*count to the limit index, then the masked condition
-                // above still holds! Other potential readers will end up here
-                // and are allowed to wait until we resized the backing
+                // we add REALLOC_FLAG to the limit index, then the masked
+                // condition above still holds! Other potential readers will end
+                // up here and are allowed to wait until we resized the backing
                 // container.
                 //
                 // Furthermore, setting the limit index to this high value also
@@ -154,7 +164,7 @@
                 // again, as they're writing to a buffer that is going to get
                 // trashed.
                 todo!("finish reallocation code");
-                match self.limit_head.compare_exchange(limit_index, limit_index + 2*self.count, Ordering::SeqCst, Ordering::Acquire) {
+                match self.limit_head.compare_exchange(limit_index, limit_index | Self::REALLOC_FLAG, Ordering::SeqCst, Ordering::Acquire) {
                     Ok(_) => {
                         // Limit index has changed, so we're now the ones that
                        // are supposed to resize the
@@ -163,11 +173,12 @@
            } else {
                 // It seems we have space to read
                 let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
-                if self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_err() {
+                if let Err(new_read_index) = self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
                     // Failed to do the CAS, try again. We need to start at the
                     // start again because we might have had other readers that
                     // were successful, so at the very least, the preemptive
                     // read we did is no longer correct.
+                    read_index = new_read_index;
                     continue 'try_loop;
                 }
 
@@ -178,14 +189,14 @@ impl<T> CompStore<T> {
     }
 
     fn put_back_index_into_freelist(&self, index: u32) {
-        let compare_mask = (self.count * 2) - 1;
-        'try_loop: loop {
-            let write_index = self.write_head.load(Ordering::Acquire);
-            while !self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_ok() {
-                // Failed to do the CAS, try again
-                continue 'try_loop
-            }
+        let mut compare_mask = (self.count * 2) - 1;
+        let mut write_index = self.write_head.load(Ordering::Acquire);
+        while let Err(new_write_index) = self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
+            // Failed to do the CAS, try again
+            write_index = new_write_index;
+        }
+
+        'try_write_loop: loop {
             // We are now the only ones that can write at `write_index`. Try to
             // do so
             unsafe { *self.freelist.add(write_index & self.mask) = index; }
@@ -195,8 +206,7 @@
             // `write_index + 1`, but we might have to spin to achieve it.
             // Furthermore, the `limit_head` is used by the index-retrieval
             // function to indicate that a read is in progress.
-            loop {
-                todo!("finish reallocation code");
+            'commit_to_write_loop: loop {
                 match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
                     Ok(_) => break,
                     Err(new_value) => {
@@ -205,6 +215,22 @@
                         // But if it is very large (relatively) then this is the
                         // signal from the reader that the entire storage is
                         // being resized
+                        if new_value & Self::REALLOC_FLAG != 0 {
+                            // Someone is resizing, wait until that is no longer
+                            // true.
+                            while self.limit_head.load(Ordering::Acquire) & Self::REALLOC_FLAG != 0 {
+                                // still resizing
+                            }
+
+                            // Not resizing anymore: try everything again. Our
+                            // old write has now become invalid, but our index
+                            // hasn't! So we need to redo our write and our
+                            // increment of the limit head.
+                            continue 'try_write_loop;
+                        } else {
+                            // Just try again
+                            continue 'commit_to_write_loop;
+                        }
                     }
                 }
             }
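
Not part of the patch itself: the hunks above lean on two idioms, reserving the most significant bit of an atomic index as a realloc-in-progress flag, and CAS retry loops that reuse the value handed back in `Err` instead of re-loading. Below is a minimal standalone sketch of both, for reference only. The names (`Head`, `increment`, `mark_realloc`, `clear_realloc`) are illustrative and do not exist in the runtime2 code, and the sketch uses `fetch_or`/`fetch_and` where the patch uses `compare_exchange` to claim the flag.

// Standalone sketch; illustrative names, not runtime2 code.
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};

// Same trick as the patch: reserve the MSB of a usize as a flag,
// chosen from the pointer width of the target.
const fn compute_realloc_flag() -> usize {
    match size_of::<usize>() {
        4 => 1 << 31, // 32-bit system
        8 => 1 << 63, // 64-bit system
        _ => panic!("unexpected byte size for 'usize'"),
    }
}

const REALLOC_FLAG: usize = compute_realloc_flag();

struct Head {
    index: AtomicUsize,
}

impl Head {
    // CAS retry loop in the style the patch switches to: on failure
    // `compare_exchange` returns the value it actually observed, so we
    // feed that into the next attempt instead of issuing a fresh load.
    fn increment(&self, mask: usize) -> usize {
        let mut cur = self.index.load(Ordering::Acquire);
        loop {
            if cur & REALLOC_FLAG != 0 {
                // A resize is in progress: spin until the flag clears.
                cur = self.index.load(Ordering::Acquire);
                continue;
            }
            match self.index.compare_exchange(
                cur, (cur + 1) & mask, Ordering::SeqCst, Ordering::Acquire
            ) {
                Ok(prev) => return prev,
                Err(actual) => cur = actual, // retry with the observed value
            }
        }
    }

    // Setting the flag leaves the masked low bits intact, which is the
    // property the patch's comments rely on.
    fn mark_realloc(&self) -> usize {
        self.index.fetch_or(REALLOC_FLAG, Ordering::SeqCst)
    }

    fn clear_realloc(&self) {
        self.index.fetch_and(!REALLOC_FLAG, Ordering::SeqCst);
    }
}

fn main() {
    let head = Head { index: AtomicUsize::new(0) };
    let mask = (8 * 2) - 1; // count = 8, so indices live in [0, 2*count)
    assert_eq!(head.increment(mask), 0);
    head.mark_realloc();
    assert_ne!(head.index.load(Ordering::Acquire) & REALLOC_FLAG, 0);
    head.clear_realloc();
    assert_eq!(head.increment(mask), 1);
    println!("flag = {:#x}", REALLOC_FLAG);
}

Because indices stored in the heads always lie in [0, 2*count) and 2*count never reaches the MSB, OR-ing the flag in never disturbs the masked comparison that readers and writers perform; that is what lets waiting threads keep looping on the masked value while a resize is in flight.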