Changeset - 305bb82af392
[Not reviewed]
0 3 0
mh - 4 years ago 2021-11-26 10:03:11
contact@maxhenger.nl
Add generation counter to connector store
3 files changed with 78 insertions and 32 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
@@ -794,7 +794,7 @@ impl Consensus {
 
            }
 
        }
 

	
 
        println!("DEBUERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.0).collect::<Vec<_>>());
 
        println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.index).collect::<Vec<_>>());
 
        self.conclusion = Some(RoundConclusion::Failure);
 
        if encountered.is_empty() {
 
            // We don't have to wait on Acks
src/runtime2/mod.rs
Show inline comments
 
@@ -31,6 +31,7 @@ use port::{ChannelId, Port, PortState};
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
    pub generation: u32,
 
}
 

	
 
impl ConnectorKey {
 
@@ -38,32 +39,64 @@ impl ConnectorKey {
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
        return ConnectorId{
 
            index: self.index,
 
            generation: self.generation,
 
        };
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
        return ConnectorKey{
 
            index: id.index,
 
            generation: id.generation,
 
        };
 
    }
 
}
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
 
pub struct ConnectorId(pub u32);
 
#[derive(Debug, Copy, Clone)]
 
pub struct ConnectorId{
 
    pub index: u32,
 
    pub generation: u32,
 
}
 

	
 
impl PartialEq for ConnectorId {
 
    fn eq(&self, other: &Self) -> bool {
 
        return self.index.eq(&other.index);
 
    }
 
}
 

	
 
impl Eq for ConnectorId{}
 

	
 
impl PartialOrd for ConnectorId{
 
    fn partial_cmp(&self, other: &Self) -> Option<crate::common::Ordering> {
 
        return self.index.partial_cmp(&other.index)
 
    }
 
}
 

	
 
impl Ord for ConnectorId{
 
    fn cmp(&self, other: &Self) -> crate::common::Ordering {
 
        return self.partial_cmp(other).unwrap();
 
    }
 
}
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
        return ConnectorId {
 
            index: u32::MAX,
 
            generation: 0,
 
        };
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_valid(&self) -> bool {
 
        return self.0 != u32::MAX;
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
@@ -360,18 +393,23 @@ unsafe impl Sync for RuntimeInner {}
 
// ConnectorStore
 
// -----------------------------------------------------------------------------
 

	
 
struct StoreEntry {
 
    connector: ScheduledConnector,
 
    generation: std::sync::atomic::AtomicU32,
 
}
 

	
 
struct ConnectorStore {
 
    // Freelist storage of connectors. Storage should be pointer-stable as
 
    // someone might be mutating the vector while we're executing one of the
 
    // connectors.
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    entries: RawVec<*mut StoreEntry>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self {
 
            connectors: RawVec::with_capacity(capacity),
 
            entries: RawVec::with_capacity(capacity),
 
            free: Vec::with_capacity(capacity),
 
        }
 
    }
 
@@ -379,10 +417,11 @@ impl ConnectorStore {
 
    /// Retrieves public part of connector - accessible by many threads at once.
 
    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(id.0 as usize)));
 
            let connector = self.connectors.get(id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &(**connector).public;
 
            let entry = self.entries.get(id.index as usize);
 
            let cur_generation = (**entry).generation.load(Ordering::Acquire);
 
            debug_assert_eq!(cur_generation, id.generation);
 
            debug_assert!(!entry.is_null());
 
            return &(**entry).connector.public;
 
        }
 
    }
 

	
 
@@ -390,10 +429,9 @@ impl ConnectorStore {
 
    /// time.
 
    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(key.index as usize)));
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return &mut (**connector);
 
            let entry = self.entries.get_mut(key.index as usize);
 
            debug_assert!(!entry.is_null());
 
            return &mut (**entry).connector;
 
        }
 
    }
 

	
 
@@ -413,21 +451,28 @@ impl ConnectorStore {
 

	
 
        if self.free.is_empty() {
 
            // No free entries, allocate new entry
 
            index = self.connectors.len();
 
            key = ConnectorKey{ index: index as u32 };
 
            index = self.entries.len();
 
            key = ConnectorKey{
 
                index: index as u32, generation: 0
 
            };
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(connector));
 
            self.connectors.push(connector);
 
            let connector = Box::into_raw(Box::new(StoreEntry{
 
                connector,
 
                generation: AtomicU32::new(0),
 
            }));
 
            self.entries.push(connector);
 
        } else {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx.id = key.downcast();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
 
                std::ptr::write(*target, connector);
 
                let target = &mut **self.entries.get_mut(index);
 
                std::ptr::write(&mut target.connector as *mut _, connector);
 

	
 
                let generation = target.generation.load(Ordering::Acquire);
 
                key = ConnectorKey{ index: index as u32, generation };
 
                target.connector.ctx.id = key.downcast();
 
            }
 
        }
 

	
 
@@ -439,7 +484,8 @@ impl ConnectorStore {
 
    /// execution. Otherwise one experiences "bad stuff" (tm).
 
    fn destroy(&mut self, key: ConnectorKey) {
 
        unsafe {
 
            let target = self.connectors.get_mut(key.index as usize);
 
            let target = self.entries.get_mut(key.index as usize);
 
            (**target).generation.fetch_add(1, Ordering::SeqCst);
 
            std::ptr::drop_in_place(*target);
 
            // Note: but not deallocating!
 
        }
 
@@ -455,7 +501,7 @@ impl Drop for ConnectorStore {
 
        // has to be deallocated
 
        for free_idx in self.free.iter().copied() {
 
            unsafe {
 
                let memory = self.connectors.get_mut(free_idx);
 
                let memory = self.entries.get_mut(free_idx);
 
                let layout = std::alloc::Layout::for_value(&**memory);
 
                std::alloc::dealloc(*memory as *mut u8, layout);
 

	
 
@@ -466,9 +512,9 @@ impl Drop for ConnectorStore {
 

	
 
        // With the deallocated stuff marked as null, clear the remainder that
 
        // is not null
 
        for idx in 0..self.connectors.len() {
 
        for idx in 0..self.entries.len() {
 
            unsafe {
 
                let memory = *self.connectors.get_mut(idx);
 
                let memory = *self.entries.get_mut(idx);
 
                if !memory.is_null() {
 
                    let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh
 
                }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -356,7 +356,7 @@ impl Scheduler {
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.index);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
@@ -384,7 +384,7 @@ impl Scheduler {
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.index, message);
 
    }
 
}
 

	
 
@@ -462,7 +462,7 @@ impl ComponentCtx {
 
    /// for waiting until it is appropriate to shut down (i.e. being outside
 
    /// of a sync region) and returning the `Exit` scheduling code.
 
    pub(crate) fn push_error(&mut self, error: EvalError) {
 
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.0, error);
 
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.index, error);
 
    }
 

	
 
    #[inline]
0 comments (0 inline, 0 general)