Changeset - 2591c0d940f8
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-02-20 12:10:59
christopher.esterhuyse@gmail.com
one buffer, multiple ranges
2 files changed with 64 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime/experimental/api.rs
Show inline comments
 
@@ -139,106 +139,160 @@ impl Connected {
 
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
 
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
 
        let ports =
 
            [Port(self.endpoint_exts.len() as u32 - 1), Port(self.endpoint_exts.len() as u32)];
 
        let channel_id = ChannelId {
 
            controller_id: self.controller_id,
 
            channel_index: self.channel_index_stream.next(),
 
        };
 
        let [e0, e1] = Endpoint::new_memory_pair();
 
        self.endpoint_exts.push(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Putter },
 
            endpoint: e0,
 
        });
 
        self.endpoint_exts.push(EndpointExt {
 
            info: EndpointInfo { channel_id, polarity: Getter },
 
            endpoint: e1,
 
        });
 
        for p in ports.iter() {
 
            self.native_ports.insert(Port(p.0));
 
        }
 
        (OutPort(ports[0]), InPort(ports[1]))
 
    }
 
    pub fn new_component(
 
        &mut self,
 
        protocol: &Arc<Protocol>,
 
        name: Vec<u8>,
 
        moved_ports: &[Port],
 
    ) -> Result<(), ()> {
 
        let moved_ports = moved_ports.iter().copied().collect();
 
        if !self.native_ports.is_superset(&moved_ports) {
 
            return Err(());
 
        }
 
        self.native_ports.retain(|e| !moved_ports.contains(e));
 
        self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name });
 
        // TODO add a singleton machine
 
        Ok(())
 
    }
 
    pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> {
 
        Ok(())
 
    }
 
    pub fn sync_subsets(
 
        &mut self,
 
        _inbuf: &mut [u8],
 
        _ops: &mut [PortOpRs],
 
        bit_subsets: &[&[usize]],
 
    ) -> Result<usize, ()> {
 
        for (batch_index, bit_subset) in bit_subsets.iter().enumerate() {
 
            println!("batch_index {:?}", batch_index);
 
            use super::bits::BitChunkIter;
 
            let chunk_iter = bit_subset.iter().copied();
 
            for index in BitChunkIter::new(chunk_iter) {
 
                println!("  index {:?}", index);
 
            }
 
        }
 
        Ok(0)
 
    }
 
}
 

	
 
macro_rules! bitslice {
 
    ($( $num:expr  ),*) => {{
 
        &[0 $( | (1usize << $num)  )*]
 
    }};
 
}
 

	
 
#[test]
 
fn api_new_test() {
 
    let mut c = Connecting::default();
 
    let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap());
 
    let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
 
    let proto_0 = Arc::new(Protocol::parse(b"").unwrap());
 
    let mut c = c.connect(None).unwrap();
 
    let (mem_out, mem_in) = c.new_channel();
 
    let mut inbuf = [0u8; 64];
 
    c.new_component(&proto_0, b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap();
 
    let mut ops = [
 
        PortOpRs::In { msg_range: None, port: &mem_in },
 
        PortOpRs::Out { msg: b"hey", port: &net_out, optional: false },
 
        PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true },
 
        PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false },
 
    ];
 
    c.sync_set(&mut inbuf, &mut ops).unwrap();
 
    c.sync_subsets(&mut inbuf, &mut ops, &[bitslice! {0,1,2}]).unwrap();
 
}
 

	
 
#[repr(C)]
 
pub struct PortOp {
 
    msgptr: *mut u8, // read if OUT, field written if IN, will point into buf
 
    msglen: usize,   // read if OUT, written if IN, won't exceed buf
 
    port: Port,
 
    optional: bool, // no meaning if
 
}
 

	
 
pub enum PortOpRs<'a> {
 
    In { msg_range: Option<Range<usize>>, port: &'a InPort },
 
    Out { msg: &'a [u8], port: &'a OutPort, optional: bool },
 
}
 

	
 
fn c_sync_set(
 
unsafe fn c_sync_set(
 
    connected: &mut Connected,
 
    inbuflen: usize,
 
    inbuf: *mut u8,
 
    inbufptr: *mut u8,
 
    opslen: usize,
 
    ops: *mut PortOp,
 
    opsptr: *mut PortOp,
 
) -> i32 {
 
    let buf = as_mut_slice(inbuflen, inbufptr);
 
    let ops = as_mut_slice(opslen, opsptr);
 
    let (subset_index, wrote) = sync_inner(connected, buf);
 
    assert_eq!(0, subset_index);
 
    for op in ops {
 
        if let Some(range) = wrote.get(&op.port) {
 
            op.msgptr = inbufptr.add(range.start);
 
            op.msglen = range.end - range.start;
 
        }
 
    }
 
    0
 
}
 

	
 
use super::bits::{usizes_for_bits, BitChunkIter};
 
unsafe fn c_sync_subset(
 
    connected: &mut Connected,
 
    inbuflen: usize,
 
    inbufptr: *mut u8,
 
    opslen: usize,
 
    opsptr: *mut PortOp,
 
    subsetslen: usize,
 
    subsetsptr: *const *const usize,
 
) -> i32 {
 
    let buf: &mut [u8] = as_mut_slice(inbuflen, inbufptr);
 
    let ops: &mut [PortOp] = as_mut_slice(opslen, opsptr);
 
    let subsets: &[*const usize] = as_const_slice(subsetslen, subsetsptr);
 
    let subsetlen = usizes_for_bits(opslen);
 
    // don't yet know subsetptr; which subset fires unknown!
 

	
 
    let (subset_index, wrote) = sync_inner(connected, buf);
 
    let subsetptr: *const usize = subsets[subset_index];
 
    let subset: &[usize] = as_const_slice(subsetlen, subsetptr);
 

	
 
    for index in BitChunkIter::new(subset.iter().copied()) {
 
        let op = &mut ops[index as usize];
 
        if let Some(range) = wrote.get(&op.port) {
 
            op.msgptr = inbufptr.add(range.start);
 
            op.msglen = range.end - range.start;
 
        }
 
    }
 
    subset_index as i32
 
}
 

	
 
// dummy fn for the actual synchronous round
 
fn sync_inner<'c, 'b>(
 
    _connected: &'c mut Connected,
 
    _buf: &'b mut [u8],
 
) -> (usize, &'b HashMap<Port, Range<usize>>) {
 
    todo!()
 
}
 

	
 
unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] {
 
    std::slice::from_raw_parts_mut(ptr, len)
 
}
 
unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] {
 
    std::slice::from_raw_parts(ptr, len)
 
}
src/runtime/experimental/bits.rs
Show inline comments
 
use crate::common::*;
 

	
 
/// Given an iterator over BitChunk Items, iterates over the indices (each represented as a u32) for which the bit is SET,
 
/// treating the bits in the BitChunk as a contiguous array.
 
/// e.g. input [0b111000, 0b11] gives output [3, 4, 5, 32, 33].
 
/// observe that the bits per chunk are ordered from least to most significant bits, yielding smaller to larger usizes.
 
/// assumes chunk_iter will yield no more than std::u32::MAX / 32 chunks
 

	
 
const fn usize_bytes() -> usize {
 
pub const fn usize_bytes() -> usize {
 
    std::mem::size_of::<usize>()
 
}
 
const fn usize_bits() -> usize {
 
pub const fn usize_bits() -> usize {
 
    usize_bytes() * 8
 
}
 
pub const fn usizes_for_bits(bits: usize) -> usize {
 
    (bits + (usize_bits() - 1)) / usize_bits()
 
}
 

	
 
pub(crate) struct BitChunkIter<I: Iterator<Item = usize>> {
 
    cached: usize,
 
    chunk_iter: I,
 
    next_bit_index: u32,
 
}
 
impl<I: Iterator<Item = usize>> BitChunkIter<I> {
 
    pub fn new(chunk_iter: I) -> Self {
 
        // first chunk is always a dummy zero, as if chunk_iter yielded Some(FALSE).
 
        // Consequences:
 
        // 1. our next_bit_index is always off by usize_bits() (we correct for it in Self::next) (no additional overhead)
 
        // 2. we cache usize and not Option<usize>, because chunk_iter.next() is only called in Self::next.
 
        Self { chunk_iter, next_bit_index: 0, cached: 0 }
 
    }
 
}
 
impl<I: Iterator<Item = usize>> Iterator for BitChunkIter<I> {
 
    type Item = u32;
 
    fn next(&mut self) -> Option<Self::Item> {
 
        let mut chunk = self.cached;
 

	
 
        // loop until either:
 
        // 1. there are no more Items to return, or
 
        // 2. chunk encodes 1+ Items, one of which we will return.
 
        while chunk == 0 {
 
            // chunk has no bits set! get the next one...
 
            chunk = self.chunk_iter.next()?;
 

	
 
            // ... and jump self.next_bit_index to the next multiple of usize_bits().
 
            self.next_bit_index =
 
                (self.next_bit_index + usize_bits() as u32) & !(usize_bits() as u32 - 1);
 
        }
 
        // there exists 1+ set bits in chunk
 
        // assert(chunk > 0);
 

	
 
        // Until the least significant bit of chunk is 1:
 
        // 1. shift chunk to the right,
 
        // 2. and increment self.next_bit_index accordingly
 
        // effectively performs a little binary search, shifting 32, then 16, ...
 
        // TODO perhaps there is a more efficient SIMD op for this?
 
        const N_INIT: u32 = usize_bits() as u32 / 2;
 
        let mut n = N_INIT;
 
        while n >= 1 {
 
            // n is [32,16,8,4,2,1] on 64-bit machine
 
            // this loop is unrolled with release optimizations
 
            let n_least_significant_mask = (1 << n) - 1;
 
            if chunk & n_least_significant_mask == 0 {
 
                // no 1 set within 0..n least significant bits.
 
                self.next_bit_index += n;
 
@@ -100,111 +103,107 @@ struct Pair {
 
impl From<[u32; 2]> for Pair {
 
    fn from([entity, property]: [u32; 2]) -> Self {
 
        Pair { entity, property }
 
    }
 
}
 
struct BitMatrix {
 
    bounds: Pair,
 
    buffer: *mut usize,
 
}
 
impl Drop for BitMatrix {
 
    fn drop(&mut self) {
 
        let total_chunks = Self::row_chunks(self.bounds.property as usize)
 
            * Self::column_chunks(self.bounds.entity as usize);
 
        let layout = Self::layout_for(total_chunks);
 
        unsafe {
 
            // ?
 
            std::alloc::dealloc(self.buffer as *mut u8, layout);
 
        }
 
    }
 
}
 
impl Debug for BitMatrix {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let row_chunks = Self::row_chunks(self.bounds.property as usize);
 
        let column_chunks = Self::column_chunks(self.bounds.entity as usize);
 
        for property in 0..row_chunks {
 
            for entity_chunk in 0..column_chunks {
 
                write!(f, "|")?;
 
                let mut chunk = unsafe { *self.buffer.add(row_chunks * entity_chunk + property) };
 
                let end = if entity_chunk + 1 == column_chunks {
 
                    self.bounds.entity % usize_bits() as u32
 
                } else {
 
                    usize_bits() as u32
 
                };
 
                for _ in 0..end {
 
                    let c = match chunk & 1 {
 
                        0 => '0',
 
                        _ => '1',
 
                    };
 
                    write!(f, "{}", c)?;
 
                    chunk >>= 1;
 
                }
 
            }
 
            write!(f, "|\n")?;
 
        }
 
        Ok(())
 
    }
 
}
 
impl BitMatrix {
 
    #[inline]
 
    const fn chunk_len_ceil(value: usize) -> usize {
 
        (value + usize_bits() - 1) & !(usize_bits() - 1)
 
    }
 
    #[inline]
 
    const fn row_of(entity: usize) -> usize {
 
        entity / usize_bits()
 
    }
 
    #[inline]
 
    const fn row_chunks(property_bound: usize) -> usize {
 
        property_bound
 
    }
 
    #[inline]
 
    const fn column_chunks(entity_bound: usize) -> usize {
 
        Self::chunk_len_ceil(entity_bound) / usize_bits()
 
        usizes_for_bits(entity_bound + 1)
 
    }
 
    #[inline]
 
    fn offsets_unchecked(&self, at: Pair) -> [usize; 2] {
 
        let o_in = at.entity as usize % usize_bits();
 
        let row = Self::row_of(at.entity as usize);
 
        let row_chunks = self.bounds.property as usize;
 
        let o_of = row * row_chunks + at.property as usize;
 
        [o_of, o_in]
 
    }
 
    // returns a u32 which has bits 000...000111...111
 
    // for the last JAGGED chunk given the column size
 
    // if the last chunk is not jagged (when entity_bound % 32 == 0)
 
    // None is returned,
 
    // otherwise Some(x) is returned such that x & chunk would mask out
 
    // the bits NOT in 0..entity_bound
 
    fn last_row_chunk_mask(entity_bound: u32) -> Option<usize> {
 
        let zero_prefix_len = entity_bound as usize % usize_bits();
 
        if zero_prefix_len == 0 {
 
            None
 
        } else {
 
            Some(!0 >> (usize_bits() - zero_prefix_len))
 
        }
 
    }
 
    fn assert_within_bounds(&self, at: Pair) {
 
        assert!(at.entity < self.bounds.entity);
 
        assert!(at.property < self.bounds.property);
 
    }
 

	
 
    fn layout_for(mut total_chunks: usize) -> std::alloc::Layout {
 
        unsafe {
 
            // this layout is ALWAYS valid:
 
            // 1. size is always nonzero
 
            // 2. size is always a multiple of 4 and 4-aligned
 
            if total_chunks == 0 {
 
                total_chunks = 1;
 
            }
 
            std::alloc::Layout::from_size_align_unchecked(
 
                usize_bytes() * total_chunks,
 
                usize_bytes(),
 
            )
 
        }
 
    }
 
    /////////
 

	
 
    fn reshape(&mut self, bounds: Pair) {
 
        todo!()
 
    }
 

	
0 comments (0 inline, 0 general)